/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.aggregator;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
import org.springframework.integration.aggregator.MessageGroupExpiredEvent;
import org.springframework.integration.aggregator.MessageGroupProcessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.aggregator.SequenceNumberComparator;
import org.springframework.integration.aggregator.SequenceSizeReleaseStrategy;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.handler.DiscardingMessageHandler;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageGroup;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public abstract class AbstractCorrelatingMessageHandler
extends AbstractMessageProducingHandler
implements DiscardingMessageHandler,
DisposableBean,
ApplicationEventPublisherAware {
    private static final Log logger = LogFactory.getLog(AbstractCorrelatingMessageHandler.class);

    /* Orders messages by sequence number; used to find the last released message. */
    private final Comparator<Message<?>> sequenceNumberComparator = new SequenceNumberComparator();

    /*
     * Scheduled force-complete tasks keyed by the group's UUID. Entries are added and removed
     * under a lock obtained per group id (and, on the reschedule path of the scheduled task,
     * without that lock), and the values are iterated in destroy(). Several threads can
     * therefore touch the map at once, so it must be a concurrent map rather than a HashMap.
     */
    private final Map<UUID, ScheduledFuture<?>> expireGroupScheduledFutures = new ConcurrentHashMap<UUID, ScheduledFuture<?>>();

    /* Processor invoked when a group is completed; fixed at construction time. */
    private final MessageGroupProcessor outputProcessor;
    private volatile MessageGroupStore messageStore;
    private volatile CorrelationStrategy correlationStrategy;
    private volatile ReleaseStrategy releaseStrategy;

    /* Discard target: either set directly or resolved lazily from discardChannelName. */
    private volatile MessageChannel discardChannel;
    private volatile String discardChannelName;
    private boolean sendPartialResultOnExpiry = false;

    /* True while the release strategy is a SequenceSizeReleaseStrategy. */
    private volatile boolean sequenceAware = false;
    private volatile LockRegistry lockRegistry = new DefaultLockRegistry();

    /* Set once a registry has been supplied or onInit() has run; blocks later replacement. */
    private boolean lockRegistrySet = false;
    private volatile long minimumTimeoutForEmptyGroups;
    private volatile boolean releasePartialSequences;
    private volatile Expression groupTimeoutExpression;
    private volatile List<Advice> forceReleaseAdviceChain;

    /* Replaced in onInit() by a possibly advised instance. */
    private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();
    private EvaluationContext evaluationContext;
    private volatile ApplicationEventPublisher applicationEventPublisher;
    private volatile boolean expireGroupsUponTimeout = true;

    /**
     * Creates a handler from its four collaborators.
     *
     * @param processor the output processor; must not be null
     * @param store the message group store; must not be null
     * @param correlationStrategy the correlation strategy, or null to use a
     *        {@link HeaderAttributeCorrelationStrategy} on the "correlationId" header
     * @param releaseStrategy the release strategy, or null to use a
     *        {@link SequenceSizeReleaseStrategy}
     */
    public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
        Assert.notNull(processor);
        Assert.notNull(store);
        this.setMessageStore(store);
        this.outputProcessor = processor;
        if (correlationStrategy != null) {
            this.correlationStrategy = correlationStrategy;
        } else {
            this.correlationStrategy = new HeaderAttributeCorrelationStrategy("correlationId");
        }
        if (releaseStrategy != null) {
            this.releaseStrategy = releaseStrategy;
        } else {
            this.releaseStrategy = new SequenceSizeReleaseStrategy();
        }
        // Sequence awareness follows from whichever strategy ended up in place.
        this.sequenceAware = this.releaseStrategy instanceof SequenceSizeReleaseStrategy;
    }

    /**
     * Creates a handler with the given processor and store, passing null for both strategies
     * so that the four-argument constructor applies its defaults.
     *
     * @param processor the output processor
     * @param store the message group store
     */
    public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store) {
        this(processor, store, null, null);
    }

    /**
     * Creates a handler with the given processor, a new {@link SimpleMessageStore} and null
     * strategies (defaults are applied by the four-argument constructor).
     *
     * @param processor the output processor
     */
    public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor) {
        // NOTE(review): the 0 passed to SimpleMessageStore is presumably a capacity setting - confirm.
        this(processor, new SimpleMessageStore(0), null, null);
    }

    /**
     * Sets the registry from which per-group locks are obtained. May be called at most once,
     * and not after {@code onInit()} has run (which also marks the registry as set).
     *
     * @param lockRegistry the registry to use; must not be null
     * @throws IllegalArgumentException if a registry was already set or {@code lockRegistry} is null
     */
    public void setLockRegistry(LockRegistry lockRegistry) {
        Assert.isTrue(!this.lockRegistrySet, "'this.lockRegistry' can not be reset once its been set");
        // Validate the argument itself; the previous code asserted on the message literal,
        // which is never null, so a null registry was silently accepted.
        Assert.notNull(lockRegistry, "'lockRegistry' must not be null");
        this.lockRegistry = lockRegistry;
        this.lockRegistrySet = true;
    }

    /**
     * Replaces the message group store and registers an expiry callback on it that hands each
     * expired group to the current force-release processor.
     *
     * @param store the store to use
     */
    public final void setMessageStore(MessageGroupStore store) {
        this.messageStore = store;
        MessageGroupStore.MessageGroupCallback expiryCallback = new MessageGroupStore.MessageGroupCallback() {

            @Override
            public void execute(MessageGroupStore messageGroupStore, MessageGroup group) {
                // The field is read at callback time, so a processor installed later is picked up.
                AbstractCorrelatingMessageHandler.this.forceReleaseProcessor.processMessageGroup(group);
            }
        };
        store.registerMessageGroupExpiryCallback(expiryCallback);
    }

    /**
     * Sets the strategy used to compute a message's correlation key.
     *
     * @param correlationStrategy the strategy; must not be null
     */
    public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
        Assert.notNull(correlationStrategy);
        this.correlationStrategy = correlationStrategy;
    }

    /**
     * Sets the strategy that decides when a group can be released, and updates the
     * sequence-aware flag to reflect whether it is a {@link SequenceSizeReleaseStrategy}.
     *
     * @param releaseStrategy the strategy; must not be null
     */
    public void setReleaseStrategy(ReleaseStrategy releaseStrategy) {
        Assert.notNull(releaseStrategy);
        this.releaseStrategy = releaseStrategy;
        this.sequenceAware = this.releaseStrategy instanceof SequenceSizeReleaseStrategy;
    }

    /**
     * Sets the expression evaluated against a message group (as a {@code Long}) to obtain the
     * group timeout; when unset, no timeout is computed.
     *
     * @param groupTimeoutExpression the expression, may be null
     */
    public void setGroupTimeoutExpression(Expression groupTimeoutExpression) {
        this.groupTimeoutExpression = groupTimeoutExpression;
    }

    /**
     * Sets the advice chain applied to the force-release processor created during
     * initialization (only used when a group timeout expression is also configured).
     *
     * @param forceReleaseAdviceChain the advice list; must not be null
     */
    public void setForceReleaseAdviceChain(List<Advice> forceReleaseAdviceChain) {
        Assert.notNull(forceReleaseAdviceChain, "forceReleaseAdviceChain must not be null");
        this.forceReleaseAdviceChain = forceReleaseAdviceChain;
    }

    /**
     * Sets the evaluation context used for the group timeout expression. If none is set, a
     * standard one is created in {@code onInit()}.
     *
     * @param evaluationContext the evaluation context
     */
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Sets the task scheduler by delegating directly to the superclass setter. The scheduler
     * is used here to schedule forced completion of groups.
     *
     * @param taskScheduler the scheduler
     */
    @Override
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        super.setTaskScheduler(taskScheduler);
    }

    /**
     * Receives the publisher used to emit a {@link MessageGroupExpiredEvent} when a group is
     * expired. Implements {@link ApplicationEventPublisherAware}.
     *
     * @param applicationEventPublisher the publisher; when null no events are published
     */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Completes configuration: validates the discard settings, passes the bean factory to the
     * collaborators that want it, applies defaults, and builds the force-release processor.
     *
     * @throws Exception if superclass initialization fails
     * @throws IllegalStateException if both a discard channel and a discard channel name are set
     * @throws IllegalArgumentException if partial sequences are requested with a release
     *         strategy that is not a {@link SequenceSizeReleaseStrategy}
     */
    @Override
    protected void onInit() throws Exception {
        super.onInit();
        boolean singleDiscardTarget = this.discardChannelName == null || this.discardChannel == null;
        Assert.state(singleDiscardTarget, "'discardChannelName' and 'discardChannel' are mutually exclusive.");

        BeanFactory beanFactory = this.getBeanFactory();
        if (beanFactory != null) {
            Object[] collaborators = {this.outputProcessor, this.correlationStrategy, this.releaseStrategy};
            for (Object collaborator : collaborators) {
                if (collaborator instanceof BeanFactoryAware) {
                    ((BeanFactoryAware) collaborator).setBeanFactory(beanFactory);
                }
            }
        }

        if (this.discardChannel == null) {
            this.discardChannel = new NullChannel();
        }

        if (this.releasePartialSequences) {
            String unsupported = "Release strategy of type [" + this.releaseStrategy.getClass().getSimpleName() + "] cannot release partial sequences. Use the default SequenceSizeReleaseStrategy instead.";
            Assert.isInstanceOf(SequenceSizeReleaseStrategy.class, this.releaseStrategy, unsupported);
            ((SequenceSizeReleaseStrategy) this.releaseStrategy).setReleasePartialSequences(this.releasePartialSequences);
        }

        if (this.evaluationContext == null) {
            this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.getBeanFactory());
        }

        // From here on the lock registry can no longer be replaced.
        this.lockRegistrySet = true;
        this.forceReleaseProcessor = this.createGroupTimeoutProcessor();
    }

    /**
     * Builds the processor used to force-release groups. The plain processor is returned
     * unless both a group timeout expression and a non-empty advice chain are configured, in
     * which case it is wrapped in a proxy carrying every advice of the chain.
     *
     * @return the (possibly proxied) force-release processor
     */
    private MessageGroupProcessor createGroupTimeoutProcessor() {
        MessageGroupProcessor target = new ForceReleaseMessageGroupProcessor();
        if (this.groupTimeoutExpression == null || CollectionUtils.isEmpty(this.forceReleaseAdviceChain)) {
            return target;
        }
        ProxyFactory advised = new ProxyFactory(target);
        for (Advice advice : this.forceReleaseAdviceChain) {
            advised.addAdvice(advice);
        }
        return (MessageGroupProcessor) advised.getProxy(this.getApplicationContext().getClassLoader());
    }

    /**
     * Sets the channel that receives discarded messages.
     *
     * @param discardChannel the channel; must not be null
     */
    public void setDiscardChannel(MessageChannel discardChannel) {
        Assert.notNull(discardChannel, "'discardChannel' cannot be null");
        this.discardChannel = discardChannel;
    }

    /**
     * Sets the name of the discard channel; the name is resolved to a channel on the first
     * call to {@link #getDiscardChannel()}.
     *
     * @param discardChannelName the channel name; must contain text
     */
    public void setDiscardChannelName(String discardChannelName) {
        Assert.hasText(discardChannelName, "'discardChannelName' must not be empty");
        this.discardChannelName = discardChannelName;
    }

    /**
     * Chooses what happens to an expiring group: when true it is completed through the
     * output processor, when false its messages are sent to the discard channel.
     *
     * @param sendPartialResultOnExpiry true to release partial results on expiry
     */
    public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry) {
        this.sendPartialResultOnExpiry = sendPartialResultOnExpiry;
    }

    /**
     * Sets how long (compared against {@code System.currentTimeMillis()}) an empty group must
     * have been unmodified before a forced completion removes it.
     *
     * @param minimumTimeoutForEmptyGroups the minimum age in milliseconds
     */
    public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups) {
        this.minimumTimeoutForEmptyGroups = minimumTimeoutForEmptyGroups;
    }

    /**
     * Sets the flag that {@code onInit()} forwards to the {@link SequenceSizeReleaseStrategy};
     * initialization fails if the flag is true and another release strategy type is in use.
     *
     * @param releasePartialSequences true to release partial sequences
     */
    public void setReleasePartialSequences(boolean releasePartialSequences) {
        this.releasePartialSequences = releasePartialSequences;
    }

    /**
     * Controls whether a non-empty group is removed from the store after a forced completion.
     * When false, the group is kept and {@code afterRelease(group, messages, true)} is invoked
     * instead. Defaults to true.
     *
     * @param expireGroupsUponTimeout true to remove groups after forced completion
     */
    public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
        this.expireGroupsUponTimeout = expireGroupsUponTimeout;
    }

    /**
     * @return the component type name of this handler, always {@code "aggregator"}
     */
    @Override
    public String getComponentType() {
        return "aggregator";
    }

    /**
     * @return the message group store currently in use
     */
    public MessageGroupStore getMessageStore() {
        return messageStore;
    }

    /**
     * @return the live internal map of scheduled force-complete tasks keyed by group UUID
     *         (not a copy)
     */
    protected Map<UUID, ScheduledFuture<?>> getExpireGroupScheduledFutures() {
        return expireGroupScheduledFutures;
    }

    /**
     * @return the processor given at construction time that produces output for completed groups
     */
    protected MessageGroupProcessor getOutputProcessor() {
        return outputProcessor;
    }

    /**
     * @return the strategy used to compute correlation keys
     */
    protected CorrelationStrategy getCorrelationStrategy() {
        return correlationStrategy;
    }

    /**
     * @return the strategy that decides whether a group can be released
     */
    protected ReleaseStrategy getReleaseStrategy() {
        return releaseStrategy;
    }

    /**
     * Returns the discard channel, resolving it from the configured channel name on first
     * use. After a successful resolution the name is cleared so the lookup is not repeated.
     *
     * @return the discard channel
     */
    @Override
    public MessageChannel getDiscardChannel() {
        if (this.discardChannelName != null) {
            synchronized (this) {
                // Checked again under the monitor: another thread may have resolved it meanwhile.
                if (this.discardChannelName != null) {
                    this.discardChannel = (MessageChannel) this.getChannelResolver().resolveDestination(this.discardChannelName);
                    this.discardChannelName = null;
                }
            }
        }
        return this.discardChannel;
    }

    /**
     * @return the not-yet-resolved discard channel name, or null if none is set or it has
     *         already been resolved
     */
    protected String getDiscardChannelName() {
        return discardChannelName;
    }

    /**
     * @return true if expiring groups are completed rather than discarded
     */
    protected boolean isSendPartialResultOnExpiry() {
        return sendPartialResultOnExpiry;
    }

    /**
     * @return true if the release strategy is a {@link SequenceSizeReleaseStrategy}
     */
    protected boolean isSequenceAware() {
        return sequenceAware;
    }

    /**
     * @return the registry from which per-group locks are obtained
     */
    protected LockRegistry getLockRegistry() {
        return lockRegistry;
    }

    /**
     * @return true once a lock registry has been supplied or initialization has completed
     */
    protected boolean isLockRegistrySet() {
        return lockRegistrySet;
    }

    /**
     * @return the minimum age, in milliseconds, an empty group must reach before removal
     */
    protected long getMinimumTimeoutForEmptyGroups() {
        return minimumTimeoutForEmptyGroups;
    }

    /**
     * @return the configured release-partial-sequences flag
     */
    protected boolean isReleasePartialSequences() {
        return releasePartialSequences;
    }

    /**
     * @return the group timeout expression, or null if none is configured
     */
    protected Expression getGroupTimeoutExpression() {
        return groupTimeoutExpression;
    }

    /**
     * @return the evaluation context used for the group timeout expression
     */
    protected EvaluationContext getEvaluationContext() {
        return evaluationContext;
    }

    /**
     * Correlates the message to a group and, while holding the lock obtained for that group,
     * cancels any pending forced completion and then either:
     * <ul>
     * <li>discards the message if the group is complete or cannot accept it;</li>
     * <li>stores it and completes the group if the release strategy allows, always calling
     * {@code afterRelease} afterwards; or</li>
     * <li>stores it and schedules a forced completion for the group.</li>
     * </ul>
     *
     * @param message the incoming message
     * @throws IllegalStateException if the correlation strategy yields a null key
     * @throws Exception if the lock wait is interrupted or processing fails
     */
    @Override
    protected void handleMessageInternal(Message<?> message) throws Exception {
        Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
        Assert.state(correlationKey != null, "Null correlation not allowed.  Maybe the CorrelationStrategy is failing?");
        if (logger.isDebugEnabled()) {
            logger.debug("Handling message with correlationKey [" + correlationKey + "]: " + message);
        }
        UUID groupIdUuid = UUIDConverter.getUUID(correlationKey);
        Lock lock = this.lockRegistry.obtain(groupIdUuid.toString());
        lock.lockInterruptibly();
        try {
            ScheduledFuture<?> pendingExpiry = this.expireGroupScheduledFutures.remove(groupIdUuid);
            if (pendingExpiry != null) {
                boolean canceled = pendingExpiry.cancel(true);
                if (canceled && logger.isDebugEnabled()) {
                    logger.debug("Cancel 'forceComplete' scheduling for MessageGroup with Correlation Key [ " + correlationKey + "].");
                }
            }

            MessageGroup messageGroup = this.messageStore.getMessageGroup(correlationKey);
            if (this.sequenceAware) {
                messageGroup = new SequenceAwareMessageGroup(messageGroup);
            }

            if (messageGroup.isComplete() || !messageGroup.canAdd(message)) {
                this.discardMessage(message);
                return;
            }

            if (logger.isTraceEnabled()) {
                logger.trace("Adding message to group [ " + messageGroup + "]");
            }
            messageGroup = this.store(correlationKey, message);

            if (!this.releaseStrategy.canRelease(messageGroup)) {
                this.scheduleGroupToForceComplete(messageGroup);
                return;
            }

            Collection<Message<?>> completedMessages = null;
            try {
                completedMessages = this.completeGroup(message, correlationKey, messageGroup);
            }
            finally {
                // Runs even when completion throws; completedMessages is then still null.
                this.afterRelease(messageGroup, completedMessages);
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Arranges forced completion of a group according to its evaluated timeout: nothing is
     * done for a null or negative timeout, the group is force-released immediately for a zero
     * timeout, and otherwise a task is scheduled to run after the timeout and recorded in the
     * scheduled-futures map. If the scheduled task fails with a
     * {@link MessageDeliveryException}, the group is scheduled again.
     *
     * @param messageGroup the group to schedule
     */
    private void scheduleGroupToForceComplete(MessageGroup messageGroup) {
        Long groupTimeout = this.obtainGroupTimeout(messageGroup);
        if (groupTimeout == null || groupTimeout < 0L) {
            return;
        }
        if (groupTimeout == 0L) {
            this.forceReleaseProcessor.processMessageGroup(messageGroup);
            return;
        }

        final Object groupId = messageGroup.getGroupId();
        Runnable forceReleaseTask = new Runnable() {

            @Override
            public void run() {
                try {
                    AbstractCorrelatingMessageHandler.this.processForceRelease(groupId);
                }
                catch (MessageDeliveryException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("The MessageGroup [ " + groupId + "] is rescheduled by the reason: " + e.getMessage());
                    }
                    AbstractCorrelatingMessageHandler.this.scheduleGroupToForceComplete(groupId);
                }
            }
        };
        Date startTime = new Date(System.currentTimeMillis() + groupTimeout);
        ScheduledFuture<?> scheduledFuture = this.getTaskScheduler().schedule(forceReleaseTask, startTime);
        if (logger.isDebugEnabled()) {
            logger.debug("Schedule MessageGroup [ " + messageGroup + "] to 'forceComplete'.");
        }
        this.expireGroupScheduledFutures.put(UUIDConverter.getUUID(groupId), scheduledFuture);
    }

    /**
     * Looks the group up in the message store by id and schedules its forced completion.
     *
     * @param groupId the id of the group to schedule
     */
    private void scheduleGroupToForceComplete(Object groupId) {
        MessageGroup currentGroup = this.messageStore.getMessageGroup(groupId);
        scheduleGroupToForceComplete(currentGroup);
    }

    /**
     * Looks the group up in the message store by id and passes it to the force-release
     * processor.
     *
     * @param groupId the id of the group to force-release
     */
    private void processForceRelease(Object groupId) {
        MessageGroup currentGroup = this.messageStore.getMessageGroup(groupId);
        forceReleaseProcessor.processMessageGroup(currentGroup);
    }

    /**
     * Sends the message to the discard channel.
     *
     * @param message the message to discard
     */
    private void discardMessage(Message<?> message) {
        messagingTemplate.send(getDiscardChannel(), message);
    }

    /**
     * Hook invoked after an attempt to release a group. During normal message handling it is
     * called from a {@code finally} block, so the second argument is null when completion
     * threw or when the processor result was not a collection.
     *
     * @param var1 the group that was released
     * @param var2 the messages produced by completing the group; may be null
     */
    protected abstract void afterRelease(MessageGroup var1, Collection<Message<?>> var2);

    /**
     * Variant of the release hook that also carries a timeout flag. This default
     * implementation ignores the flag and delegates to the two-argument hook.
     *
     * @param group the group that was released
     * @param completedMessages the messages associated with the release
     * @param timeout passed as true by the forced-completion path; unused here
     */
    protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) {
        this.afterRelease(group, completedMessages);
    }

    /**
     * Forces completion of a group while holding the lock obtained for its id.
     * <p>
     * Any scheduled forced completion for the group is cancelled first. The group is then
     * re-read from the store (unless the supplied instance is already complete) and is acted
     * on only if it is incomplete or empty and its last-modified time and timestamp still
     * match the supplied instance. A non-empty group is completed or expired depending on the
     * release strategy; an empty group is removed only once it is older than the minimum
     * timeout for empty groups. If a {@link MessageDeliveryException} occurs the group is
     * kept and the exception is rethrown. An interrupt while waiting for the lock restores
     * the interrupt flag and returns.
     *
     * @param group the group to force-complete
     */
    protected void forceComplete(MessageGroup group) {
        Object correlationKey = group.getGroupId();
        Lock lock = this.lockRegistry.obtain(UUIDConverter.getUUID(correlationKey).toString());
        boolean removeGroup = true;
        try {
            lock.lockInterruptibly();
            try {
                ScheduledFuture<?> pendingExpiry = this.expireGroupScheduledFutures.remove(UUIDConverter.getUUID(correlationKey));
                if (pendingExpiry != null) {
                    boolean canceled = pendingExpiry.cancel(false);
                    if (canceled && logger.isDebugEnabled()) {
                        logger.debug("Cancel 'forceComplete' scheduling for MessageGroup [ " + group + "].");
                    }
                }

                MessageGroup groupNow = group;
                if (!group.isComplete()) {
                    groupNow = this.messageStore.getMessageGroup(correlationKey);
                }
                long lastModifiedNow = groupNow.getLastModified();
                int groupSize = groupNow.size();

                // Still a candidate only if not already completed with content, and untouched
                // since the caller obtained its snapshot.
                boolean eligible = (!groupNow.isComplete() || groupSize == 0)
                        && group.getLastModified() == lastModifiedNow
                        && group.getTimestamp() == groupNow.getTimestamp();

                if (!eligible) {
                    removeGroup = false;
                    if (logger.isDebugEnabled()) {
                        logger.debug("Group expiry candidate (" + correlationKey + ") has changed - it may be reconsidered for a future expiration");
                    }
                }
                else if (groupSize > 0) {
                    if (this.releaseStrategy.canRelease(groupNow)) {
                        this.completeGroup(correlationKey, groupNow);
                    }
                    else {
                        this.expireGroup(correlationKey, groupNow);
                    }
                    if (!this.expireGroupsUponTimeout) {
                        this.afterRelease(groupNow, groupNow.getMessages(), true);
                        removeGroup = false;
                    }
                }
                else {
                    removeGroup = lastModifiedNow <= System.currentTimeMillis() - this.minimumTimeoutForEmptyGroups;
                    if (removeGroup && logger.isDebugEnabled()) {
                        logger.debug("Removing empty group: " + correlationKey);
                    }
                }
            }
            catch (MessageDeliveryException e) {
                removeGroup = false;
                if (logger.isDebugEnabled()) {
                    logger.debug("Group expiry candidate (" + correlationKey + ") has been affected by MessageDeliveryException - it may be reconsidered for a future expiration one more time");
                }
                throw e;
            }
            finally {
                try {
                    if (removeGroup) {
                        this.remove(group);
                    }
                }
                finally {
                    // Released even if removal from the store fails.
                    lock.unlock();
                }
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            logger.debug("Thread was interrupted while trying to obtain lock");
        }
    }

    /**
     * Removes the group from the message store, using the group's id as the key.
     *
     * @param group the group to remove
     */
    void remove(MessageGroup group) {
        Object groupKey = group.getGroupId();
        messageStore.removeMessageGroup(groupKey);
    }

    /**
     * Returns the sequence number of the message that sorts last, by sequence number, in the
     * given collection.
     *
     * @param groupId the group id (not used by this implementation)
     * @param partialSequence the released messages; {@code Collections.max} throws
     *        {@code NoSuchElementException} if it is empty
     * @return the highest sequence number found
     */
    protected int findLastReleasedSequenceNumber(Object groupId, Collection<Message<?>> partialSequence) {
        Message<?> highest = Collections.max(partialSequence, this.sequenceNumberComparator);
        IntegrationMessageHeaderAccessor headers = new IntegrationMessageHeaderAccessor(highest);
        return headers.getSequenceNumber();
    }

    /**
     * Adds the message to the group identified by the correlation key in the message store.
     *
     * @param correlationKey the group key
     * @param message the message to add
     * @return the group as returned by the store after the addition
     */
    protected MessageGroup store(Object correlationKey, Message<?> message) {
        MessageGroup updatedGroup = this.messageStore.addMessageToGroup(correlationKey, message);
        return updatedGroup;
    }

    /**
     * Expires a group: depending on the send-partial-result setting it is either completed
     * through the output processor or each of its messages is sent to the discard channel.
     * Afterwards a {@link MessageGroupExpiredEvent} is published if a publisher is available.
     *
     * @param correlationKey the group key
     * @param group the group to expire
     */
    protected void expireGroup(Object correlationKey, MessageGroup group) {
        if (logger.isInfoEnabled()) {
            logger.info("Expiring MessageGroup with correlationKey[" + correlationKey + "]");
        }

        if (!this.sendPartialResultOnExpiry) {
            if (logger.isDebugEnabled()) {
                Object discardTarget;
                if (this.discardChannelName != null) {
                    discardTarget = this.discardChannelName;
                }
                else {
                    discardTarget = this.discardChannel;
                }
                logger.debug("Discarding messages of partially complete group with key [" + correlationKey + "] to: " + discardTarget);
            }
            for (Message<?> member : group.getMessages()) {
                this.discardMessage(member);
            }
        }
        else {
            if (logger.isDebugEnabled()) {
                logger.debug("Prematurely releasing partially complete group with key [" + correlationKey + "] to: " + this.getOutputChannel());
            }
            this.completeGroup(correlationKey, group);
        }

        if (this.applicationEventPublisher != null) {
            // The last argument is true when the messages were discarded rather than released.
            MessageGroupExpiredEvent expired = new MessageGroupExpiredEvent(this, correlationKey, group.size(), new Date(group.getLastModified()), new Date(), !this.sendPartialResultOnExpiry);
            this.applicationEventPublisher.publishEvent((ApplicationEvent) expired);
        }
    }

    /**
     * Completes the group, using one of its messages (as returned by {@code getOne()}) as the
     * message passed along for sending outputs; null is used when the group itself is null.
     *
     * @param correlationKey the group key
     * @param group the group to complete
     */
    protected void completeGroup(Object correlationKey, MessageGroup group) {
        Message<?> representative = (group != null) ? group.getOne() : null;
        this.completeGroup(representative, correlationKey, group);
    }

    /**
     * Runs the output processor on the group and sends whatever it produced. A collection
     * result is first verified to consist of messages.
     *
     * @param message the message passed to {@code sendOutputs} together with the result
     * @param correlationKey the group key (used for logging)
     * @param group the group to complete
     * @return the processor result if it is a collection, otherwise null
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected Collection<Message<?>> completeGroup(Message<?> message, Object correlationKey, MessageGroup group) {
        if (logger.isDebugEnabled()) {
            logger.debug("Completing group with correlationKey [" + correlationKey + "]");
        }
        Object result = this.outputProcessor.processMessageGroup(group);
        Collection releasedMessages;
        if (result instanceof Collection) {
            releasedMessages = (Collection) result;
            this.verifyResultCollectionConsistsOfMessages(releasedMessages);
        }
        else {
            releasedMessages = null;
        }
        this.sendOutputs(result, message);
        return releasedMessages;
    }

    /**
     * Asserts that the common element type of the collection is assignable to {@link Message}.
     *
     * @param elements the processor result to check
     * @throws IllegalArgumentException if the assertion fails
     */
    protected void verifyResultCollectionConsistsOfMessages(Collection<?> elements) {
        // NOTE(review): findCommonElementType presumably yields null for an empty or
        // mixed-type collection, which would fail this assertion - confirm intended.
        Class<?> elementType = CollectionUtils.findCommonElementType(elements);
        String failure = "The expected collection of Messages contains non-Message element: " + elementType;
        Assert.isAssignable(Message.class, elementType, failure);
    }

    /**
     * Evaluates the group timeout expression with the group as root object.
     *
     * @param group the group to evaluate the expression against
     * @return the evaluated timeout, or null when no expression is configured
     */
    protected Long obtainGroupTimeout(MessageGroup group) {
        if (this.groupTimeoutExpression == null) {
            return null;
        }
        return (Long) this.groupTimeoutExpression.getValue(this.evaluationContext, (Object) group, Long.class);
    }

    /**
     * Cancels every scheduled force-complete task that is still recorded, interrupting tasks
     * that are running. Implements {@link DisposableBean}.
     *
     * @throws Exception declared by the interface; not thrown by this implementation itself
     */
    @Override
    public void destroy() throws Exception {
        for (ScheduledFuture<?> future : this.expireGroupScheduledFutures.values()) {
            future.cancel(true);
        }
    }

    /**
     * Adapter that exposes the enclosing handler's {@code forceComplete} as a
     * {@link MessageGroupProcessor}, so it can be wrapped in an advised proxy.
     */
    private class ForceReleaseMessageGroupProcessor
    implements MessageGroupProcessor {
        private ForceReleaseMessageGroupProcessor() {
        }

        /**
         * Force-completes the group on the enclosing handler.
         *
         * @return always null
         */
        @Override
        public Object processMessageGroup(MessageGroup group) {
            AbstractCorrelatingMessageHandler.this.forceComplete(group);
            return null;
        }
    }

    /**
     * A message group whose {@code canAdd} check takes sequence headers into account: a
     * message carrying a positive sequence number is accepted only if its sequence size
     * matches the group's and no member already has that sequence number.
     */
    protected static class SequenceAwareMessageGroup
    extends SimpleMessageGroup {
        public SequenceAwareMessageGroup(MessageGroup messageGroup) {
            super(messageGroup);
        }

        /**
         * Decides whether the message may join this group.
         *
         * @param message the candidate message
         * @return true for an empty group or a message without a positive sequence number;
         *         otherwise true only if the sequence size matches and the number is unused
         */
        @Override
        public boolean canAdd(Message<?> message) {
            if (this.size() == 0) {
                return true;
            }
            IntegrationMessageHeaderAccessor headers = new IntegrationMessageHeaderAccessor(message);
            Integer candidateNumber = headers.getSequenceNumber();
            if (candidateNumber == null || candidateNumber <= 0) {
                // No usable sequence information, so nothing to check against.
                return true;
            }
            Integer candidateSize = headers.getSequenceSize();
            if (!candidateSize.equals(this.getSequenceSize())) {
                return false;
            }
            return !this.containsSequenceNumber(this.getMessages(), candidateNumber);
        }

        /**
         * @return true if any of the given messages carries the given sequence number
         */
        private boolean containsSequenceNumber(Collection<Message<?>> messages, Integer messageSequenceNumber) {
            for (Message<?> existing : messages) {
                Integer existingNumber = new IntegrationMessageHeaderAccessor(existing).getSequenceNumber();
                if (messageSequenceNumber.equals(existingNumber)) {
                    return true;
                }
            }
            return false;
        }
    }
}

