package com.gentics.mesh.search.verticle.bulk;

import com.gentics.mesh.core.data.search.request.BulkRequest;
import com.gentics.mesh.core.data.search.request.Bulkable;
import com.gentics.mesh.core.data.search.request.SearchRequest;
import com.gentics.mesh.search.verticle.eventhandler.Util;
import io.reactivex.FlowableOperator;
import io.reactivex.internal.util.BackpressureHelper;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.time.Duration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:com/gentics/mesh/search/verticle/bulk/BulkOperator.class */
/**
 * Flowable operator that collects {@link Bulkable} search requests and emits them downstream
 * combined into {@link BulkRequest}s.
 *
 * <p>A collected bulk is flushed when one of the following happens:
 * <ul>
 *   <li>the number of queued requests reaches {@code requestLimit},</li>
 *   <li>the accumulated bulk length reaches {@code lengthLimit},</li>
 *   <li>the bulk timer (configured with {@code bulkTime} milliseconds) fires,</li>
 *   <li>a non-bulkable request arrives (it is emitted after the pending bulk),</li>
 *   <li>upstream completes, or</li>
 *   <li>{@link #flush()} is called manually.</li>
 * </ul>
 *
 * <p>Flushing of bulks can be suppressed by setting {@link #flushActive} to {@code false}; requests
 * then keep accumulating until it is re-enabled and another flush is triggered.
 */
public class BulkOperator implements FlowableOperator<SearchRequest, SearchRequest> {
    private static final Logger log = LoggerFactory.getLogger(BulkOperator.class);

    private final Vertx vertx;
    /** Time in milliseconds handed to the bulk timer. */
    private final long bulkTime;
    /** Maximum number of requests combined into one bulk. */
    private final int requestLimit;
    /** Accumulated bulk length at which a bulk is considered full. */
    private final long lengthLimit;
    /** The operator instance created by the most recent {@link #apply(Subscriber)} call. */
    private ActualBulkOperator<SearchRequest> operator;
    /** While {@code false}, queued bulkable requests are not emitted. */
    final AtomicBoolean flushActive = new AtomicBoolean(true);

    /** The per-subscription operator: sits between upstream and downstream and can be flushed. */
    interface ActualBulkOperator<T> extends Subscription, Subscriber<T> {
        /** Marks the current bulk for emission and tries to emit it. */
        void flush();

        /** @return whether the bulk timer is currently running */
        boolean bulking();
    }

    /**
     * @param vertx        Vert.x instance handed to the bulk timer
     * @param duration     time after which a started bulk is flushed
     * @param requestLimit maximum number of requests per bulk
     * @param lengthLimit  bulk length at which a bulk is flushed
     */
    public BulkOperator(Vertx vertx, Duration duration, int requestLimit, long lengthLimit) {
        this.vertx = vertx;
        this.bulkTime = duration.toMillis();
        this.requestLimit = requestLimit;
        this.lengthLimit = lengthLimit;
    }

    /**
     * Creates the bulking subscriber that wraps the given downstream subscriber.
     *
     * <p>Only the operator created by the latest call is reachable through {@link #flush()} and
     * {@link #bulking()}; a warning is logged when an earlier one gets replaced.
     *
     * @param subscriber the downstream subscriber
     * @return the subscriber to be subscribed to upstream
     */
    public Subscriber<? super SearchRequest> apply(final Subscriber<? super SearchRequest> subscriber) throws Exception {
        if (operator != null) {
            log.warn("More than one subscriber for the same operator detected. Flush will only work for the newest subscriber.");
        }
        operator = new ActualBulkOperator<SearchRequest>() {
            private Subscription subscription;
            /** Set when a flush has been triggered but not yet (fully) carried out. */
            private final AtomicBoolean flushing = new AtomicBoolean(false);
            /** Written by the upstream path and read by the downstream path, hence volatile. */
            private volatile boolean upstreamCompleted = false;
            /** Ensures that onComplete is signalled to downstream at most once. */
            private final AtomicBoolean completionSent = new AtomicBoolean(false);
            /** Outstanding downstream demand. */
            private final AtomicLong requested = new AtomicLong(0);
            private final AtomicBoolean canceled = new AtomicBoolean(false);
            private final BulkQueue bulkableRequests = new BulkQueue();
            private final Queue<SearchRequest> nonBulkableRequests = new ConcurrentLinkedQueue<>();
            /** Guards drain(); see the note there. */
            private final ReentrantLock lock = new ReentrantLock();
            /** Declared last because its callback reads the fields above. */
            private final BulkTimer timer = new BulkTimer(vertx, bulkTime, () -> {
                log.trace("Flushing {} requests because time limit of {}ms has been reached.", bulkableRequests.size(), bulkTime);
                flush();
            });

            @Override
            public void flush() {
                flushing.set(true);
                drain();
            }

            /** @return whether downstream is still subscribed and has outstanding demand */
            private boolean canEmit() {
                return !canceled.get() && requested.get() > 0;
            }

            /**
             * Emits whatever may currently be emitted: pending bulks (if a flush was triggered and
             * flushing is active), then one non-bulkable request, then the completion signal.
             * Finally asks upstream for the next item.
             *
             * <p>NOTE(review): the work is handed to Util.skipIfMultipleThreads together with the
             * lock; presumably it is skipped when another thread is already draining - confirm.
             */
            public void drain() {
                Util.skipIfMultipleThreads(lock, () -> {
                    if (canEmit() && !bulkableRequests.isEmpty() && flushActive.get() && flushing.compareAndSet(true, false)) {
                        timer.stop();
                        log.debug("Emitting bulk of size {} to subscriber", bulkableRequests.size());
                        do {
                            AtomicLong bulkLength = new AtomicLong(0L);
                            // The length limit is checked BEFORE polling so that no request is removed
                            // from the queue without being put into a bulk. The first element is always
                            // taken (and at least one slot is offered), which guarantees that every
                            // iteration makes progress even with non-positive limits.
                            BulkRequest bulkRequest = new BulkRequest(IntStream.range(0, Math.max(1, requestLimit))
                                .takeWhile(i -> i == 0 || bulkLength.get() < lengthLimit)
                                .mapToObj(i -> bulkableRequests.poll())
                                .takeWhile(polled -> polled.isPresent())
                                .map(polled -> polled.get())
                                .peek(bulkable -> bulkLength.addAndGet(bulkable.bulkLength()))
                                .collect(Collectors.toList()));
                            if (log.isTraceEnabled()) {
                                log.trace("Sending bulk to elasticsearch:\n{}", bulkRequest);
                            }
                            subscriber.onNext(bulkRequest);
                            BackpressureHelper.produced(requested, 1L);
                            // Never emit more bulks than downstream has requested.
                        } while (canEmit() && !bulkableRequests.isEmpty());
                        if (!bulkableRequests.isEmpty()) {
                            // Demand ran out in the middle of the flush: keep the flush pending so that
                            // the next request(n) from downstream resumes it.
                            flushing.set(true);
                        }
                    }
                    if (canEmit() && !nonBulkableRequests.isEmpty()) {
                        SearchRequest nonBulkable = nonBulkableRequests.remove();
                        if (log.isTraceEnabled()) {
                            log.trace("Emitting remaining non bulkable request to subscriber: {}", nonBulkable);
                        }
                        subscriber.onNext(nonBulkable);
                        BackpressureHelper.produced(requested, 1L);
                    }
                    if (upstreamCompleted && bulkableRequests.isEmpty() && nonBulkableRequests.isEmpty()
                        && completionSent.compareAndSet(false, true)) {
                        log.trace("Sending onComplete event to subscriber");
                        subscriber.onComplete();
                    }
                    request();
                });
            }

            /** Requests one more item from upstream unless canceled, completed or without demand. */
            private void request() {
                if (canceled.get() || requested.get() <= 0 || upstreamCompleted) {
                    return;
                }
                log.trace("Requesting 1 item from upstream");
                subscription.request(1L);
            }

            @Override
            public boolean bulking() {
                return timer.isRunning();
            }

            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                log.trace("Calling onSubscribe of subscriber");
                subscriber.onSubscribe(this);
            }

            /**
             * Queues the request. Non-bulkable requests trigger an immediate flush; bulkable ones
             * trigger a flush only once the request count or bulk length limit has been reached.
             */
            public void onNext(SearchRequest searchRequest) {
                log.trace("Search request of class [{}] received from upstream.", searchRequest.getClass());
                if (canceled.get()) {
                    return;
                }
                if (!(searchRequest instanceof Bulkable)) {
                    log.trace("Flushing {} requests because non-bulkable request of class [{}] has been received.",
                        bulkableRequests.size(), searchRequest.getClass());
                    nonBulkableRequests.add(searchRequest);
                    flush();
                    return;
                }
                if (bulkableRequests.isEmpty()) {
                    // First element of a new bulk: (re)start the time limit.
                    timer.restart();
                }
                bulkableRequests.add((Bulkable) searchRequest);
                log.trace("Added request of class [{}] to the current bulk with the size of now {}.",
                    searchRequest.getClass(), bulkableRequests.size());
                if (bulkableRequests.size() < requestLimit && bulkableRequests.getBulkLength() < lengthLimit) {
                    // Bulk not full yet, keep collecting.
                    request();
                    return;
                }
                if (log.isTraceEnabled()) {
                    if (bulkableRequests.size() >= requestLimit) {
                        log.trace("Flushing {} requests because request amount limit of {} has been reached.",
                            bulkableRequests.size(), requestLimit);
                    } else {
                        log.trace("Flushing {} requests with total size of {} because size limit of {} has been exceeded.",
                            bulkableRequests.size(), bulkableRequests.getBulkLength(), lengthLimit);
                    }
                }
                flush();
            }

            /** Forwards the error to downstream once and stops any further emission. */
            public void onError(Throwable error) {
                log.trace("Error event from upstream received", error);
                if (!canceled.compareAndSet(false, true)) {
                    return;
                }
                subscriber.onError(error);
            }

            /** Remembers the completion and flushes; onComplete is forwarded once the queues are empty. */
            public void onComplete() {
                log.trace("Completed event from upstream received");
                if (canceled.get()) {
                    return;
                }
                upstreamCompleted = true;
                flush();
            }

            /** Adds to the downstream demand and tries to emit pending items. */
            public void request(long n) {
                log.trace("Downstream requested {} items", n);
                BackpressureHelper.add(requested, n);
                drain();
            }

            public void cancel() {
                log.trace("Downstream canceled subscription");
                canceled.set(true);
                subscription.cancel();
            }
        };
        return operator;
    }

    /** Flushes the bulk collected by the most recently created operator, if there is one. */
    public void flush() {
        log.info("Manually flushing bulked requests");
        if (operator != null) {
            operator.flush();
        }
    }

    /**
     * @return {@code true} if an operator exists and its bulk timer is running, {@code false} otherwise
     */
    public boolean bulking() {
        return operator != null && operator.bulking();
    }
}
