package de.rcenvironment.core.communication.uplink.client.session.impl;

import de.rcenvironment.core.communication.uplink.client.session.api.UplinkConnection;
import de.rcenvironment.core.communication.uplink.relay.api.ServerSideUplinkSession;
import de.rcenvironment.core.communication.uplink.relay.api.ServerSideUplinkSessionService;
import de.rcenvironment.core.utils.common.testutils.ThroughputLimiter;
import de.rcenvironment.core.utils.common.testutils.ThroughputLimitingInputStream;
import de.rcenvironment.core.utils.common.testutils.ThroughputLimitingOutputStream;
import de.rcenvironment.core.utils.testing.LocalTCPTestConnection;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncTaskService;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:de/rcenvironment/core/communication/uplink/client/session/impl/LocalServiceUplinkConnectionImpl.class */
public class LocalServiceUplinkConnectionImpl implements UplinkConnection {
    private final AsyncTaskService asyncTaskService;
    private final ServerSideUplinkSessionService sessionService;
    private LocalTCPTestConnection localTestConnection;
    private InputStream clientSideInputStream;
    private OutputStream serverSideOutputStream;
    private InputStream serverSideInputStream;
    private OutputStream clientSideOutputStream;
    private final AtomicInteger simulatedConnectionCounter = new AtomicInteger();
    private ThroughputLimiter outgoingThroughputLimiter;
    private ThroughputLimiter incomingThroughputLimiter;

    public LocalServiceUplinkConnectionImpl(AsyncTaskService asyncTaskService, ServerSideUplinkSessionService serverSideUplinkSessionService, ThroughputLimiter throughputLimiter, ThroughputLimiter throughputLimiter2) {
        this.asyncTaskService = asyncTaskService;
        this.sessionService = serverSideUplinkSessionService;
        this.outgoingThroughputLimiter = throughputLimiter;
        this.incomingThroughputLimiter = throughputLimiter2;
    }

    @Override // de.rcenvironment.core.communication.uplink.client.session.api.UplinkConnection
    public void open(Consumer<String> consumer) throws IOException {
        this.localTestConnection = new LocalTCPTestConnection();
        this.clientSideInputStream = this.localTestConnection.getClientSideInputStream();
        this.clientSideOutputStream = this.localTestConnection.getClientSideOutputStream();
        this.serverSideOutputStream = this.localTestConnection.getServerSideOutputStream();
        this.serverSideInputStream = this.localTestConnection.getServerSideInputStream();
        if (this.outgoingThroughputLimiter != null) {
            this.clientSideOutputStream = new ThroughputLimitingOutputStream(this.clientSideOutputStream, this.outgoingThroughputLimiter);
        }
        if (this.incomingThroughputLimiter != null) {
            this.clientSideInputStream = new ThroughputLimitingInputStream(this.clientSideInputStream, this.incomingThroughputLimiter);
        }
        ServerSideUplinkSession createServerSideSession = this.sessionService.createServerSideSession(this.localTestConnection.getServerSideEndpoint(), "test", "local test connection #" + Integer.toString(this.simulatedConnectionCounter.incrementAndGet()));
        AsyncTaskService asyncTaskService = this.asyncTaskService;
        createServerSideSession.getClass();
        asyncTaskService.execute("Local service uplink connection: Running the server-side session", createServerSideSession::runSession);
    }

    public InputStream getInputStream() {
        return this.clientSideInputStream;
    }

    public OutputStream getOutputStream() {
        return this.clientSideOutputStream;
    }

    public void close() {
        this.localTestConnection.getClientSideEndpoint().close();
    }

    public void simulateClientSideEOF() {
        try {
            this.clientSideInputStream.close();
        } catch (IOException e) {
            LogFactory.getLog(getClass()).debug("Caught an exception while closing the C->R stream to simulate EOF: " + e.toString());
        }
    }
}
