package de.rcenvironment.core.communication.uplink.network.internal;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.rcenvironment.core.communication.uplink.common.internal.MessageType;
import de.rcenvironment.core.communication.uplink.common.internal.UplinkProtocolMessageConverter;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.utils.common.JsonUtils;
import de.rcenvironment.core.utils.common.LogUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.common.exception.ProtocolException;
import de.rcenvironment.core.utils.incubator.DebugSettings;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:de/rcenvironment/core/communication/uplink/network/internal/CommonUplinkLowLevelProtocolWrapper.class */
public abstract class CommonUplinkLowLevelProtocolWrapper {
    private static final int HANDSHAKE_MESSAGE_TIMEOUT = 2000;
    private static final int HANDSHAKE_MESSAGE_WAIT_CHECK_INTERVAL = 100;
    protected DataInputStream dataInputStream;
    protected DataOutputStream dataOutputStream;
    protected final UplinkConnectionLowLevelEventHandler eventHandler;
    protected final UplinkProtocolMessageConverter messageConverter;
    private boolean outgoingStreamClosed;
    protected final boolean verboseLoggingEnabled = DebugSettings.getVerboseLoggingEnabled("uplink.lowlevel");
    protected final Log log = LogFactory.getLog(getClass());
    protected final ObjectMapper jsonMapper = JsonUtils.getDefaultObjectMapper();

    public CommonUplinkLowLevelProtocolWrapper(UplinkConnectionLowLevelEventHandler uplinkConnectionLowLevelEventHandler, String str) {
        this.eventHandler = uplinkConnectionLowLevelEventHandler;
        this.messageConverter = new UplinkProtocolMessageConverter(str);
    }

    public abstract void runSession() throws IOException;

    /* JADX WARN: Type inference failed for: r0v3, types: [java.lang.Throwable, java.io.DataOutputStream] */
    public final void sendMessageBlock(long j, MessageBlock messageBlock) throws IOException {
        byte[] data = messageBlock.getData();
        synchronized (this.dataOutputStream) {
            if (this.outgoingStreamClosed) {
                this.log.debug("Ignoring message send request as the connection has been shut down");
                return;
            }
            if (this.verboseLoggingEnabled) {
                this.log.debug(StringUtils.format("Sending a message of type %s to channel %d, payload size %d bytes", new Object[]{messageBlock.getType(), Long.valueOf(j), Integer.valueOf(data.length)}));
            }
            this.dataOutputStream.writeLong(j);
            this.dataOutputStream.writeInt(data.length);
            this.dataOutputStream.writeByte(messageBlock.getType().getCode());
            this.dataOutputStream.write(data);
            this.dataOutputStream.flush();
        }
    }

    public final void sendMessageBlock(long j, int i, byte[] bArr) throws IOException {
        sendMessageBlock(j, new MessageBlock(i, bArr));
    }

    public abstract void closeOutgoingMessageStream();

    protected byte[] readExpectedBytesWithTimeout(int i, int i2, int i3) throws IOException {
        byte[] bArr = new byte[i];
        long currentTimeMillis = System.currentTimeMillis();
        while (this.dataInputStream.available() < i) {
            if (System.currentTimeMillis() >= currentTimeMillis + i2) {
                throw new ProtocolException("Expected " + i + " bytes of data, but did not receive them within " + i2 + " msec");
            }
            try {
                Thread.sleep(i3);
            } catch (InterruptedException unused) {
                Thread.currentThread().interrupt();
                throw new ProtocolException("Interrupted while waiting for " + i + " bytes of data");
            }
        }
        this.dataInputStream.readFully(bArr);
        return bArr;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendHandshakeInit() throws IOException {
        byte[] bytes = "INIT v0 ".getBytes(UplinkProtocolConstants.DEFAULT_CHARSET);
        if (bytes.length != 8) {
            throw new ProtocolException("Handshake array length does not match the expected byte count");
        }
        this.dataOutputStream.write(bytes);
        if (this.verboseLoggingEnabled) {
            this.log.debug("Sent handshake init (" + bytes.length + " bytes)");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void expectHandshakeInit() throws IOException {
        byte[] readExpectedBytesWithTimeout = readExpectedBytesWithTimeout(8, 2000, HANDSHAKE_MESSAGE_WAIT_CHECK_INTERVAL);
        String str = new String(readExpectedBytesWithTimeout, UplinkProtocolConstants.DEFAULT_CHARSET);
        if (!str.equals("INIT v0 ")) {
            throw new ProtocolException("Received invalid handshake init: " + str);
        }
        if (this.verboseLoggingEnabled) {
            this.log.debug("Received expected handshake init (" + readExpectedBytesWithTimeout.length + " bytes)");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendHandshakeData(MessageBlock messageBlock) throws IOException {
        sendMessageBlock(0L, messageBlock);
        this.dataOutputStream.flush();
        if (this.verboseLoggingEnabled) {
            this.log.debug("Sent handshake data");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendHandshakeConfirmation() throws IOException {
        sendMessageBlock(0L, new MessageBlock(MessageType.HANDSHAKE));
        this.dataOutputStream.flush();
        if (this.verboseLoggingEnabled) {
            this.log.debug("Sent handshake confirmation");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageBlock expectHandshakeData() throws IOException, UplinkConnectionRefusedException {
        long readChannelId = readChannelId();
        if (readChannelId != 0) {
            throw new ProtocolException("Unexpected handshake channel id: " + readChannelId);
        }
        MessageBlock readMessageBlockWithTimeout = readMessageBlockWithTimeout(2000);
        if (readMessageBlockWithTimeout.getType() == MessageType.GOODBYE) {
            String extractGoodbyeErrorMessage = extractGoodbyeErrorMessage(readMessageBlockWithTimeout, true);
            throw new UplinkConnectionRefusedException(UplinkProtocolErrorType.typeOfWrappedErrorMessage(extractGoodbyeErrorMessage), UplinkProtocolErrorType.unwrapErrorMessage(extractGoodbyeErrorMessage));
        }
        if (readMessageBlockWithTimeout.getType() != MessageType.HANDSHAKE) {
            throw new ProtocolException("Expected handshake data, but received message type " + readMessageBlockWithTimeout.getType() + " instead");
        }
        if (this.verboseLoggingEnabled) {
            this.log.debug("Received handshake data: " + new String(readMessageBlockWithTimeout.getData(), UplinkProtocolConstants.DEFAULT_CHARSET));
        }
        return readMessageBlockWithTimeout;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.io.DataInputStream] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5, types: [long] */
    protected final long readChannelId() throws IOException {
        ?? r0 = this.dataInputStream;
        synchronized (r0) {
            r0 = this.dataInputStream.readLong();
        }
        return r0;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.io.DataInputStream] */
    protected final MessageBlock readMessageBlock() throws IOException {
        MessageBlock messageBlock;
        synchronized (this.dataInputStream) {
            int readInt = this.dataInputStream.readInt();
            if (readInt < 0 || readInt > 262144) {
                throw new ProtocolException(StringUtils.format("Incoming message block announced a size of %d (valid range: 0-%d)", new Object[]{Integer.valueOf(readInt), Integer.valueOf(UplinkProtocolConstants.MAX_MESSAGE_BLOCK_DATA_LENGTH)}));
            }
            byte readByte = this.dataInputStream.readByte();
            byte[] bArr = new byte[readInt];
            this.dataInputStream.readFully(bArr);
            messageBlock = new MessageBlock(readByte, bArr);
        }
        return messageBlock;
    }

    protected final MessageBlock readMessageBlockWithTimeout(int i) throws IOException {
        CompletableFuture completableFuture = new CompletableFuture();
        ConcurrencyUtils.getAsyncTaskService().execute("Uplink: Read message block with timeout", () -> {
            try {
                completableFuture.complete(readMessageBlock());
            } catch (IOException e) {
                completableFuture.completeExceptionally(e);
            }
        });
        try {
            return (MessageBlock) completableFuture.get(i, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IOException("Error while waiting for an incoming message: " + e.toString());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.io.DataOutputStream] */
    /* JADX WARN: Type inference failed for: r0v14, types: [java.io.DataOutputStream] */
    /* JADX WARN: Type inference failed for: r0v15, types: [de.rcenvironment.core.communication.uplink.network.internal.CommonUplinkLowLevelProtocolWrapper] */
    /* JADX WARN: Type inference failed for: r0v16 */
    /* JADX WARN: Type inference failed for: r0v17 */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v4, types: [boolean] */
    /* JADX WARN: Type inference failed for: r0v8 */
    public final void closeOutgoingDataStream() {
        ?? r0 = this.dataOutputStream;
        synchronized (r0) {
            r0 = this.outgoingStreamClosed;
            if (r0 != 0) {
                return;
            }
            try {
                r0 = this;
                r0.sendMessageBlock(0L, 127, new byte[0]);
                r0 = r0;
            } catch (ProtocolException e) {
                throw new RuntimeException("Internal error: Failed to construct shutdown message", e);
            } catch (IOException unused) {
                Log log = this.log;
                log.debug("Failed to send goodbye message; most likely, the connection has already failed");
                r0 = log;
            }
            try {
                r0 = this.dataOutputStream;
                r0.close();
            } catch (IOException unused2) {
                this.log.debug("Failed to actively close the output stream; most likely, the connection has already failed");
            }
            this.outgoingStreamClosed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void runMessageReceiveLoop() {
        if (this.verboseLoggingEnabled) {
            this.log.debug("Running message dispatch loop");
        }
        boolean z = true;
        while (z) {
            try {
                if (!receiveNextMessage()) {
                    z = false;
                }
            } catch (IOException e) {
                String logExceptionAsSingleLineAndAssignUniqueMarker = LogUtils.logExceptionAsSingleLineAndAssignUniqueMarker(this.log, "Error while receiving a message, closing the connection", e);
                this.eventHandler.onNonProtocolError(e);
                attemptToSendErrorGoodbyeMessage(UplinkProtocolErrorType.INTERNAL_SERVER_ERROR, "Closing the connection after an error (internal error log marker " + logExceptionAsSingleLineAndAssignUniqueMarker + ")");
                z = false;
            }
        }
    }

    private boolean receiveNextMessage() throws IOException {
        long readChannelId = readChannelId();
        MessageBlock readMessageBlock = readMessageBlock();
        if (readMessageBlock.getType() != MessageType.GOODBYE) {
            if (this.verboseLoggingEnabled) {
                this.log.debug(StringUtils.format("Received message of type %s for channel %d, payload size %d bytes", new Object[]{readMessageBlock.getType(), Long.valueOf(readChannelId), Integer.valueOf(readMessageBlock.getDataLength())}));
            }
            this.eventHandler.onMessageBlock(readChannelId, readMessageBlock);
            return true;
        }
        this.log.debug("Received 'goodbye' message, stopping message listener");
        if (readMessageBlock.getDataLength() == 0) {
            this.eventHandler.onRegularGoodbyeMessage();
            return false;
        }
        String extractGoodbyeErrorMessage = extractGoodbyeErrorMessage(readMessageBlock, false);
        this.eventHandler.onErrorGoodbyeMessage(UplinkProtocolErrorType.typeOfWrappedErrorMessage(extractGoodbyeErrorMessage), UplinkProtocolErrorType.unwrapErrorMessage(extractGoodbyeErrorMessage));
        return false;
    }

    private String extractGoodbyeErrorMessage(MessageBlock messageBlock, boolean z) {
        String str;
        if (messageBlock.getDataLength() == 0 && z) {
            return "E99: <no error message available>";
        }
        try {
            str = new String(messageBlock.getData());
        } catch (RuntimeException unused) {
            str = "Failed to decode the error string attached to a 'goodbye' message; byte length: " + messageBlock.getDataLength();
        }
        return str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void attemptToSendErrorGoodbyeMessage(UplinkProtocolErrorType uplinkProtocolErrorType, String str) {
        String wrapErrorMessage = uplinkProtocolErrorType.wrapErrorMessage(str);
        try {
            sendMessageBlock(0L, this.messageConverter.encodeErrorGoodbyeMessage(wrapErrorMessage));
        } catch (IOException e) {
            this.log.debug(StringUtils.format("Failed to send a 'goodbye' error message; this is often a best-effort attempt, so this can typically be ignored (message body: %s; error while sending: %s)", new Object[]{wrapErrorMessage, e.toString()}));
        }
    }

    protected boolean isOutgoingStreamClosed() {
        return this.outgoingStreamClosed;
    }
}
