package de.rcenvironment.core.communication.uplink.network.internal;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.rcenvironment.core.communication.uplink.common.internal.MessageType;
import de.rcenvironment.core.communication.uplink.common.internal.UplinkProtocolMessageConverter;
import de.rcenvironment.core.communication.uplink.network.api.MessageBlockPriority;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.utils.common.JsonUtils;
import de.rcenvironment.core.utils.common.LogUtils;
import de.rcenvironment.core.utils.common.StreamConnectionEndpoint;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.common.exception.ProtocolException;
import de.rcenvironment.core.utils.incubator.DebugSettings;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:de/rcenvironment/core/communication/uplink/network/internal/CommonUplinkLowLevelProtocolWrapper.class */
public abstract class CommonUplinkLowLevelProtocolWrapper {
    private static final int HANDSHAKE_MESSAGE_WAIT_CHECK_INTERVAL = 100;
    protected final StreamConnectionEndpoint connectionEndpoint;
    protected final DataInputStream dataInputStream;
    protected final DataOutputStream dataOutputStream;
    protected final UplinkConnectionLowLevelEventHandler eventHandler;
    protected final UplinkProtocolMessageConverter messageConverter;
    protected final String logPrefix;
    protected final boolean verboseLoggingEnabled = DebugSettings.getVerboseLoggingEnabled("uplink.lowlevel");
    protected final Log log = LogFactory.getLog(getClass());
    protected final ObjectMapper jsonMapper = JsonUtils.getDefaultObjectMapper();

    public CommonUplinkLowLevelProtocolWrapper(StreamConnectionEndpoint streamConnectionEndpoint, UplinkConnectionLowLevelEventHandler uplinkConnectionLowLevelEventHandler, String str) {
        this.connectionEndpoint = streamConnectionEndpoint;
        this.dataInputStream = new DataInputStream(streamConnectionEndpoint.getInputStream());
        this.dataOutputStream = new DataOutputStream(streamConnectionEndpoint.getOutputStream());
        this.eventHandler = uplinkConnectionLowLevelEventHandler;
        this.messageConverter = new UplinkProtocolMessageConverter(str);
        this.logPrefix = "[" + str + "] ";
    }

    public void runSession() {
        try {
            runHandshakeSequence();
            this.eventHandler.onHandshakeComplete();
            runMessageReceiveLoop();
        } catch (UplinkConnectionRefusedException e) {
            if (e.shouldAttemptToSendErrorGoodbye()) {
                this.log.debug(String.valueOf(this.logPrefix) + "Uplink handshake failed or connection refused; attempting to send error message \"" + e.getRawMessage() + "\"");
                attemptToSendErrorGoodbyeMessage(e.getType(), e.getRawMessage());
            } else {
                this.log.debug(String.valueOf(this.logPrefix) + "Uplink handshake failed or connection refused: " + e.getRawMessage());
            }
            this.eventHandler.onHandshakeFailedOrConnectionRefused(e);
        }
        terminateSession();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v19 */
    /* JADX WARN: Type inference failed for: r0v3, types: [java.io.DataOutputStream] */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.lang.Throwable] */
    public final void sendMessageBlock(long j, MessageBlock messageBlock) throws IOException {
        byte[] data = messageBlock.getData();
        ?? r0 = this.dataOutputStream;
        synchronized (r0) {
            if (this.verboseLoggingEnabled) {
                this.log.debug(StringUtils.format("%s[//%d] Sending a message of type %s, payload size %d bytes", new Object[]{this.logPrefix, Long.valueOf(j), messageBlock.getType(), Integer.valueOf(data.length)}));
            }
            this.dataOutputStream.writeLong(j);
            this.dataOutputStream.writeInt(data.length);
            this.dataOutputStream.writeByte(messageBlock.getType().getCode());
            this.dataOutputStream.write(data);
            this.dataOutputStream.flush();
            if (this.verboseLoggingEnabled) {
                this.log.debug(StringUtils.format("%s[//%d] Finished sending a message of type %s, payload size %d bytes", new Object[]{this.logPrefix, Long.valueOf(j), messageBlock.getType(), Integer.valueOf(data.length)}));
            }
            r0 = r0;
        }
    }

    public final void sendMessageBlock(long j, int i, byte[] bArr) throws IOException {
        sendMessageBlock(j, new MessageBlock(i, bArr));
    }

    protected byte[] readExpectedBytesWithTimeout(int i, int i2, int i3) throws IOException, TimeoutException {
        byte[] bArr = new byte[i];
        long currentTimeMillis = System.currentTimeMillis();
        while (this.dataInputStream.available() < i) {
            if (System.currentTimeMillis() >= currentTimeMillis + i2) {
                throw new TimeoutException("Expected " + i + " bytes of data, but did not receive them within " + i2 + " msec");
            }
            try {
                Thread.sleep(i3);
            } catch (InterruptedException unused) {
                Thread.currentThread().interrupt();
                throw new ProtocolException("Interrupted while waiting for " + i + " bytes of data");
            }
        }
        this.dataInputStream.readFully(bArr);
        return bArr;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendHandshakeInit() throws IOException {
        byte[] bytes = "INIT v0 ".getBytes(UplinkProtocolConstants.DEFAULT_CHARSET);
        if (bytes.length != 8) {
            throw new ProtocolException("Handshake array length does not match the expected byte count");
        }
        this.dataOutputStream.write(bytes);
        if (this.verboseLoggingEnabled) {
            this.log.debug(String.valueOf(this.logPrefix) + "Sent handshake init (" + bytes.length + " bytes)");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void expectHandshakeInit() throws IOException, TimeoutException {
        byte[] readExpectedBytesWithTimeout = readExpectedBytesWithTimeout(8, UplinkProtocolConfiguration.getCurrent().getHandshakeResponseTimeout(), HANDSHAKE_MESSAGE_WAIT_CHECK_INTERVAL);
        String str = new String(readExpectedBytesWithTimeout, UplinkProtocolConstants.DEFAULT_CHARSET);
        if (!str.equals("INIT v0 ")) {
            throw new ProtocolException("Received invalid handshake init: " + str);
        }
        if (this.verboseLoggingEnabled) {
            this.log.debug(String.valueOf(this.logPrefix) + "Received expected handshake init (" + readExpectedBytesWithTimeout.length + " bytes)");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendHandshakeData(MessageBlock messageBlock) throws IOException {
        sendMessageBlock(0L, messageBlock);
        this.dataOutputStream.flush();
        if (this.verboseLoggingEnabled) {
            this.log.debug(String.valueOf(this.logPrefix) + "Sent handshake data '" + new String(messageBlock.getData(), UplinkProtocolConstants.DEFAULT_CHARSET) + "'");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendHandshakeConfirmation() throws IOException {
        sendMessageBlock(0L, new MessageBlock(MessageType.HANDSHAKE));
        this.dataOutputStream.flush();
        if (this.verboseLoggingEnabled) {
            this.log.debug(String.valueOf(this.logPrefix) + "Sent handshake confirmation");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageBlock expectHandshakeData() throws UplinkConnectionRefusedException, TimeoutException, IOException {
        try {
            MessageBlockWithMetadata readChannelIdAndMessageBlockWithTimeout = readChannelIdAndMessageBlockWithTimeout(UplinkProtocolConfiguration.getCurrent().getHandshakeResponseTimeout());
            if (readChannelIdAndMessageBlockWithTimeout.getChannelId() != 0) {
                throw new ProtocolException("Unexpected handshake channel id: " + readChannelIdAndMessageBlockWithTimeout.getChannelId());
            }
            if (readChannelIdAndMessageBlockWithTimeout.getType() == MessageType.GOODBYE) {
                String extractGoodbyeErrorMessage = extractGoodbyeErrorMessage(readChannelIdAndMessageBlockWithTimeout, true);
                throw new UplinkConnectionRefusedException(UplinkProtocolErrorType.typeOfWrappedErrorMessage(extractGoodbyeErrorMessage), UplinkProtocolErrorType.unwrapErrorMessage(extractGoodbyeErrorMessage), false);
            }
            if (readChannelIdAndMessageBlockWithTimeout.getType() != MessageType.HANDSHAKE) {
                throw new UplinkConnectionRefusedException(UplinkProtocolErrorType.PROTOCOL_VIOLATION, "Expected handshake data, but received message type " + readChannelIdAndMessageBlockWithTimeout.getType() + " instead", true);
            }
            if (this.verboseLoggingEnabled) {
                if (readChannelIdAndMessageBlockWithTimeout.getDataLength() == 0) {
                    this.log.debug(String.valueOf(this.logPrefix) + "Received handshake confirmation");
                } else {
                    this.log.debug(String.valueOf(this.logPrefix) + "Received handshake data '" + new String(readChannelIdAndMessageBlockWithTimeout.getData(), UplinkProtocolConstants.DEFAULT_CHARSET) + "'");
                }
            }
            return readChannelIdAndMessageBlockWithTimeout;
        } catch (IOException e) {
            throw new IOException("Error receiving the remote side's handshake response: " + e.getMessage());
        } catch (TimeoutException unused) {
            throw new TimeoutException("The remote side did not send their Uplink handshake response within " + UplinkProtocolConfiguration.getCurrent().getHandshakeResponseTimeout() + " msec");
        }
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable, java.io.DataInputStream] */
    protected final MessageBlockWithMetadata readMessageBlock() throws IOException {
        MessageBlockWithMetadata messageBlockWithMetadata;
        synchronized (this.dataInputStream) {
            long readLong = this.dataInputStream.readLong();
            int readInt = this.dataInputStream.readInt();
            if (readInt < 0 || readInt > 262144) {
                throw new ProtocolException(StringUtils.format("Incoming message block announced a size of %d (valid range: 0-%d)", new Object[]{Integer.valueOf(readInt), Integer.valueOf(UplinkProtocolConstants.MAX_MESSAGE_BLOCK_DATA_LENGTH)}));
            }
            byte readByte = this.dataInputStream.readByte();
            byte[] bArr = new byte[readInt];
            this.dataInputStream.readFully(bArr);
            messageBlockWithMetadata = new MessageBlockWithMetadata(readByte, bArr, readLong, MessageBlockPriority.DEFAULT);
        }
        return messageBlockWithMetadata;
    }

    protected final MessageBlockWithMetadata readChannelIdAndMessageBlockWithTimeout(int i) throws IOException, TimeoutException {
        CompletableFuture completableFuture = new CompletableFuture();
        ConcurrencyUtils.getAsyncTaskService().execute("Uplink: Read message block with timeout", () -> {
            try {
                completableFuture.complete(readMessageBlock());
            } catch (IOException e) {
                completableFuture.completeExceptionally(e);
            }
        });
        try {
            return (MessageBlockWithMetadata) completableFuture.get(i, TimeUnit.MILLISECONDS);
        } catch (InterruptedException unused) {
            throw new IOException("Interrupted while waiting for an incoming message");
        } catch (ExecutionException e) {
            throw new IOException("Error while waiting for an incoming message: " + e.getMessage());
        }
    }

    public final void terminateSession() {
        this.connectionEndpoint.close();
    }

    public final boolean attemptToSendRegularGoodbyeMessage() {
        try {
            sendMessageBlock(0L, 127, new byte[0]);
            return true;
        } catch (IOException unused) {
            this.log.debug("Failed to send regular 'goodbye' message; most likely, the connection has already failed");
            return false;
        } catch (ProtocolException e) {
            throw new RuntimeException("Internal error: Failed to construct shutdown message", e);
        }
    }

    public final void attemptToSendErrorGoodbyeMessage(UplinkProtocolErrorType uplinkProtocolErrorType, String str) {
        String wrapErrorMessage = uplinkProtocolErrorType.wrapErrorMessage(str);
        try {
            sendMessageBlock(0L, this.messageConverter.encodeErrorGoodbyeMessage(wrapErrorMessage));
        } catch (IOException e) {
            this.log.debug(StringUtils.format("%sFailed to send a 'goodbye' error message; this is often a best-effort attempt, so this can typically be ignored (message body: %s; error while sending: %s)", new Object[]{this.logPrefix, wrapErrorMessage, e.toString()}));
        }
    }

    protected abstract void runHandshakeSequence() throws UplinkConnectionRefusedException;

    protected final void runMessageReceiveLoop() {
        if (this.verboseLoggingEnabled) {
            this.log.debug(String.valueOf(this.logPrefix) + "Running message dispatch loop");
        }
        boolean z = true;
        while (true) {
            try {
                if (!receiveNextMessage()) {
                    z = false;
                }
            } catch (IOException e) {
                if ((e instanceof EOFException) || (e instanceof SocketException)) {
                    if (e.getClass() != EOFException.class && e.getMessage() == null) {
                        this.log.debug(StringUtils.format("%sCategorizing stream read exception as 'end of stream' event: %s", new Object[]{this.logPrefix, e.toString()}));
                    }
                    this.eventHandler.onIncomingStreamClosedOrEOF();
                    return;
                }
                if (!z) {
                    this.log.error(String.valueOf(this.logPrefix) + "Not expecting further messages, but encountered a non-EOF exception; still considering the stream as closed/broken as a fallback: " + e.toString());
                    this.eventHandler.onIncomingStreamClosedOrEOF();
                    return;
                } else {
                    String logExceptionAsSingleLineAndAssignUniqueMarker = LogUtils.logExceptionAsSingleLineAndAssignUniqueMarker(this.log, String.valueOf(this.logPrefix) + "Error while receiving a message, closing the connection", e);
                    this.eventHandler.onStreamReadError(e);
                    attemptToSendErrorGoodbyeMessage(UplinkProtocolErrorType.INTERNAL_SERVER_ERROR, "Closing the connection after an error (internal error log marker " + logExceptionAsSingleLineAndAssignUniqueMarker + ")");
                    return;
                }
            }
        }
    }

    private boolean receiveNextMessage() throws IOException {
        MessageBlockWithMetadata readMessageBlock = readMessageBlock();
        long channelId = readMessageBlock.getChannelId();
        if (readMessageBlock.getType() != MessageType.GOODBYE) {
            if (this.verboseLoggingEnabled) {
                this.log.debug(StringUtils.format("%sReceived message of type %s for channel %d, payload size %d bytes", new Object[]{this.logPrefix, readMessageBlock.getType(), Long.valueOf(channelId), Integer.valueOf(readMessageBlock.getDataLength())}));
            }
            this.eventHandler.onMessageBlock(channelId, readMessageBlock);
            return true;
        }
        if (readMessageBlock.getDataLength() == 0) {
            this.log.debug(String.valueOf(this.logPrefix) + "Received regular 'goodbye' message, expecting end of stream next");
            this.eventHandler.onRegularGoodbyeMessage();
            return false;
        }
        this.log.debug(String.valueOf(this.logPrefix) + "Received error 'goodbye' message, expecting end of stream next");
        String extractGoodbyeErrorMessage = extractGoodbyeErrorMessage(readMessageBlock, false);
        this.eventHandler.onErrorGoodbyeMessage(UplinkProtocolErrorType.typeOfWrappedErrorMessage(extractGoodbyeErrorMessage), UplinkProtocolErrorType.unwrapErrorMessage(extractGoodbyeErrorMessage));
        return false;
    }

    private String extractGoodbyeErrorMessage(MessageBlock messageBlock, boolean z) {
        String str;
        if (messageBlock.getDataLength() == 0 && z) {
            return "E99: <no error message available>";
        }
        try {
            str = new String(messageBlock.getData());
        } catch (RuntimeException unused) {
            str = "Failed to decode the error string attached to a 'goodbye' message; byte length: " + messageBlock.getDataLength();
        }
        return str;
    }
}
