package com.gentics.mesh.search.verticle.bulk;

import com.gentics.mesh.core.data.search.request.BulkRequest;
import com.gentics.mesh.core.data.search.request.Bulkable;
import com.gentics.mesh.core.data.search.request.SearchRequest;
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import io.vertx.core.Vertx;
import io.vertx.core.logging.SLF4JLogDelegateFactory;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:com/gentics/mesh/search/verticle/bulk/BulkOperatorTest.class */
/**
 * Tests for {@link BulkOperator} when lifted into a {@link Flowable} of {@link SearchRequest}s.
 *
 * <p>The fixtures are two Mockito mocks: a plain {@link SearchRequest} ("non-bulkable") and a
 * {@link Bulkable} whose {@code bulkLength()} is stubbed to 1000. The assertions below expect
 * non-bulkable requests to be passed through unchanged and runs of bulkable requests to be emitted
 * as {@link BulkRequest}s.
 */
public class BulkOperatorTest {
    /** Flush interval, in milliseconds, handed to the operator in the time-based tests. */
    private static final int BULK_TIME_MS = 50;

    /**
     * Third constructor argument of {@link BulkOperator}. {@link #testBulkOverflowSizeLimit()}
     * expects every emitted bulk to contain exactly this many requests, so it is presumably the
     * maximum request count per bulk.
     */
    private static final int MAX_BULK_REQUESTS = 100;

    /**
     * Fourth constructor argument used by every test except {@link #testLengthLimit()}; presumably
     * a bulk length limit chosen large enough never to trigger a flush in those tests.
     */
    private static final long LARGE_LENGTH_LIMIT = 100000000L;

    static {
        // Point Vert.x at its SLF4J log delegate via the documented system property.
        System.setProperty("vertx.logger-delegate-factory-class-name", SLF4JLogDelegateFactory.class.getName());
    }

    private Vertx vertx;
    private SearchRequest nonBulkable;
    private Bulkable bulkable;
    private BulkOperator bulkOperator;

    /**
     * Creates fresh mocks, a fresh {@link Vertx} instance and a default operator (flush interval
     * {@link #BULK_TIME_MS}) before each test.
     */
    @Before
    public void setUp() throws Exception {
        nonBulkable = Mockito.mock(SearchRequest.class);
        bulkable = Mockito.mock(Bulkable.class);
        Mockito.when(bulkable.bulkLength()).thenReturn(1000L);
        // NOTE(review): this Vertx instance is never closed. An @After hook calling vertx.close()
        // (requires importing org.junit.After) would release it between tests.
        vertx = Vertx.vertx();
        bulkOperator = newOperator(Duration.ofMillis(BULK_TIME_MS), LARGE_LENGTH_LIMIT);
    }

    /**
     * Builds an operator on the per-test {@link Vertx} instance with the shared request limit.
     *
     * @param flushInterval second constructor argument of {@link BulkOperator}
     * @param lengthLimit fourth constructor argument of {@link BulkOperator}
     * @return a new, not yet subscribed operator
     */
    private BulkOperator newOperator(Duration flushInterval, long lengthLimit) {
        return new BulkOperator(vertx, flushInterval, MAX_BULK_REQUESTS, lengthLimit);
    }

    /**
     * Emits runs of requests that alternate between the two mocks and then completes.
     *
     * @param counts run lengths; even positions emit the non-bulkable mock, odd positions the
     *               bulkable mock
     * @return a finite flowable of the concatenated runs
     */
    private Flowable<SearchRequest> createAlternatingRequests(int... counts) {
        return Flowable.range(0, counts.length).concatMap(index -> {
            SearchRequest request = index % 2 == 0 ? nonBulkable : bulkable;
            return Flowable.just(request).repeat(counts[index]);
        });
    }

    /**
     * Same items as {@link #createAlternatingRequests(int...)}, merged with {@link Flowable#never()}
     * so that the resulting flowable never completes.
     *
     * @param counts run lengths, see {@link #createAlternatingRequests(int...)}
     * @return a flowable that emits the alternating runs and then stays open
     */
    private Flowable<SearchRequest> createNotCompletedAlternatingRequests(int... counts) {
        return Flowable.merge(Flowable.never(), createAlternatingRequests(counts));
    }

    /** Each run of bulkable requests is expected to collapse into a single emitted item. */
    @Test
    public void testBulking() throws InterruptedException {
        createAlternatingRequests(2, 3).lift(bulkOperator).count().test().await().assertValue(3L);
        createAlternatingRequests(1, 3, 1).lift(bulkOperator).count().test().await().assertValue(3L);
        createAlternatingRequests(1, 3, 1, 2).lift(bulkOperator).count().test().await().assertValue(4L);
    }

    /** A pending bulk is expected to be emitted once the flush interval has elapsed. */
    @Test
    public void testTimeBasedFlushing() throws InterruptedException {
        TestSubscriber<SearchRequest> subscriber =
            createNotCompletedAlternatingRequests(1, 3).lift(bulkOperator).test();

        subscriber.assertValueCount(1);
        // The source never terminates, so this waits for the full timeout (two flush intervals).
        subscriber.await(BULK_TIME_MS * 2, TimeUnit.MILLISECONDS);
        subscriber.assertValueCount(2);
    }

    /** An explicit {@code flush()} is expected to emit the pending bulk immediately. */
    @Test
    public void testManualFlushing() {
        TestSubscriber<SearchRequest> subscriber =
            createNotCompletedAlternatingRequests(1, 3).lift(bulkOperator).test();

        subscriber.assertValueCount(1);
        subscriber.assertValueAt(0, this::isNonBulkRequest);

        bulkOperator.flush();

        subscriber.assertValueCount(2);
        subscriber.assertValueAt(1, this::isBulkRequest);
    }

    /**
     * Requests items one at a time and checks, after each request, both the number of items
     * received downstream and the number of items that have passed the upstream {@code doOnNext}
     * counter.
     */
    @Test
    public void testBackpressure() {
        AtomicLong upstreamCount = new AtomicLong();
        TestSubscriber<SearchRequest> subscriber = createAlternatingRequests(2, 3, 1, 5, 1)
            .doOnNext(request -> upstreamCount.incrementAndGet())
            .lift(bulkOperator)
            .test(0L);

        subscriber.request(1L);
        subscriber.assertValueCount(1);
        Assertions.assertThat(upstreamCount.get()).isEqualTo(1L);

        subscriber.request(1L);
        subscriber.assertValueCount(2);
        Assertions.assertThat(upstreamCount.get()).isEqualTo(2L);

        // Expected: the three bulkables plus the non-bulkable that follows them were consumed.
        subscriber.request(1L);
        subscriber.assertValueCount(3);
        Assertions.assertThat(upstreamCount.get()).isEqualTo(6L);

        // Expected: the already consumed non-bulkable is delivered without touching upstream.
        subscriber.request(1L);
        subscriber.assertValueCount(4);
        Assertions.assertThat(upstreamCount.get()).isEqualTo(6L);

        subscriber.request(1L);
        subscriber.assertValueCount(5);
        Assertions.assertThat(upstreamCount.get()).isEqualTo(12L);

        subscriber.request(1L);
        subscriber.assertValueCount(6);
        Assertions.assertThat(upstreamCount.get()).isEqualTo(12L);
        subscriber.assertComplete();
    }

    /** One non-bulkable plus 500 bulkables are expected to yield six items (1 + 500 / 100). */
    @Test
    public void testSizeLimit() {
        createAlternatingRequests(1, 500)
            .lift(newOperator(Duration.ofMinutes(1L), LARGE_LENGTH_LIMIT))
            .test()
            .assertValueCount(6)
            .assertComplete();
    }

    /**
     * With the fourth constructor argument set to 5000 and each bulkable reporting a bulk length of
     * 1000, twelve bulkables are expected to produce two bulks immediately and a third one after
     * the flush interval.
     */
    @Test
    public void testLengthLimit() throws InterruptedException {
        TestSubscriber<SearchRequest> subscriber = createNotCompletedAlternatingRequests(1, 12)
            .lift(newOperator(Duration.ofMillis(BULK_TIME_MS), 5000L))
            .test()
            .assertValueCount(3)
            .assertNotComplete();

        // The source never terminates, so this waits for the full timeout (two flush intervals).
        subscriber.await(BULK_TIME_MS * 2, TimeUnit.MILLISECONDS);
        subscriber.assertValueCount(4).assertNotComplete();
    }

    /**
     * Clears {@code flushActive} while 500 bulkables arrive, then sets it again and flushes
     * manually; the accumulated requests are expected to come out as five bulks of
     * {@link #MAX_BULK_REQUESTS} requests each.
     */
    @Test
    public void testBulkOverflowSizeLimit() throws InterruptedException {
        BulkOperator operator = newOperator(Duration.ofMinutes(1L), LARGE_LENGTH_LIMIT);
        operator.flushActive.set(false);

        TestSubscriber<SearchRequest> subscriber =
            createNotCompletedAlternatingRequests(1, 500).lift(operator).test();
        subscriber.assertValueCount(1);
        subscriber.assertValueAt(0, this::isNonBulkRequest);

        operator.flushActive.set(true);
        operator.flush();

        subscriber.assertValueCount(6);
        IntStream.range(1, 6).forEach(index -> subscriber.assertValueAt(index, this::isFullBulkRequest));
    }

    /** @return {@code true} if the request is a {@link BulkRequest} */
    private boolean isBulkRequest(SearchRequest searchRequest) {
        return searchRequest instanceof BulkRequest;
    }

    /** @return {@code true} if the request is a {@link BulkRequest} holding exactly {@link #MAX_BULK_REQUESTS} requests */
    private boolean isFullBulkRequest(SearchRequest searchRequest) {
        return isBulkRequest(searchRequest)
            && ((BulkRequest) searchRequest).getRequests().size() == MAX_BULK_REQUESTS;
    }

    /** @return {@code true} if the request is the very non-bulkable mock created in {@link #setUp()} */
    private boolean isNonBulkRequest(SearchRequest searchRequest) {
        return searchRequest == nonBulkable;
    }
}
