package com.gentics.mesh.search.verticle;

import com.gentics.mesh.core.data.search.request.SearchRequest;
import com.gentics.mesh.core.rest.MeshEvent;
import com.gentics.mesh.core.rest.event.MeshEventModel;
import com.gentics.mesh.etc.config.MeshOptions;
import com.gentics.mesh.etc.config.search.ElasticSearchOptions;
import com.gentics.mesh.search.SearchProvider;
import com.gentics.mesh.search.impl.ElasticsearchResponseErrorStreamable;
import com.gentics.mesh.search.verticle.bulk.BulkOperator;
import com.gentics.mesh.search.verticle.eventhandler.MainEventHandler;
import com.gentics.mesh.search.verticle.eventhandler.RxUtil;
import com.gentics.mesh.search.verticle.eventhandler.SyncEventHandler;
import com.gentics.mesh.search.verticle.eventhandler.Util;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.functions.Function;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subjects.BehaviorSubject;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.inject.Inject;

/* loaded from: input_file:com/gentics/mesh/search/verticle/ElasticsearchProcessVerticle.class */
public class ElasticsearchProcessVerticle extends AbstractVerticle {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchProcessVerticle.class);
    private final MainEventHandler mainEventhandler;
    private final SearchProvider searchProvider;
    private final IdleChecker idleChecker;
    private final SyncEventHandler syncEventHandler;
    private final ElasticSearchOptions options;
    private List<MessageConsumer<JsonObject>> vertxHandlers;
    private FlowableProcessor<MessageEvent> requests = PublishProcessor.create();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final BehaviorSubject<Boolean> elasticsearchAvailable = BehaviorSubject.createDefault(true);
    private final AtomicBoolean waitForSync = new AtomicBoolean(false);

    @Inject
    public ElasticsearchProcessVerticle(MainEventHandler mainEventHandler, SearchProvider searchProvider, IdleChecker idleChecker, SyncEventHandler syncEventHandler, MeshOptions meshOptions) {
        this.mainEventhandler = mainEventHandler;
        this.searchProvider = searchProvider;
        this.idleChecker = idleChecker;
        this.syncEventHandler = syncEventHandler;
        this.options = meshOptions.getSearchOptions();
    }

    public void start() {
        log.trace("Initializing Elasticsearch process verticle");
        assemble();
        this.idleChecker.idling().subscribe(obj -> {
            log.trace("All requests completed. Sending idle event");
            this.vertx.eventBus().publish(MeshEvent.SEARCH_IDLE.address, (Object) null);
        });
        this.vertxHandlers = (List) this.mainEventhandler.handledEvents().stream().map(meshEvent -> {
            return this.vertx.eventBus().localConsumer(meshEvent.address, message -> {
                if (this.stopped.get() || isDroppedEvent(message)) {
                    return;
                }
                this.idleChecker.incrementAndGetTransformations();
                this.elasticsearchAvailable.filter(bool -> {
                    return bool.booleanValue();
                }).firstOrError().subscribe(bool2 -> {
                    this.waitForSync.set(false);
                    log.trace(String.format("Received event message on address {%s}:\n%s", message.address(), message.body()));
                    this.requests.onNext(new MessageEvent(meshEvent, MeshEventModel.fromMessage(message)));
                });
            });
        }).map(MessageConsumer::new).collect(Collectors.toList());
        List<MessageConsumer<JsonObject>> list = this.vertxHandlers;
        MeshEvent meshEvent2 = MeshEvent.IS_SEARCH_IDLE;
        IdleChecker idleChecker = this.idleChecker;
        idleChecker.getClass();
        list.add(replyingEventHandler(meshEvent2, Single.fromCallable(idleChecker::isIdle)));
        this.vertxHandlers.add(replyingEventHandler(MeshEvent.SEARCH_REFRESH_REQUEST, refresh().andThen(Single.just(true))));
        log.trace("Done Initializing Elasticsearch process verticle");
    }

    public MessageConsumer<JsonObject> replyingEventHandler(MeshEvent meshEvent, Single<?> single) {
        return new Vertx(this.vertx).eventBus().localConsumer(meshEvent.address, message -> {
            single.subscribe(obj -> {
                message.reply(obj);
            });
        });
    }

    private boolean isDroppedEvent(Message<JsonObject> message) {
        return this.waitForSync.get() && !message.address().equals(MeshEvent.INDEX_SYNC_REQUEST.address);
    }

    public void stop(Future<Void> future) {
        log.trace("Stopping Elasticsearch process verticle");
        this.stopped.set(true);
        Observable.fromIterable(this.vertxHandlers).flatMapCompletable((v0) -> {
            return v0.rxUnregister();
        }).andThen(flush()).subscribe(() -> {
            this.requests.onComplete();
            this.idleChecker.close();
            log.trace("Done stopping Elasticsearch process verticle");
            future.complete();
        });
    }

    public Completable flush() {
        return Completable.fromRunnable(() -> {
            this.vertx.eventBus().publish(MeshEvent.SEARCH_FLUSH_REQUEST.address, (Object) null);
        });
    }

    public Completable refresh() {
        return this.searchProvider.refreshIndex(new String[0]).doOnSubscribe(disposable -> {
            log.trace("Refreshing all Elasticsearch indices...");
        }).doOnComplete(() -> {
            log.trace("Refresh complete.");
        });
    }

    private void assemble() {
        this.requests.compose(this::bufferEvents).concatMap(this::generateRequests, 1).lift(new BulkOperator(this.vertx, Duration.ofMillis(this.options.getBulkDebounceTime()), this.options.getBulkLimit(), this.options.getBulkLengthLimit())).concatMap(searchRequest -> {
            return sendRequest(searchRequest).onErrorResumeNext(Flowable.empty());
        }, 1).doOnError(th -> {
            log.info("Error at end of ES process chain", th);
        }).retry().subscribe();
    }

    private <T> Flowable<T> bufferEvents(Flowable<T> flowable) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        return flowable.doOnNext(obj -> {
            atomicInteger.incrementAndGet();
        }).onBackpressureBuffer(this.options.getEventBufferSize(), () -> {
            log.info("Event buffer size of {} was reached. Dropping all pending events and scheduling index sync.", new Object[]{Integer.valueOf(this.options.getEventBufferSize())});
            atomicInteger.set(0);
            this.idleChecker.resetTransformations();
            startSync();
        }).retry(th -> {
            return th instanceof MissingBackpressureException;
        }).doOnNext(obj2 -> {
            atomicInteger.decrementAndGet();
        });
    }

    private void startSync() {
        this.waitForSync.set(true);
        this.elasticsearchAvailable.onNext(false);
        Observable.interval(this.options.getRetryInterval(), TimeUnit.MILLISECONDS).flatMapSingle(l -> {
            return this.searchProvider.isAvailable();
        }).filter(bool -> {
            return bool.booleanValue();
        }).firstOrError().subscribe(bool2 -> {
            log.info("Elasticsearch is available again. Starting sync.");
            this.elasticsearchAvailable.onNext(bool2);
        });
        this.vertx.eventBus().publish(MeshEvent.INDEX_SYNC_REQUEST.address, (Object) null);
    }

    private Flowable<SearchRequest> sendRequest(SearchRequest searchRequest) {
        return this.stopped.get() ? Flowable.empty() : searchRequest.execute(this.searchProvider).doOnSubscribe(disposable -> {
            log.trace("Sending request to Elasticsearch: {}", new Object[]{searchRequest});
        }).doOnComplete(() -> {
            log.trace("Request completed: {}", new Object[]{searchRequest});
        }).doOnError(th -> {
            Util.logElasticSearchError(th, () -> {
                log.error("Error for request: {}", new Object[]{searchRequest});
                log.error("Error after sending request to Elasticsearch", th);
            });
        }).andThen(Flowable.just(searchRequest)).onErrorResumeNext(this::syncIndices).onErrorResumeNext(ignoreElasticsearchErrors(searchRequest)).retryWhen(RxUtil.retryWithDelay(Duration.ofMillis(this.options.getRetryInterval()), this.options.getRetryLimit())).doFinally(() -> {
            log.trace("Request-{}", new Object[]{searchRequest});
            this.idleChecker.addAndGetRequests(-searchRequest.requestCount());
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Flowable<SearchRequest> syncIndices(Throwable th) {
        return ((th instanceof ElasticsearchResponseErrorStreamable) && ((ElasticsearchResponseErrorStreamable) th).stream().anyMatch(elasticsearchResponseError -> {
            return "index_not_found_exception".equals(elasticsearchResponseError.getType());
        }) && !this.stopped.get()) ? this.syncEventHandler.generateSyncRequests().doOnNext(searchRequest -> {
            log.trace("SyncRequest+{}", new Object[]{searchRequest});
            this.idleChecker.addAndGetRequests(searchRequest.requestCount());
        }).doOnSubscribe(subscription -> {
            log.trace("Index not found. Resyncing.");
        }).concatMap(this::sendRequest, 1) : Flowable.error(th);
    }

    private Function<Throwable, Flowable<SearchRequest>> ignoreElasticsearchErrors(SearchRequest searchRequest) {
        return th -> {
            if (!(th instanceof ElasticsearchResponseErrorStreamable)) {
                return Flowable.error(th);
            }
            log.error("Not retrying because it is an error inside elasticsearch.");
            return Flowable.just(searchRequest);
        };
    }

    private Flowable<? extends SearchRequest> generateRequests(MessageEvent messageEvent) {
        if (this.stopped.get()) {
            return Flowable.empty();
        }
        try {
            Flowable doOnComplete = this.mainEventhandler.handle(messageEvent).doOnNext(searchRequest -> {
                if (log.isTraceEnabled()) {
                    log.trace("Request+{}", new Object[]{searchRequest});
                }
                this.idleChecker.addAndGetRequests(searchRequest.requestCount());
            }).retryWhen(RxUtil.retryWithDelay(Duration.ofMillis(this.options.getRetryInterval()), this.options.getRetryLimit())).doOnComplete(() -> {
                log.trace("Done transforming event {}. Transformations pending: {}", new Object[]{messageEvent.event, Integer.valueOf(this.idleChecker.getTransformations())});
            });
            IdleChecker idleChecker = this.idleChecker;
            idleChecker.getClass();
            return doOnComplete.doOnTerminate(idleChecker::decrementAndGetTransformations);
        } catch (Exception e) {
            e.printStackTrace();
            return Flowable.empty();
        }
    }

    public IdleChecker getIdleChecker() {
        return this.idleChecker;
    }
}
