package de.rcenvironment.core.communication.connection.internal;

import de.rcenvironment.core.communication.api.NodeIdentifierService;
import de.rcenvironment.core.communication.channel.MessageChannelLifecycleListener;
import de.rcenvironment.core.communication.channel.MessageChannelService;
import de.rcenvironment.core.communication.channel.MessageChannelState;
import de.rcenvironment.core.communication.channel.MessageChannelTrafficListener;
import de.rcenvironment.core.communication.channel.ServerContactPoint;
import de.rcenvironment.core.communication.common.CommunicationException;
import de.rcenvironment.core.communication.common.IdentifierException;
import de.rcenvironment.core.communication.common.InstanceNodeSessionId;
import de.rcenvironment.core.communication.common.NodeIdentifierContextHolder;
import de.rcenvironment.core.communication.common.NodeIdentifierUtils;
import de.rcenvironment.core.communication.common.SerializationException;
import de.rcenvironment.core.communication.configuration.CommunicationConfiguration;
import de.rcenvironment.core.communication.configuration.CommunicationIPFilterConfiguration;
import de.rcenvironment.core.communication.configuration.IPWhitelistConnectionFilter;
import de.rcenvironment.core.communication.configuration.NodeConfigurationService;
import de.rcenvironment.core.communication.messaging.MessageEndpointHandler;
import de.rcenvironment.core.communication.messaging.NetworkRequestHandler;
import de.rcenvironment.core.communication.model.InitialNodeInformation;
import de.rcenvironment.core.communication.model.NetworkContactPoint;
import de.rcenvironment.core.communication.model.NetworkRequest;
import de.rcenvironment.core.communication.model.NetworkResponse;
import de.rcenvironment.core.communication.model.NetworkResponseHandler;
import de.rcenvironment.core.communication.protocol.NetworkRequestFactory;
import de.rcenvironment.core.communication.protocol.NetworkResponseFactory;
import de.rcenvironment.core.communication.protocol.ProtocolConstants;
import de.rcenvironment.core.communication.routing.MessageRoutingService;
import de.rcenvironment.core.communication.routing.internal.WaitForResponseBlocker;
import de.rcenvironment.core.communication.transport.spi.BrokenMessageChannelListener;
import de.rcenvironment.core.communication.transport.spi.MessageChannel;
import de.rcenvironment.core.communication.transport.spi.MessageChannelEndpointHandler;
import de.rcenvironment.core.communication.transport.spi.MessageChannelResponseHandler;
import de.rcenvironment.core.communication.transport.spi.NetworkTransportProvider;
import de.rcenvironment.core.communication.utils.MessageUtils;
import de.rcenvironment.core.configuration.bootstrap.RuntimeDetection;
import de.rcenvironment.core.eventlog.api.EventLog;
import de.rcenvironment.core.eventlog.api.EventType;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.toolkitbridge.transitional.StatsCounter;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.common.textstream.TextOutputReceiver;
import de.rcenvironment.core.utils.incubator.DebugSettings;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncCallback;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncCallbackExceptionPolicy;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncOrderedCallbackManager;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncTaskService;
import de.rcenvironment.toolkit.modules.concurrency.api.TaskDescription;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:de/rcenvironment/core/communication/connection/internal/MessageChannelServiceImpl.class */
public class MessageChannelServiceImpl implements MessageChannelService {
    private static final String SINGLE_QUOTE = "'";

    // Collaborators and node-local settings; none of them is assigned within this part of the file.
    private InitialNodeInformation ownNodeInformation;
    private NodeConfigurationService configurationService;
    private NodeIdentifierService nodeIdentifierService;
    private MessageRoutingService messageRoutingService;
    private MessageEndpointHandler messageEndpointHandler;
    // Checked before forwarding requests whose final recipient is another node.
    private boolean localNodeIsRelay;
    private InstanceNodeSessionId localNodeId;
    private long requestTimeoutMsec;
    private AsyncTaskService threadPool = ConcurrencyUtils.getAsyncTaskService();
    // Version string handed to transport providers on connect and to server contact points on start.
    private String protocolVersion = "10.0.0";

    // Per-channel health state; a synchronized view over a WeakHashMap keyed by channel.
    private final Map<MessageChannel, MessageChannelHealthState> connectionHealthStates = Collections.synchronizedMap(new WeakHashMap<>());
    // Used to build unique task ids for scheduled health checks.
    private final AtomicLong healthCheckTaskCounter = new AtomicLong();
    private final boolean verboseLogging = DebugSettings.getVerboseLoggingEnabled(getClass());
    private final Log logger = LogFactory.getLog(getClass());
    // While true, connect() rejects new connection attempts.
    private volatile boolean shuttingDown = false;
    private Map<String, NetworkTransportProvider> transportProviders = new HashMap<>();
    private AsyncOrderedCallbackManager<MessageChannelLifecycleListener> channelListeners = ConcurrencyUtils.getFactory().createAsyncOrderedCallbackManager(AsyncCallbackExceptionPolicy.LOG_AND_CANCEL_LISTENER);
    private AsyncOrderedCallbackManager<MessageChannelTrafficListener> trafficListeners = ConcurrencyUtils.getFactory().createAsyncOrderedCallbackManager(AsyncCallbackExceptionPolicy.LOG_AND_CANCEL_LISTENER);
    private RawMessageChannelEndpointHandlerImpl rawMessageChannelEndpointHandler = new RawMessageChannelEndpointHandlerImpl(this, null);
    private final BrokenMessageChannelListenerImpl brokenConnectionListener = new BrokenMessageChannelListenerImpl(this, null);
    // Registered outgoing channels by channel id; accesses synchronize on the map instance itself.
    private final Map<String, MessageChannel> activeOutgoingChannels = new HashMap<>();
    private final IPWhitelistConnectionFilter globalIPWhitelistFilter = new IPWhitelistConnectionFilter();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/rcenvironment/core/communication/connection/internal/MessageChannelServiceImpl$BrokenMessageChannelListenerImpl.class */
    public class BrokenMessageChannelListenerImpl implements BrokenMessageChannelListener {

        private BrokenMessageChannelListenerImpl() {
        }

        /* synthetic */ BrokenMessageChannelListenerImpl(MessageChannelServiceImpl outer, BrokenMessageChannelListenerImpl unused) {
            this();
        }

        /**
         * Reacts to a transport-level "channel broken" notification.
         *
         * Channels that were initiated by the remote side are only logged and otherwise ignored;
         * all other channels are passed to the enclosing service's broken-channel handling.
         *
         * @param messageChannel the channel reported as broken
         */
        @Override
        public void onChannelBroken(MessageChannel messageChannel) {
            final boolean initiatedByRemote = messageChannel.getInitiatedByRemote();
            if (!initiatedByRemote) {
                MessageChannelServiceImpl.this.handleBrokenChannel(messageChannel);
                return;
            }
            MessageChannelServiceImpl.this.logger.warn("onChannelBroken called for remote-initiated channel " + messageChannel.getChannelId() + "; ignoring");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/rcenvironment/core/communication/connection/internal/MessageChannelServiceImpl$MessageChannelHealthState.class */
    public static final class MessageChannelHealthState {

        /** Number of failures at which {@link #healthCheckFailuresAtOrAboveLimit()} starts returning true. */
        private static final int FAILURE_LIMIT = 3;

        private int healthCheckFailureCount;

        private final Object healthCheckInProgressLock = new Object();

        private MessageChannelHealthState() {
        }

        /* synthetic */ MessageChannelHealthState(MessageChannelHealthState unused) {
            this();
        }

        /**
         * @return true once the recorded failure count has reached the limit of three
         */
        public boolean healthCheckFailuresAtOrAboveLimit() {
            final boolean belowLimit = this.healthCheckFailureCount < FAILURE_LIMIT;
            return !belowLimit;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:de/rcenvironment/core/communication/connection/internal/MessageChannelServiceImpl$RawMessageChannelEndpointHandlerImpl.class */
    /**
     * Endpoint handler passed to transport providers and server contact points; it receives handshake
     * data, channel lifecycle notifications and raw requests from the transport layer and dispatches
     * them into the enclosing service.
     */
    public class RawMessageChannelEndpointHandlerImpl implements MessageChannelEndpointHandler {

        private RawMessageChannelEndpointHandlerImpl() {
        }

        /* synthetic */ RawMessageChannelEndpointHandlerImpl(MessageChannelServiceImpl messageChannelServiceImpl, RawMessageChannelEndpointHandlerImpl rawMessageChannelEndpointHandlerImpl) {
            this();
        }

        /**
         * Merges the remote node's handshake information into the local node knowledge and answers
         * with the local node's own information.
         *
         * @param initialNodeInformation the handshake information received from the remote node
         * @return the local node's information
         */
        @Override
        public InitialNodeInformation exchangeNodeInformation(InitialNodeInformation initialNodeInformation) {
            MessageChannelServiceImpl.this.mergeRemoteHandshakeInformationIntoGlobalNodeKnowledge(initialNodeInformation);
            return MessageChannelServiceImpl.this.ownNodeInformation;
        }

        /**
         * Registers a channel that was established on request of a remote node, then logs the event
         * and appends a CONNECTION_INCOMING_ACCEPTED entry to the event log.
         *
         * @param messageChannel the new channel; must report itself as initiated by the remote side
         * @param serverContactPoint the local server contact point the connection arrived at
         * @throws IllegalStateException if the channel is not marked as remote-initiated
         */
        @Override
        public void onRemoteInitiatedChannelEstablished(MessageChannel messageChannel, ServerContactPoint serverContactPoint) {
            if (!messageChannel.getInitiatedByRemote()) {
                throw new IllegalStateException("Consistency error");
            }
            MessageChannelServiceImpl.this.registerNewOutgoingChannel(messageChannel);
            MessageChannelServiceImpl.this.logger.debug(StringUtils.format(
                "Remote-initiated channel '%s' established from '%s' to '%s' via local SCP %s",
                new Object[]{messageChannel, messageChannel.getRemoteNodeInformation().getLogDescription(),
                    MessageChannelServiceImpl.this.ownNodeInformation.getLogDescription(), serverContactPoint}));
            EventLog.append(EventLog.newEntry(EventType.CONNECTION_INCOMING_ACCEPTED)
                .set("type", "localnet")
                .set("connection_id", messageChannel.getChannelId())
                .set("remote_node_id", messageChannel.getRemoteNodeInformation().getInstanceNodeSessionIdString())
                .set("remote_ip", "TODO")
                .set("remote_port", "TODO")
                .set("server_port", serverContactPoint.getNetworkContactPoint().getPort()));
        }

        /**
         * Closes every registered outgoing channel whose associated mirror channel id equals the id of
         * the inbound channel that is closing. The actual close is executed asynchronously on the
         * thread pool.
         *
         * @param str the id of the closing inbound channel; must not be null
         * @throws NullPointerException if the id is null
         */
        @Override
        public void onInboundChannelClosing(String str) {
            if (str == null) {
                // the decompiled code passed the (null) id itself as the message, yielding an empty exception
                throw new NullPointerException("Inbound channel id must not be null");
            }
            MessageChannelServiceImpl.this.logger.debug("Inbound message channel is closing, checking for mirror channels; id=" + str);
            for (MessageChannel messageChannel : MessageChannelServiceImpl.this.getAllOutgoingChannels()) {
                if (!str.equals(messageChannel.getAssociatedMirrorChannelId())) {
                    continue;
                }
                MessageChannelServiceImpl.this.logger.debug(StringUtils.format(
                    "Found matching mirror channel for closing inbound channel %s, closing: %s", new Object[]{str, messageChannel}));
                MessageChannelServiceImpl.this.threadPool.execute("Communication Layer: Close mirror channel after inbound channel was closed", str, () -> {
                    // only flag channels that are still established; the close itself is attempted either way
                    if (messageChannel.getState() == MessageChannelState.ESTABLISHED) {
                        messageChannel.markAsClosedBecauseMirrorChannelClosed();
                    }
                    MessageChannelServiceImpl.this.closeOutgoingChannel(messageChannel);
                });
            }
        }

        /**
         * Processes a raw request with the node identifier service installed as the current thread's
         * deserialization service; the thread-level setting is reset to null afterwards, whether or
         * not processing succeeded.
         *
         * @param networkRequest the received request
         * @param str the instance node session id string of the node the request was received from
         * @return the response to send back
         */
        @Override
        public NetworkResponse onRawRequestReceived(NetworkRequest networkRequest, String str) {
            try {
                NodeIdentifierContextHolder.setDeserializationServiceForCurrentThread(MessageChannelServiceImpl.this.nodeIdentifierService);
                return onRawRequestReceivedInternal(networkRequest, str);
            } finally {
                NodeIdentifierContextHolder.setDeserializationServiceForCurrentThread(null);
            }
        }

        /**
         * Notifies traffic listeners, records statistics, and then either handles the request locally
         * (no final recipient set, or the recipient is the local node) or forwards it. Forwarding is
         * refused with a "no route" response when the local node is not a relay; in that case no
         * response notification is sent to the traffic listeners.
         */
        private NetworkResponse onRawRequestReceivedInternal(final NetworkRequest networkRequest, String str) {
            try {
                final InstanceNodeSessionId sourceId = MessageChannelServiceImpl.this.nodeIdentifierService.parseInstanceNodeSessionIdString(str);
                MessageChannelServiceImpl.this.trafficListeners.enqueueCallback(new AsyncCallback<MessageChannelTrafficListener>() {
                    public void performCallback(MessageChannelTrafficListener messageChannelTrafficListener) {
                        messageChannelTrafficListener.onRequestReceivedFromChannel(networkRequest, sourceId);
                    }
                });

                String messageType = networkRequest.getMessageType();
                byte[] contentBytes = networkRequest.getContentBytes();
                if (contentBytes != null) {
                    StatsCounter.registerValue("Messaging: Request payload bytes received by type", messageType, contentBytes.length);
                }

                NetworkResponse response;
                String finalRecipientIdString = networkRequest.accessMetaData().getFinalRecipientIdString();
                if (finalRecipientIdString == null || MessageChannelServiceImpl.this.ownNodeInformation.getInstanceNodeSessionIdString().equals(finalRecipientIdString)) {
                    StatsCounter.count("Messages arrived at destination by type", messageType);
                    response = MessageChannelServiceImpl.this.messageEndpointHandler.onRequestArrivedAtDestination(networkRequest);
                } else {
                    if (!MessageChannelServiceImpl.this.localNodeIsRelay) {
                        MessageChannelServiceImpl.this.logger.error("Received a network request that would be forwarded, but the local node is not a relay: " + networkRequest);
                        return NetworkResponseFactory.generateResponseForNoRouteWhileForwarding(networkRequest, MessageChannelServiceImpl.this.ownNodeInformation.getInstanceNodeSessionId());
                    }
                    NetworkRequest forwardingRequest = NetworkRequestFactory.createNetworkRequestForForwarding(networkRequest, MessageChannelServiceImpl.this.ownNodeInformation.getInstanceNodeSessionId());
                    // sanity check: the forwarded request must keep the id of the incoming one
                    if (!forwardingRequest.getRequestId().equals(networkRequest.getRequestId())) {
                        throw new IllegalStateException("Wrong request id on forwarding");
                    }
                    StatsCounter.count("Messages forwarded by type", messageType);
                    response = MessageChannelServiceImpl.this.messageRoutingService.forwardAndAwait(forwardingRequest);
                }

                // fill in the local node as sender if the response does not carry one yet
                if (!response.accessMetaData().hasSender()) {
                    response.accessMetaData().setSender(MessageChannelServiceImpl.this.ownNodeInformation.getInstanceNodeSessionId());
                }

                final NetworkResponse finalResponse = response;
                MessageChannelServiceImpl.this.trafficListeners.enqueueCallback(new AsyncCallback<MessageChannelTrafficListener>() {
                    public void performCallback(MessageChannelTrafficListener messageChannelTrafficListener) {
                        messageChannelTrafficListener.onResponseSentIntoChannel(finalResponse, networkRequest, sourceId);
                    }
                });
                return response;
            } catch (IdentifierException e) {
                throw NodeIdentifierUtils.wrapIdentifierException(e);
            }
        }
    }

    /**
     * Starts an asynchronous connection attempt to the given contact point.
     *
     * The request is rejected up front while the shutdown flag is set or when no transport provider
     * is known for the contact point's transport id. The connect call itself runs on the thread pool;
     * on success, the remote handshake information is merged into the local node knowledge.
     *
     * @param networkContactPoint the remote contact point to connect to
     * @param z flag passed through unchanged to the transport provider's connect call
     * @return a future yielding the established channel
     * @throws CommunicationException if the layer is shutting down or the transport id is unknown
     */
    @Override
    public Future<MessageChannel> connect(final NetworkContactPoint networkContactPoint, final boolean z) throws CommunicationException {
        if (this.shuttingDown) {
            throw new CommunicationException("Ignoring a request to connect to " + networkContactPoint + " as the network layer is shutting down");
        }
        final NetworkTransportProvider provider = getTransportProvider(networkContactPoint.getTransportId());
        if (provider == null) {
            throw new CommunicationException("Unknown transport id: " + networkContactPoint.getTransportId());
        }

        // kept as an anonymous class (not a lambda) so the @TaskDescription annotation stays on call()
        final Callable<MessageChannel> connectTask = new Callable<MessageChannel>() {
            @Override
            @TaskDescription("Communication Layer: Connect to remote node (low-level task)")
            public MessageChannel call() throws Exception {
                final MessageChannelServiceImpl service = MessageChannelServiceImpl.this;
                try {
                    MessageChannel channel = provider.connect(networkContactPoint, service.ownNodeInformation, service.protocolVersion, z,
                        service.rawMessageChannelEndpointHandler, service.brokenConnectionListener);
                    InitialNodeInformation remoteInfo = channel.getRemoteNodeInformation();
                    service.mergeRemoteHandshakeInformationIntoGlobalNodeKnowledge(remoteInfo);
                    service.logger.debug(StringUtils.format("Channel '%s' established from '%s' to '%s' using remote NCP %s",
                        new Object[]{channel, service.ownNodeInformation.getLogDescription(), remoteInfo.getLogDescription(), networkContactPoint}));
                    return channel;
                } catch (RuntimeException failure) {
                    // log with local context, then let the failure surface through the future
                    service.logger.error("Failed to connect to " + networkContactPoint + " (local node: " + service.ownNodeInformation.getLogDescription() + ")", failure);
                    throw failure;
                }
            }
        };
        return this.threadPool.submit(connectTask);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v4, types: [java.util.Map<java.lang.String, de.rcenvironment.core.communication.transport.spi.MessageChannel>] */
    /* JADX WARN: Type inference failed for: r0v5, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v9 */
    @Override // de.rcenvironment.core.communication.channel.MessageChannelService
    public void registerNewOutgoingChannel(final MessageChannel messageChannel) {
        this.connectionHealthStates.put(messageChannel, new MessageChannelHealthState(null));
        ?? r0 = this.activeOutgoingChannels;
        synchronized (r0) {
            this.activeOutgoingChannels.put(messageChannel.getChannelId(), messageChannel);
            r0 = r0;
            this.channelListeners.enqueueCallback(new AsyncCallback<MessageChannelLifecycleListener>() { // from class: de.rcenvironment.core.communication.connection.internal.MessageChannelServiceImpl.2
                public void performCallback(MessageChannelLifecycleListener messageChannelLifecycleListener) {
                    messageChannelLifecycleListener.onOutgoingChannelEstablished(messageChannel);
                }
            });
        }
    }

    /**
     * Closes the given channel and, if {@code close()} reports true, unregisters it.
     *
     * @param messageChannel the outgoing channel to close
     */
    @Override
    public void closeOutgoingChannel(MessageChannel messageChannel) {
        final boolean closeReportedTrue = messageChannel.close();
        if (!closeReportedTrue) {
            return;
        }
        unregisterClosedOrBrokenChannel(messageChannel);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.Map<java.lang.String, de.rcenvironment.core.communication.transport.spi.MessageChannel>] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v4 */
    @Override // de.rcenvironment.core.communication.channel.MessageChannelService
    public Set<MessageChannel> getAllOutgoingChannels() {
        ?? r0 = this.activeOutgoingChannels;
        synchronized (r0) {
            HashSet hashSet = new HashSet(this.activeOutgoingChannels.values());
            r0 = r0;
            return hashSet;
        }
    }

    /**
     * Looks up a registered outgoing channel by its id.
     *
     * @param str the channel id
     * @return the registered channel, or null if no channel is registered under that id
     */
    @Override
    public MessageChannel getOutgoingChannelById(String str) {
        synchronized (this.activeOutgoingChannels) {
            return this.activeOutgoingChannels.get(str);
        }
    }

    /**
     * Sets or clears the shutdown flag; while it is set, {@code connect()} rejects new requests.
     *
     * @param z the new value of the flag
     */
    @Override
    public void setShutdownFlag(boolean z) {
        shuttingDown = z;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.Map<java.lang.String, de.rcenvironment.core.communication.transport.spi.MessageChannel>] */
    /* JADX WARN: Type inference failed for: r0v10, types: [java.util.Map<java.lang.String, de.rcenvironment.core.communication.transport.spi.MessageChannel>] */
    /* JADX WARN: Type inference failed for: r0v11, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v18 */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v4 */
    @Override // de.rcenvironment.core.communication.channel.MessageChannelService
    public void closeAllOutgoingChannels() {
        ?? r0 = this.activeOutgoingChannels;
        synchronized (r0) {
            HashSet hashSet = new HashSet(this.activeOutgoingChannels.values());
            r0 = r0;
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                closeOutgoingChannel((MessageChannel) it.next());
            }
            ?? r02 = this.activeOutgoingChannels;
            synchronized (r02) {
                Iterator<MessageChannel> it2 = this.activeOutgoingChannels.values().iterator();
                while (it2.hasNext()) {
                    this.logger.warn("Channel list not empty after closing all outgoing connections: " + it2.next());
                }
                r02 = r02;
            }
        }
    }

    /**
     * Sends a request into the given channel using the request timeout supplied by the configuration
     * service.
     *
     * @param networkRequest the request to send
     * @param messageChannel the channel to send into
     * @param networkResponseHandler the handler that receives the response
     */
    @Override
    public void sendDirectMessageAsync(NetworkRequest networkRequest, MessageChannel messageChannel, NetworkResponseHandler networkResponseHandler) {
        final int configuredTimeoutMsec = this.configurationService.getRequestTimeoutMsec();
        sendDirectMessageAsync(networkRequest, messageChannel, networkResponseHandler, configuredTimeoutMsec);
    }

    /**
     * Sends a request into the given channel and routes the outcome to the response handler.
     *
     * If the channel breaks while the request is pending, the handler receives a generated
     * "channel closed while waiting" response and the channel is passed to the broken-channel
     * handling. Traffic listeners are notified after the request was handed to the channel, and
     * payload sizes of request and response are recorded in the statistics.
     *
     * @param networkRequest the request to send
     * @param messageChannel the channel to send into; must not be null
     * @param networkResponseHandler the handler that receives the response
     * @param i the timeout value passed to the channel's sendRequest call
     * @throws NullPointerException if the channel is null
     */
    @Override
    public void sendDirectMessageAsync(final NetworkRequest networkRequest, MessageChannel messageChannel, final NetworkResponseHandler networkResponseHandler, int i) {
        if (messageChannel == null) {
            throw new NullPointerException("Null channel passed to sendRequest(); request=" + networkRequest);
        }
        byte[] contentBytes = networkRequest.getContentBytes();
        final String messageType = networkRequest.getMessageType();
        MessageChannelResponseHandler messageChannelResponseHandler = new MessageChannelResponseHandler() {
            @Override
            public void onResponseAvailable(NetworkResponse networkResponse) {
                networkResponseHandler.onResponseAvailable(networkResponse);
                // statistics are recorded after the handler ran, matching the original order
                byte[] responseBytes = networkResponse.getContentBytes();
                if (responseBytes != null) {
                    StatsCounter.registerValue("Messaging: Response payload bytes received by type", messageType, responseBytes.length);
                }
            }

            @Override
            public void onChannelBroken(NetworkRequest brokenRequest, MessageChannel brokenChannel) {
                networkResponseHandler.onResponseAvailable(NetworkResponseFactory.generateResponseForChannelCloseWhileWaitingForResponse(
                    brokenRequest, MessageChannelServiceImpl.this.ownNodeInformation.getInstanceNodeSessionId(), null));
                MessageChannelServiceImpl.this.handleBrokenChannel(brokenChannel);
            }
        };
        if (networkRequest.accessMetaData().getSenderIdString() == null) {
            this.logger.warn("Sending message of type " + networkRequest.getMessageType() + " with empty 'sender' field");
        }
        messageChannel.sendRequest(networkRequest, messageChannelResponseHandler, i);
        this.trafficListeners.enqueueCallback(new AsyncCallback<MessageChannelTrafficListener>() {
            public void performCallback(MessageChannelTrafficListener messageChannelTrafficListener) {
                messageChannelTrafficListener.onRequestSentIntoChannel(networkRequest);
            }
        });
        if (contentBytes != null) {
            StatsCounter.registerValue("Messaging: Request payload bytes sent by type", messageType, contentBytes.length);
        }
    }

    /**
     * Sends a request into the outgoing channel registered under the given id.
     *
     * If no channel is registered under that id, the handler immediately receives a generated
     * "closed or broken channel" response instead.
     *
     * @param networkRequest the request to send
     * @param str the id of the outgoing channel
     * @param networkResponseHandler the handler that receives the response
     */
    @Override
    public void sendDirectMessageAsync(NetworkRequest networkRequest, String str, NetworkResponseHandler networkResponseHandler) {
        final MessageChannel channel = getOutgoingChannelById(str);
        if (channel == null) {
            this.logger.debug("No message channel for id " + str + "; most likely, it has just been closed and therefore deregistered");
            NetworkResponse failureResponse = NetworkResponseFactory.generateResponseForCloseOrBrokenChannelDuringRequestDelivery(networkRequest, this.localNodeId, null);
            networkResponseHandler.onResponseAvailable(failureResponse);
            return;
        }
        sendDirectMessageAsync(networkRequest, channel, networkResponseHandler);
    }

    /**
     * Sends a request into the given channel and waits for the response.
     *
     * The same timeout value is used for the asynchronous send and for awaiting the result.
     *
     * @param networkRequest the request to send
     * @param messageChannel the channel to send into
     * @param i the timeout value
     * @return the response delivered by the blocker's await call
     */
    @Override
    public NetworkResponse sendDirectMessageBlocking(NetworkRequest networkRequest, MessageChannel messageChannel, int i) {
        final WaitForResponseBlocker blocker = new WaitForResponseBlocker(networkRequest, this.localNodeId);
        sendDirectMessageAsync(networkRequest, messageChannel, blocker, i);
        final NetworkResponse awaitedResponse = blocker.await(i);
        return awaitedResponse;
    }

    /**
     * Feeds a request directly into the raw endpoint handler, as if it had been received from the
     * node with the given id.
     *
     * @param networkRequest the request to process
     * @param instanceNodeSessionId the id passed on (in string form) as the source of the request
     * @return the response produced by the raw endpoint handler
     */
    @Override
    public NetworkResponse handleLocalForcedSerializationRPC(NetworkRequest networkRequest, InstanceNodeSessionId instanceNodeSessionId) {
        final String sourceIdString = instanceNodeSessionId.getInstanceNodeSessionIdString();
        return this.rawMessageChannelEndpointHandler.onRawRequestReceived(networkRequest, sourceIdString);
    }

    /**
     * Delegates the registration of a request handler to the message endpoint handler.
     *
     * @param str the key the handler is registered under
     * @param networkRequestHandler the handler to register
     */
    @Override
    public void registerRequestHandler(String str, NetworkRequestHandler networkRequestHandler) {
        final MessageEndpointHandler endpointHandler = this.messageEndpointHandler;
        endpointHandler.registerRequestHandler(str, networkRequestHandler);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.Map<java.lang.String, de.rcenvironment.core.communication.transport.spi.MessageChannel>] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    @Override // de.rcenvironment.core.communication.channel.MessageChannelService
    public synchronized void addChannelLifecycleListener(MessageChannelLifecycleListener messageChannelLifecycleListener) {
        ?? r0 = this.activeOutgoingChannels;
        synchronized (r0) {
            final Set unmodifiableSet = Collections.unmodifiableSet(new HashSet(this.activeOutgoingChannels.values()));
            this.channelListeners.addListenerAndEnqueueCallback(messageChannelLifecycleListener, new AsyncCallback<MessageChannelLifecycleListener>() { // from class: de.rcenvironment.core.communication.connection.internal.MessageChannelServiceImpl.5
                public void performCallback(MessageChannelLifecycleListener messageChannelLifecycleListener2) {
                    messageChannelLifecycleListener2.setInitialMessageChannels(unmodifiableSet);
                }
            });
            r0 = r0;
        }
    }

    /**
     * Removes a previously added channel lifecycle listener.
     *
     * @param messageChannelLifecycleListener the listener to remove
     */
    @Override
    public void removeChannelLifecycleListener(MessageChannelLifecycleListener messageChannelLifecycleListener) {
        final AsyncOrderedCallbackManager<MessageChannelLifecycleListener> lifecycleListeners = this.channelListeners;
        lifecycleListeners.removeListener(messageChannelLifecycleListener);
    }

    /**
     * Adds a listener for request/response traffic notifications.
     *
     * @param messageChannelTrafficListener the listener to add
     */
    @Override
    public void addTrafficListener(MessageChannelTrafficListener messageChannelTrafficListener) {
        final AsyncOrderedCallbackManager<MessageChannelTrafficListener> listeners = this.trafficListeners;
        listeners.addListener(messageChannelTrafficListener);
    }

    /**
     * Creates and starts a server contact point for the given network contact point.
     *
     * @param networkContactPoint the contact point to listen on
     * @return the started server contact point
     * @throws CommunicationException if no transport provider is known for the contact point's
     *         transport id, or if thrown while creating or starting the server contact point
     */
    @Override
    public ServerContactPoint startServer(NetworkContactPoint networkContactPoint) throws CommunicationException {
        NetworkTransportProvider transportProvider = getTransportProvider(networkContactPoint.getTransportId());
        // same guard as in connect(): fail with a clear message instead of passing a null provider on
        if (transportProvider == null) {
            throw new CommunicationException("Unknown transport id: " + networkContactPoint.getTransportId());
        }
        ServerContactPoint serverContactPoint = new ServerContactPoint(transportProvider, networkContactPoint, this.protocolVersion,
            this.rawMessageChannelEndpointHandler, this.globalIPWhitelistFilter);
        serverContactPoint.start();
        return serverContactPoint;
    }

    /**
     * Reads the IP filter configuration from the configuration service and applies it to the global
     * whitelist filter: the allowed IPs when filtering is enabled, otherwise null.
     */
    @Override
    public void loadAndApplyIPFilterConfiguration() {
        final CommunicationIPFilterConfiguration filterConfiguration = this.configurationService.getIPFilterConfiguration();
        if (!filterConfiguration.getEnabled()) {
            this.globalIPWhitelistFilter.configure(null);
            return;
        }
        this.globalIPWhitelistFilter.configure(filterConfiguration.getAllowedIPs());
    }

    /**
     * Writes a one-line summary of the IP filter state to the given receiver: "ENABLED" with the
     * number of accepted source IPs when the filter reports a non-null IP set, "DISABLED" otherwise.
     *
     * @param textOutputReceiver the receiver the summary line is added to
     */
    @Override
    public void printIPFilterInformation(TextOutputReceiver textOutputReceiver) {
        final Set<String> acceptedIps = this.globalIPWhitelistFilter.getAcceptedIps();
        final String summary;
        if (acceptedIps == null) {
            summary = StringUtils.format("IP filtering is DISABLED; all incoming connections are accepted", new Object[0]);
        } else {
            summary = StringUtils.format("IP filtering is ENABLED; incoming connections are restricted to %d source IPs", new Object[]{Integer.valueOf(acceptedIps.size())});
        }
        textOutputReceiver.addOutput(summary);
    }
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.Map<java.lang.String, de.rcenvironment.core.communication.transport.spi.MessageChannel>] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v9 */
    @Override // de.rcenvironment.core.communication.channel.MessageChannelService
    public void triggerHealthCheckForAllChannels() {
        ?? r0 = this.activeOutgoingChannels;
        synchronized (r0) {
            for (MessageChannel messageChannel : this.activeOutgoingChannels.values()) {
                this.threadPool.execute("Communication Layer: Channel health check", StringUtils.format("%s-%s", new Object[]{messageChannel.getChannelId(), Long.toString(this.healthCheckTaskCounter.incrementAndGet())}), () -> {
                    try {
                        try {
                            Thread.sleep(ThreadLocalRandom.current().nextInt(CommunicationConfiguration.CONNECTION_HEALTH_CHECK_MAX_JITTER_MSEC));
                            performHealthCheckAndActOnResult(messageChannel);
                        } catch (InterruptedException unused) {
                            this.logger.debug("Interrupted while waiting to perform the next connection health check, skipping");
                        }
                    } catch (InterruptedException e) {
                        this.logger.debug("Interruption during channel health check", e);
                    }
                });
            }
            r0 = r0;
        }
    }

    /**
     * Registers a transport provider under its transport id.
     *
     * <p>If another provider is already registered for the same id, the existing registration is kept
     * and the call fails.
     *
     * @param networkTransportProvider the provider to register
     * @throws IllegalStateException if a provider is already registered for the same transport id
     */
    @Override // de.rcenvironment.core.communication.channel.MessageChannelService
    public void addNetworkTransportProvider(NetworkTransportProvider networkTransportProvider) {
        final String id = networkTransportProvider.getTransportId();
        this.logger.debug("Registering transport provider for id '" + id + SINGLE_QUOTE);
        synchronized (this.transportProviders) {
            // putIfAbsent leaves an existing mapping in place, so no "put back" step is needed
            final NetworkTransportProvider alreadyRegistered = this.transportProviders.putIfAbsent(id, networkTransportProvider);
            if (alreadyRegistered != null) {
                throw new IllegalStateException("Duplicate transport for id '" + id + SINGLE_QUOTE);
            }
        }
    }

    /**
     * Unregisters a previously registered transport provider.
     *
     * <p>The registration is only removed if the given instance is the one currently registered for its
     * transport id (identity comparison). If the check fails, the map of providers is left unchanged,
     * mirroring {@code addNetworkTransportProvider}, which also leaves existing registrations intact
     * when it rejects a call.
     *
     * @param networkTransportProvider the provider to unregister
     * @throws IllegalStateException if the given instance is not the one registered for its transport id
     */
    public void removeNetworkTransportProvider(NetworkTransportProvider networkTransportProvider) {
        String transportId = networkTransportProvider.getTransportId();
        this.logger.debug("Unregistering transport provider for id '" + transportId + SINGLE_QUOTE);
        synchronized (this.transportProviders) {
            // Check before removing: a mismatching call must not silently drop a different provider
            // that happens to be registered under the same id.
            if (this.transportProviders.get(transportId) != networkTransportProvider) {
                throw new IllegalStateException("Transport to remove was not actually registered: " + networkTransportProvider);
            }
            this.transportProviders.remove(transportId);
        }
    }

    /**
     * Looks up the transport provider registered for the given transport id.
     *
     * @param str the transport id to look up
     * @return the registered provider; never {@code null}
     * @throws IllegalStateException if no provider is registered for the id
     */
    private NetworkTransportProvider getTransportProvider(String str) {
        synchronized (this.transportProviders) {
            final NetworkTransportProvider registered = this.transportProviders.get(str);
            if (registered != null) {
                return registered;
            }
            throw new IllegalStateException("No transport registered for id " + str);
        }
    }

    /**
     * Sets the message endpoint handler used by this service.
     *
     * @param messageEndpointHandler the handler to store; replaces any previously set handler
     */
    @Override // de.rcenvironment.core.communication.channel.MessageChannelService
    public void setMessageEndpointHandler(MessageEndpointHandler messageEndpointHandler) {
        this.messageEndpointHandler = messageEndpointHandler;
    }

    /**
     * Sets the routing service that this service keeps in its {@code messageRoutingService} field.
     *
     * @param messageRoutingService the routing service to store; replaces any previously set instance
     */
    @Override // de.rcenvironment.core.communication.channel.MessageChannelService
    public void setForwardingService(MessageRoutingService messageRoutingService) {
        this.messageRoutingService = messageRoutingService;
    }

    /**
     * Replaces the protocol version string held by this service.
     *
     * <p>The stored value is, among other uses not visible here, passed to every
     * {@code ServerContactPoint} created by {@code startServer}.
     *
     * @param str the protocol version string to use from now on
     */
    @Override // de.rcenvironment.core.communication.channel.MessageChannelService
    public void overrideProtocolVersion(String str) {
        this.protocolVersion = str;
    }

    /**
     * Injects the node configuration service and derives the node identifier service from it.
     *
     * <p>May only be called while no configuration service is set; a second binding is rejected.
     *
     * @param nodeConfigurationService the configuration service to use
     * @throws IllegalStateException if a configuration service has already been bound
     */
    public void bindNodeConfigurationService(NodeConfigurationService nodeConfigurationService) {
        if (this.configurationService == null) {
            this.configurationService = nodeConfigurationService;
            this.nodeIdentifierService = nodeConfigurationService.getNodeIdentifierService();
            return;
        }
        throw new IllegalStateException();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v13 */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.util.Map<java.lang.String, de.rcenvironment.core.communication.transport.spi.NetworkTransportProvider>] */
    /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.Throwable] */
    public void activate() {
        if (RuntimeDetection.isImplicitServiceActivationDenied()) {
            return;
        }
        this.ownNodeInformation = this.configurationService.getInitialNodeInformation();
        this.localNodeId = this.ownNodeInformation.getInstanceNodeSessionId();
        this.localNodeIsRelay = this.configurationService.isRelay();
        this.requestTimeoutMsec = this.configurationService.getRequestTimeoutMsec();
        ?? r0 = this.transportProviders;
        synchronized (r0) {
            this.logger.debug(StringUtils.format("Activated network channel service; instance log name='%s'; node id='%s'; %d registered transport providers", new Object[]{this.ownNodeInformation.getLogDescription(), this.ownNodeInformation.getInstanceNodeSessionId(), Integer.valueOf(this.transportProviders.size())}));
            r0 = r0;
            loadAndApplyIPFilterConfiguration();
        }
    }

    /**
     * Deactivation hook. Currently only writes a debug log line; no channels, servers or listeners
     * are touched here.
     */
    public void deactivate() {
        this.logger.debug("Deactivating");
    }

    /**
     * Replaces the local node information held by this service.
     *
     * <p>Only the {@code ownNodeInformation} field is updated; the {@code localNodeId} field that
     * {@code activate()} derives from the node information is not refreshed by this method.
     *
     * @param initialNodeInformation the node information to use as the local node's own information
     */
    protected void setNodeInformation(InitialNodeInformation initialNodeInformation) {
        this.ownNodeInformation = initialNodeInformation;
    }

    /**
     * Returns the node identifier service that was obtained from the configuration service in
     * {@code bindNodeConfigurationService}.
     *
     * @return the node identifier service; {@code null} as long as no configuration service has been bound
     *         (NOTE(review): assumes the field is not assigned elsewhere in the class; verify)
     */
    public NodeIdentifierService getNodeIdentifierService() {
        return this.nodeIdentifierService;
    }

    /**
     * Feeds information learned about a remote node into the node identifier service: the remote
     * node's display name is associated with its instance node session id.
     *
     * <p>Decompiler note: the access modifier was changed from {@code private} to {@code public}
     * by JADX.
     *
     * @param initialNodeInformation the node information of the remote node (by the method name,
     *        presumably received during the connection handshake; the caller is not visible here)
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void mergeRemoteHandshakeInformationIntoGlobalNodeKnowledge(InitialNodeInformation initialNodeInformation) {
        this.nodeIdentifierService.associateDisplayName(initialNodeInformation.getInstanceNodeSessionId(), initialNodeInformation.getDisplayName());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v15 */
    /* JADX WARN: Type inference failed for: r0v16, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v23 */
    /* JADX WARN: Type inference failed for: r0v6, types: [java.lang.Throwable, java.lang.Object] */
    private void performHealthCheckAndActOnResult(MessageChannel messageChannel) throws InterruptedException {
        MessageChannelHealthState messageChannelHealthState = this.connectionHealthStates.get(messageChannel);
        if (messageChannelHealthState == null) {
            this.logger.error("Internal error: Found no health state object for channel " + messageChannel + "; closing channel");
            triggerAsyncClosingOfBrokenChannel(messageChannel);
            return;
        }
        synchronized (messageChannelHealthState.healthCheckInProgressLock) {
            if (!messageChannel.isReadyToUse()) {
                this.logger.debug(StringUtils.format("Channel %s is %s; skipping scheduled health check", new Object[]{messageChannel.getChannelId(), messageChannel.getState()}));
                return;
            }
            if (this.verboseLogging) {
                this.logger.debug("Performing health check on " + messageChannel);
            }
            boolean performConnectionHealthCheckRequestResponse = performConnectionHealthCheckRequestResponse(messageChannel);
            boolean z = false;
            synchronized (messageChannelHealthState) {
                ?? r0 = performConnectionHealthCheckRequestResponse;
                if (r0 != 0) {
                    if (messageChannelHealthState.healthCheckFailureCount > 0) {
                        this.logger.debug(StringUtils.format("Channel %s to %s passed its health check after %d previous failures", new Object[]{messageChannel.getChannelId(), messageChannel.getRemoteNodeInformation().getInstanceNodeSessionId(), Integer.valueOf(messageChannelHealthState.healthCheckFailureCount)}));
                    }
                    messageChannelHealthState.healthCheckFailureCount = 0;
                } else {
                    messageChannelHealthState.healthCheckFailureCount++;
                    this.logger.debug(StringUtils.format("Channel %s to %s failed a health check (%d consecutive failures)", new Object[]{messageChannel.getChannelId(), messageChannel.getRemoteNodeInformation().getInstanceNodeSessionId(), Integer.valueOf(messageChannelHealthState.healthCheckFailureCount)}));
                    if (messageChannelHealthState.healthCheckFailuresAtOrAboveLimit()) {
                        z = true;
                    }
                }
                r0 = messageChannelHealthState;
                if (z) {
                    triggerAsyncClosingOfBrokenChannel(messageChannel);
                }
            }
        }
    }

    /**
     * Submits a thread pool task that calls {@code handleBrokenChannel} for the given channel, so
     * the caller does not perform the closing itself.
     *
     * @param messageChannel the channel to hand to {@code handleBrokenChannel}
     */
    private void triggerAsyncClosingOfBrokenChannel(MessageChannel messageChannel) {
        final String taskName = "Communication Layer: Close broken channel after health check failure";
        this.threadPool.execute(taskName, () -> handleBrokenChannel(messageChannel));
    }

    /**
     * Sends a health check request over the given channel and evaluates the response.
     *
     * <p>A random token is sent as the request payload; the check passes if a successful response
     * echoes exactly that token. Outcomes:
     * <ul>
     * <li>success response with matching token: {@code true}</li>
     * <li>success response with different or undeserializable content: {@code false} (warning logged)</li>
     * <li>timeout while waiting for the response: {@code false} (warning logged)</li>
     * <li>channel or response listener shut down while waiting: {@code true}, i.e. deliberately not
     * counted as a failure</li>
     * <li>any other non-success result: {@code false} (warning logged)</li>
     * </ul>
     *
     * @param messageChannel the channel to check
     * @return {@code true} if the check is to be treated as passed, {@code false} if it counts as a failure
     * @throws InterruptedException if interrupted while waiting for the response
     */
    private boolean performConnectionHealthCheckRequestResponse(MessageChannel messageChannel) throws InterruptedException {
        String token = Integer.toString(ThreadLocalRandom.current().nextInt());
        NetworkRequest request = NetworkRequestFactory.createNetworkRequest(MessageUtils.serializeSafeObject(token), ProtocolConstants.VALUE_MESSAGE_TYPE_HEALTH_CHECK, this.ownNodeInformation.getInstanceNodeSessionId(), null);
        NetworkResponse response = sendDirectMessageBlocking(request, messageChannel, CommunicationConfiguration.CONNECTION_HEALTH_CHECK_TIMEOUT_MSEC);

        if (response.isSuccess()) {
            try {
                Serializable echoedContent = response.getDeserializedContent();
                if (!token.equals(echoedContent)) {
                    this.logger.warn(StringUtils.format("Received successful response for a health check on channel '%s', but it contained unexpected content: %s", new Object[]{messageChannel.getChannelId(), echoedContent}));
                    return false;
                }
                if (this.verboseLogging) {
                    this.logger.debug("Health check on channel " + messageChannel + " passed");
                }
                return true;
            } catch (SerializationException e) {
                this.logger.warn(StringUtils.format("Received successful response for a health check on channel '%s', but there was an error deserializing its content", new Object[]{messageChannel.getChannelId()}), e);
                return false;
            }
        }

        // Non-success response: try to extract the payload for diagnostics; a failure to do so is
        // logged but does not change the outcome.
        int code = response.getResultCode().getCode();
        Serializable failureContent = null;
        try {
            failureContent = response.getDeserializedContent();
        } catch (SerializationException e) {
            this.logger.warn(StringUtils.format("Received non-successful response for a health check on channel '%s' (error code %d), and there was also an error deserializing its content", new Object[]{messageChannel.getChannelId(), Integer.valueOf(code)}), e);
        }
        if (response.getResultCode() == ProtocolConstants.ResultCode.TIMEOUT_WAITING_FOR_RESPONSE) {
            this.logger.warn(StringUtils.format("Received no response for a health check on message channel '%s' within %,d milliseconds; the connection or the remote instance may be overloaded", new Object[]{messageChannel.getChannelId(), Integer.valueOf(CommunicationConfiguration.CONNECTION_HEALTH_CHECK_TIMEOUT_MSEC)}));
            return false;
        }
        if (response.getResultCode() == ProtocolConstants.ResultCode.CHANNEL_OR_RESPONSE_LISTENER_SHUT_DOWN_WHILE_WAITING_FOR_RESPONSE) {
            // The message has a single placeholder, so only the channel id is passed.
            this.logger.debug(StringUtils.format("Message channel '%s' was closed while waiting for a health check response; not counting as an additional health check failure", new Object[]{messageChannel.getChannelId()}));
            return true;
        }
        this.logger.warn(StringUtils.format("Received non-success response for a health check on channel '%s': error code %d, content: %s", new Object[]{messageChannel.getChannelId(), Integer.valueOf(code), failureContent}));
        return false;
    }

    /**
     * Marks the given channel as broken and, if {@code markAsBroken()} returns {@code true},
     * unregisters it from this service.
     *
     * <p>If {@code markAsBroken()} returns {@code false}, nothing further is done.
     *
     * <p>Decompiler note: the access modifier was changed from {@code private} to {@code public}
     * by JADX.
     *
     * @param messageChannel the channel considered broken; its remote node information may be {@code null}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void handleBrokenChannel(MessageChannel messageChannel) {
        final String remoteNodeText;
        if (messageChannel.getRemoteNodeInformation() == null) {
            remoteNodeText = "(no node information available)";
        } else {
            remoteNodeText = messageChannel.getRemoteNodeInformation().getInstanceNodeSessionId().toString();
        }
        this.logger.debug("Closing broken channel to " + remoteNodeText + " (id=" + messageChannel.getChannelId() + ")");
        if (!messageChannel.markAsBroken()) {
            return;
        }
        unregisterClosedOrBrokenChannel(messageChannel);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.util.Map<java.lang.String, de.rcenvironment.core.communication.transport.spi.MessageChannel>] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8 */
    private void unregisterClosedOrBrokenChannel(final MessageChannel messageChannel) {
        ?? r0 = this.activeOutgoingChannels;
        synchronized (r0) {
            MessageChannel remove = this.activeOutgoingChannels.remove(messageChannel.getChannelId());
            if (remove != messageChannel) {
                this.logger.warn(StringUtils.format("Unexpected state: Expected to find same registered channel object for closed or broken channel %s, but found '%s' instead", new Object[]{messageChannel.getChannelId(), remove}));
            }
            r0 = r0;
            this.logger.debug("Notifying listeners of shutdown of channel " + messageChannel.getChannelId());
            this.channelListeners.enqueueCallback(new AsyncCallback<MessageChannelLifecycleListener>() { // from class: de.rcenvironment.core.communication.connection.internal.MessageChannelServiceImpl.6
                public void performCallback(MessageChannelLifecycleListener messageChannelLifecycleListener) {
                    messageChannelLifecycleListener.onOutgoingChannelTerminated(messageChannel);
                }
            });
            EventLog.append(EventLog.newEntry(EventType.CONNECTION_INCOMING_CLOSED).set("type", "localnet").set("connection_id", messageChannel.getChannelId()).set("remote_node_id", messageChannel.getRemoteNodeInformation().getInstanceNodeSessionIdString()).set("remote_ip", "TODO").set("remote_port", "TODO").set("server_port", "TODO"));
        }
    }
}
