package com.gentics.mesh.rest.client.impl;

import com.gentics.mesh.rest.client.EventbusEvent;
import com.gentics.mesh.rest.client.MeshRestClientConfig;
import com.gentics.mesh.rest.client.MeshWebsocket;
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

/* loaded from: input_file:com/gentics/mesh/rest/client/impl/OkHttpWebsocket.class */
public class OkHttpWebsocket implements MeshWebsocket {
    private final OkHttpClient client;
    private final MeshRestClientConfig config;
    private WebSocket currentConnection;
    private Disposable pingInterval;
    private static final Logger log = LoggerFactory.getLogger(OkHttpWebsocket.class);
    private static final Object connectionDummy = new Object();
    private final Subject<EventbusEvent> events = PublishSubject.create();
    private final Subject<Object> connections = PublishSubject.create();
    private final Subject<Throwable> errors = PublishSubject.create();
    private final Set<String> registeredEventAddresses = Collections.synchronizedSet(new HashSet());
    private AtomicBoolean connected = new AtomicBoolean(false);

    public OkHttpWebsocket(OkHttpClient okHttpClient, MeshRestClientConfig meshRestClientConfig) {
        this.client = okHttpClient;
        this.config = meshRestClientConfig;
        connect();
        startPings();
        this.errors.subscribe(th -> {
            log.error("Error in Websocket", th);
        });
    }

    private void connect() {
        Request build = new Request.Builder().url(this.config.getBaseUrl() + "/eventbus/websocket").build();
        this.connected.set(false);
        if (log.isDebugEnabled()) {
            log.debug("Connecting to {}", new Object[]{build.url()});
        }
        this.currentConnection = this.client.newWebSocket(build, new WebSocketListener() { // from class: com.gentics.mesh.rest.client.impl.OkHttpWebsocket.1
            public void onOpen(WebSocket webSocket, Response response) {
                OkHttpWebsocket.this.connected.set(true);
                OkHttpWebsocket.this.sendRegisterEvents();
                OkHttpWebsocket.log.debug("Connection established, sending connection event");
                OkHttpWebsocket.this.connections.onNext(OkHttpWebsocket.connectionDummy);
            }

            public void onMessage(WebSocket webSocket, String str) {
                OkHttpWebsocket.log.trace("Received message: {}", new Object[]{str});
                try {
                    OkHttpWebsocket.this.events.onNext(new EventbusEvent(str));
                } catch (IOException e) {
                    OkHttpWebsocket.this.errors.onNext(new Exception("Could not parse message from mesh", e));
                }
            }

            public void onMessage(WebSocket webSocket, ByteString byteString) {
                onMessage(webSocket, byteString.utf8());
            }

            public void onClosing(WebSocket webSocket, int i, String str) {
                OkHttpWebsocket.this.errors.onNext(new Exception(String.format("Unexpected closing of socket by peer. Code: %d, reason: %s", Integer.valueOf(i), str)));
                OkHttpWebsocket.this.reconnect();
            }

            public void onClosed(WebSocket webSocket, int i, String str) {
                OkHttpWebsocket.this.errors.onNext(new Exception(String.format("Unexpected closed socket. Code: %d, reason: %s", Integer.valueOf(i), str)));
                OkHttpWebsocket.this.reconnect();
            }

            public void onFailure(WebSocket webSocket, Throwable th, Response response) {
                OkHttpWebsocket.this.errors.onNext(new Exception("Failure in websocket to mesh", th));
                OkHttpWebsocket.this.reconnect();
            }
        });
    }

    private void startPings() {
        this.pingInterval = Observable.interval(this.config.getWebsocketReconnectInterval().toMillis(), TimeUnit.MILLISECONDS).subscribe(l -> {
            send(Util.eventbusMessage(EventbusMessageType.PING));
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reconnect() {
        this.connected.set(false);
        if (log.isDebugEnabled()) {
            log.debug("Attempting reconnect in {}ms", new Object[]{Long.valueOf(this.config.getWebsocketReconnectInterval().toMillis())});
        }
        Completable.complete().delay(this.config.getWebsocketReconnectInterval().toMillis(), TimeUnit.MILLISECONDS).subscribe(this::connect);
    }

    @Override // com.gentics.mesh.rest.client.MeshWebsocket
    public void close() {
        this.pingInterval.dispose();
        this.events.onComplete();
        this.connections.onComplete();
        this.errors.onComplete();
    }

    @Override // com.gentics.mesh.rest.client.MeshWebsocket
    public void publishEvent(String str, Object obj) {
        send(Util.eventbusMessage(EventbusMessageType.PUBLISH, str, obj));
    }

    @Override // com.gentics.mesh.rest.client.MeshWebsocket
    public void registerEvents(String... strArr) {
        this.registeredEventAddresses.addAll(Arrays.asList(strArr));
        sendRegisterEvents();
    }

    @Override // com.gentics.mesh.rest.client.MeshWebsocket
    public void unregisterEvents(String... strArr) {
        Stream of = Stream.of((Object[]) strArr);
        Set<String> set = this.registeredEventAddresses;
        set.getClass();
        of.forEach((v1) -> {
            r1.remove(v1);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendRegisterEvents() {
        this.registeredEventAddresses.forEach(str -> {
            send(Util.eventbusMessage(EventbusMessageType.REGISTER, str));
        });
    }

    private void send(String str) {
        if (this.connected.get()) {
            this.currentConnection.send(str);
        }
    }

    @Override // com.gentics.mesh.rest.client.MeshWebsocket
    public Observable<EventbusEvent> events() {
        return this.events;
    }

    @Override // com.gentics.mesh.rest.client.MeshWebsocket
    public Observable<Object> connections() {
        return this.connections.startWith(Maybe.create(maybeEmitter -> {
            if (this.connected.get()) {
                maybeEmitter.onSuccess(connectionDummy);
            } else {
                maybeEmitter.onComplete();
            }
        }).toObservable());
    }

    @Override // com.gentics.mesh.rest.client.MeshWebsocket
    public Observable<Throwable> errors() {
        return this.errors;
    }
}
