package de.rcenvironment.core.notification.internal;

import de.rcenvironment.core.communication.api.CommunicationService;
import de.rcenvironment.core.communication.api.PlatformService;
import de.rcenvironment.core.communication.common.InstanceNodeSessionId;
import de.rcenvironment.core.communication.common.ResolvableNodeId;
import de.rcenvironment.core.notification.DistributedNotificationService;
import de.rcenvironment.core.notification.Notification;
import de.rcenvironment.core.notification.NotificationService;
import de.rcenvironment.core.notification.NotificationSubscriber;
import de.rcenvironment.core.notification.api.RemotableNotificationService;
import de.rcenvironment.core.toolkitbridge.transitional.ConcurrencyUtils;
import de.rcenvironment.core.utils.common.ServiceUtils;
import de.rcenvironment.core.utils.common.StringUtils;
import de.rcenvironment.core.utils.common.rpc.RemoteOperationException;
import de.rcenvironment.toolkit.modules.concurrency.api.AsyncExceptionListener;
import de.rcenvironment.toolkit.modules.concurrency.api.CallablesGroup;
import de.rcenvironment.toolkit.modules.concurrency.api.TaskDescription;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;

/* loaded from: input_file:de/rcenvironment/core/notification/internal/DistributedNotificationServiceImpl.class */
/**
 * Implementation of {@link DistributedNotificationService} that forwards publisher-side operations to the
 * local {@link NotificationService} and subscriber-side operations to the {@link RemotableNotificationService}
 * of a target node, obtained through the {@link CommunicationService}.
 *
 * <p>The service references are held in static fields that are assigned by the {@code bind*} methods, so
 * all instances of this class share the most recently bound services.
 */
public class DistributedNotificationServiceImpl implements DistributedNotificationService {
    /** Local notification service; {@code null} until {@link #bindNotificationService} has been called. */
    private static NotificationService notificationService;
    private static final Log LOGGER = LogFactory.getLog(DistributedNotificationServiceImpl.class);
    /** Placeholder used until a real service is bound (presumably fails on any call; see ServiceUtils). */
    private static CommunicationService nullCommunicationService = (CommunicationService) ServiceUtils.createFailingServiceProxy(CommunicationService.class);
    private static CommunicationService communicationService = nullCommunicationService;
    private static PlatformService platformService = (PlatformService) ServiceUtils.createFailingServiceProxy(PlatformService.class);

    /**
     * Activation hook; intentionally does nothing.
     *
     * @param bundleContext the bundle context (unused)
     */
    protected void activate(BundleContext bundleContext) {
    }

    /**
     * Sets the local notification service used by the publisher-side methods.
     *
     * @param notificationService2 the service to delegate to
     */
    protected void bindNotificationService(NotificationService notificationService2) {
        notificationService = notificationService2;
    }

    /**
     * Sets the communication service used to look up remote notification services.
     *
     * @param communicationService2 the service to use
     */
    protected void bindCommunicationService(CommunicationService communicationService2) {
        communicationService = communicationService2;
    }

    /**
     * Sets the platform service used to determine the local node id.
     *
     * @param platformService2 the service to use
     */
    protected void bindPlatformService(PlatformService platformService2) {
        platformService = platformService2;
    }

    /**
     * Delegates to {@link NotificationService#setBufferSize} of the local notification service.
     *
     * @param str the notification identifier
     * @param i the buffer size
     */
    @Override // de.rcenvironment.core.notification.DistributedNotificationService
    public void setBufferSize(String str, int i) {
        notificationService.setBufferSize(str, i);
    }

    /**
     * Delegates to {@link NotificationService#removePublisher} of the local notification service.
     *
     * @param str the notification identifier
     */
    @Override // de.rcenvironment.core.notification.DistributedNotificationService
    public void removePublisher(String str) {
        notificationService.removePublisher(str);
    }

    /**
     * Delegates to {@link NotificationService#send} of the local notification service.
     *
     * @param str the notification identifier
     * @param t the notification body
     * @param <T> the body type
     */
    @Override // de.rcenvironment.core.notification.DistributedNotificationService
    public <T extends Serializable> void send(String str, T t) {
        notificationService.send(str, t);
    }

    /**
     * Subscribes the given subscriber on the notification service of the given node.
     *
     * @param str the notification identifier; must be a valid regular expression
     * @param notificationSubscriber the subscriber to register
     * @param resolvableNodeId the node to subscribe at; {@code null} selects the local node
     * @return the result of the remote {@code subscribe} call
     * @throws RemoteOperationException if the remote subscription fails
     * @throws RuntimeException if {@code str} cannot be compiled as a regular expression
     */
    @Override // de.rcenvironment.core.notification.DistributedNotificationService
    public Map<String, Long> subscribe(String str, NotificationSubscriber notificationSubscriber, ResolvableNodeId resolvableNodeId) throws RemoteOperationException {
        // Only the pattern check is guarded here, so that unrelated runtime failures further down
        // are not reported as an invalid regular expression.
        try {
            Pattern.compile(str);
        } catch (RuntimeException e) {
            LOGGER.error("Notification Id is not a valid RegExp: " + str, e);
            throw e;
        }

        ResolvableNodeId targetNode = resolvableNodeId;
        if (targetNode == null) {
            targetNode = platformService.getLocalInstanceNodeSessionId();
        }

        try {
            return getRemoteNotificationService(targetNode).subscribe(str, notificationSubscriber);
        } catch (RemoteOperationException e) {
            // Re-thrown with the target node added to the message.
            throw new RemoteOperationException(StringUtils.format("Failed to subscribe to publisher @%s: ", targetNode) + e.toString());
        }
    }

    /**
     * Subscribes the given subscriber at every node reported as reachable by the communication service,
     * using one task per node.
     *
     * @param str the notification identifier (regular expression)
     * @param notificationSubscriber the subscriber to register
     * @return a synchronized map from node id to the result of {@link #subscribe} for that node; nodes whose
     *         subscription threw an exception have no entry
     */
    @Override // de.rcenvironment.core.notification.DistributedNotificationService
    public Map<InstanceNodeSessionId, Map<String, Long>> subscribeToAllReachableNodes(final String str, final NotificationSubscriber notificationSubscriber) {
        // Synchronized because the tasks below may write to the map concurrently.
        final Map<InstanceNodeSessionId, Map<String, Long>> resultsByNode =
            Collections.synchronizedMap(new HashMap<InstanceNodeSessionId, Map<String, Long>>());
        Set<InstanceNodeSessionId> reachableInstanceNodes = communicationService.getReachableInstanceNodes();
        CallablesGroup<Void> subscriptionTasks = ConcurrencyUtils.getFactory().createCallablesGroup(Void.class);
        for (final InstanceNodeSessionId instanceNodeSessionId : reachableInstanceNodes) {
            // Kept as an anonymous class (not a lambda) so that call() can carry the @TaskDescription annotation.
            subscriptionTasks.add(new Callable<Void>() {
                @Override // java.util.concurrent.Callable
                @TaskDescription("Distributed notification subscription")
                public Void call() throws Exception {
                    resultsByNode.put(instanceNodeSessionId, DistributedNotificationServiceImpl.this.subscribe(str, notificationSubscriber, instanceNodeSessionId));
                    return null;
                }
            });
        }
        subscriptionTasks.executeParallel(new AsyncExceptionListener() {
            public void onAsyncException(Exception exc) {
                // Pass the exception along so that the cause and stack trace end up in the log.
                LOGGER.error("Asynchronous exception while subscribing for notification " + str, exc);
            }
        });
        return resultsByNode;
    }

    /**
     * Unsubscribes the given subscriber on the notification service of the given node.
     *
     * @param str the notification identifier
     * @param notificationSubscriber the subscriber to remove
     * @param resolvableNodeId the node to unsubscribe at
     * @throws RemoteOperationException if the lookup or the remote call fails, including with a runtime exception
     */
    @Override // de.rcenvironment.core.notification.DistributedNotificationService
    public void unsubscribe(String str, NotificationSubscriber notificationSubscriber, ResolvableNodeId resolvableNodeId) throws RemoteOperationException {
        try {
            getRemoteNotificationService(resolvableNodeId).unsubscribe(str, notificationSubscriber);
        } catch (RuntimeException | RemoteOperationException e) {
            // Runtime failures are also converted, so callers only have to handle RemoteOperationException.
            throw new RemoteOperationException(StringUtils.format("Failed to unsubscribe from publisher %s: ", resolvableNodeId) + e.getMessage());
        }
    }

    /**
     * Fetches notifications from the notification service of the given node.
     *
     * @param str the notification identifier
     * @param resolvableNodeId the node to query
     * @return the result of the remote {@code getNotifications} call
     * @throws RemoteOperationException if the lookup or the remote call fails
     */
    @Override // de.rcenvironment.core.notification.DistributedNotificationService
    public Map<String, List<Notification>> getNotifications(String str, ResolvableNodeId resolvableNodeId) throws RemoteOperationException {
        return getRemoteNotificationService(resolvableNodeId).getNotifications(str);
    }

    /**
     * Looks up the {@link RemotableNotificationService} of the given node via the communication service.
     *
     * @param resolvableNodeId the node whose service is requested
     * @return the service obtained from the communication service
     * @throws RemoteOperationException if the lookup fails
     */
    private RemotableNotificationService getRemoteNotificationService(ResolvableNodeId resolvableNodeId) throws RemoteOperationException {
        return (RemotableNotificationService) communicationService.getRemotableService(RemotableNotificationService.class, resolvableNodeId);
    }
}
