/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side entry point that owns the stream threads built from a {@link TopologyBuilder},
 * an optional global stream thread, the metrics registry, and the queryable-store and
 * metadata lookups that sit on top of them.
 *
 * <p>Lifecycle: an instance is created in {@link State#CREATED}, moved to
 * {@link State#RUNNING} by {@link #start()}, flips between {@link State#RUNNING} and
 * {@link State#REBALANCING} as the stream threads report their own state changes, and ends in
 * {@link State#NOT_RUNNING} after {@link #close()}.
 */
@InterfaceStability.Unstable
public class KafkaStreams {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
    private static final String JMX_PREFIX = "kafka.streams";
    /** Timeout used by {@link #close()}; {@code 0} makes the underlying join wait without a limit. */
    private static final int DEFAULT_CLOSE_TIMEOUT = 0;
    /** Only set when the topology contains a global state topology; {@code null} otherwise. */
    private GlobalStreamThread globalStreamThread;
    private final StreamThread[] threads;
    /** Last state reported by each stream thread, keyed by thread id. Guarded by the shared thread listener. */
    private final Map<Long, StreamThread.State> threadState;
    private final Metrics metrics;
    private final QueryableStoreProvider queryableStoreProvider;
    private final UUID processId;
    private final String logPrefix;
    private final StreamsMetadataState streamsMetadataState;
    private final StreamsConfig config;
    private volatile State state = State.CREATED;
    private StateListener stateListener = null;

    /**
     * Registers the callback that is notified on every instance-level state change.
     *
     * @param listener the listener to notify, or {@code null} to stop notifications
     */
    public void setStateListener(StateListener listener) {
        this.stateListener = listener;
    }

    /**
     * Moves this instance to {@code newState} and notifies the registered listener, if any.
     *
     * <p>A transition that {@link State#isValidTransition(State)} rejects is only logged at WARN
     * level; the new state is applied regardless. The listener is invoked while this instance's
     * monitor is held.
     *
     * @param newState the state to switch to
     */
    private synchronized void setState(State newState) {
        State oldState = state;
        if (oldState.isValidTransition(newState)) {
            log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
        } else {
            log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
        }
        state = newState;
        if (stateListener != null) {
            stateListener.onChange(state, oldState);
        }
    }

    /**
     * @return the current instance-level state
     */
    public synchronized State state() {
        return state;
    }

    /**
     * @return a read-only view of the metrics currently held by this instance's registry
     */
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(metrics.metrics());
    }

    /**
     * Creates an instance from raw properties, using {@link DefaultKafkaClientSupplier}.
     *
     * @param builder the topology to run
     * @param props   properties from which a {@link StreamsConfig} is built
     */
    public KafkaStreams(TopologyBuilder builder, Properties props) {
        this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
    }

    /**
     * Creates an instance from an existing configuration, using {@link DefaultKafkaClientSupplier}.
     *
     * @param builder the topology to run
     * @param config  the streams configuration
     */
    public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
        this(builder, config, new DefaultKafkaClientSupplier());
    }

    /**
     * Creates an instance: sets up metrics, the optional global stream thread, one
     * {@link StreamThread} per {@code num.stream.threads}, and the store providers used by
     * {@link #store(String, QueryableStoreType)}. No thread is started here; see {@link #start()}.
     *
     * @param builder        the topology to run; its application id is set from {@code application.id}
     * @param config         the streams configuration
     * @param clientSupplier factory for the Kafka clients used by the threads
     * @throws ConfigException if {@code application.server} is set but is not a parsable {@code host:port}
     */
    public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier) {
        Time time = Time.SYSTEM;
        this.processId = UUID.randomUUID();
        this.config = config;

        String applicationId = config.getString("application.id");
        builder.setApplicationId(applicationId);

        // Fall back to a generated client id when none was configured.
        String clientId = config.getString("client.id");
        if (clientId.length() <= 0) {
            clientId = applicationId + "-" + this.processId;
        }
        this.logPrefix = String.format("stream-client [%s]", clientId);

        List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        MetricConfig metricConfig = new MetricConfig()
                .samples(config.getInt("metrics.num.samples").intValue())
                .recordLevel(Sensor.RecordingLevel.forName(config.getString("metrics.recording.level")))
                .timeWindow(config.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS);
        this.metrics = new Metrics(metricConfig, reporters, time);

        this.threads = new StreamThread[config.getInt("num.stream.threads").intValue()];
        this.threadState = new HashMap<Long, StreamThread.State>(this.threads.length);
        ArrayList<StateStoreProvider> storeProviders = new ArrayList<StateStoreProvider>();
        this.streamsMetadataState = new StreamsMetadataState(builder, KafkaStreams.parseHostInfo(config.getString("application.server")));

        ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
        if (config.getLong("cache.max.bytes.buffering") < 0L) {
            log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", this.logPrefix);
        }
        // The configured cache is split evenly across the stream threads, plus one extra
        // share when a global topology exists; a negative result is clamped to zero.
        int cacheShares = config.getInt("num.stream.threads") + (globalTaskTopology == null ? 0 : 1);
        long cacheSizeBytes = Math.max(0L, config.getLong("cache.max.bytes.buffering") / (long) cacheShares);

        if (globalTaskTopology != null) {
            this.globalStreamThread = new GlobalStreamThread(globalTaskTopology, config, clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")), new StateDirectory(applicationId, config.getString("state.dir")), this.metrics, time, clientId);
        }

        // One listener instance is shared by all stream threads so that the monitor taken by
        // its synchronized onChange() actually serializes access to threadState. A listener
        // per thread would give every thread its own lock and leave the map unguarded.
        StreamStateListener sharedListener = new StreamStateListener();
        for (int i = 0; i < this.threads.length; ++i) {
            StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, clientId, this.processId, this.metrics, time, this.streamsMetadataState, cacheSizeBytes);
            this.threads[i] = thread;
            thread.setStateListener(sharedListener);
            this.threadState.put(thread.getId(), thread.state());
            storeProviders.add(new StreamThreadStateStoreProvider(thread));
        }

        GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
        this.queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
    }

    /**
     * Parses a {@code host:port} endpoint.
     *
     * @param endPoint the configured endpoint; may be {@code null} or blank
     * @return {@link StreamsMetadataState#UNKNOWN_HOST} for a {@code null}/blank endpoint,
     *         otherwise the parsed host and port
     * @throws ConfigException if either the host or the port cannot be extracted
     */
    private static HostInfo parseHostInfo(String endPoint) {
        if (endPoint == null || endPoint.trim().isEmpty()) {
            return StreamsMetadataState.UNKNOWN_HOST;
        }
        String host = Utils.getHost(endPoint);
        Integer port = Utils.getPort(endPoint);
        if (host == null || port == null) {
            throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
        }
        return new HostInfo(host, port);
    }

    /**
     * Runs the broker compatibility check with a short-lived {@link StreamsKafkaClient}.
     *
     * <p>The client is closed in a {@code finally} block so it is released even when the
     * compatibility check throws. A failure to close is logged and otherwise ignored.
     *
     * @throws StreamsException propagated from the compatibility check
     */
    private void checkBrokerVersionCompatibility() throws StreamsException {
        StreamsKafkaClient client = new StreamsKafkaClient(config);
        try {
            client.checkBrokerCompatibility();
        } finally {
            try {
                client.close();
            } catch (IOException e) {
                log.warn("{} Could not close StreamKafkaClient.", logPrefix, e);
            }
        }
    }

    /**
     * Starts the global stream thread (if present) and all stream threads.
     *
     * @throws IllegalStateException if this instance is not in {@link State#CREATED}
     * @throws StreamsException      propagated from the broker compatibility check
     */
    public synchronized void start() throws IllegalStateException, StreamsException {
        log.debug("{} Starting Kafka Stream process.", logPrefix);
        if (state != State.CREATED) {
            throw new IllegalStateException("Cannot start again.");
        }
        checkBrokerVersionCompatibility();
        setState(State.RUNNING);
        if (globalStreamThread != null) {
            globalStreamThread.start();
        }
        for (StreamThread thread : threads) {
            thread.start();
        }
        log.info("{} Started Kafka Stream process", logPrefix);
    }

    /**
     * Shuts this instance down using the default timeout, i.e. waits for the shutdown
     * to complete without a time limit.
     */
    public void close() {
        close(DEFAULT_CLOSE_TIMEOUT, TimeUnit.SECONDS);
    }

    /**
     * Shuts this instance down, waiting up to the given time for all threads to stop.
     *
     * <p>The actual shutdown runs on a daemon thread; this method joins that thread with the
     * given timeout. A timeout of {@code 0} waits until the shutdown has finished. The state
     * is set to {@link State#NOT_RUNNING} when the wait ends, whether or not the shutdown
     * thread has finished. If the calling thread is interrupted while waiting, its interrupt
     * status is restored before this method returns.
     *
     * @param timeout  how long to wait for the shutdown to finish
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if the shutdown finished within the timeout, or if this instance
     *         was neither created nor running when called; {@code false} if the shutdown
     *         thread was still alive when the wait ended
     */
    public synchronized boolean close(long timeout, TimeUnit timeUnit) {
        log.debug("{} Stopping Kafka Stream process.", logPrefix);
        if (!state.isCreatedOrRunning()) {
            return true;
        }
        setState(State.PENDING_SHUTDOWN);

        Thread shutdown = new Thread(new Runnable() {

            @Override
            public void run() {
                // Detach the listeners first so thread-level state changes during shutdown
                // no longer feed back into the instance-level state.
                for (StreamThread thread : KafkaStreams.this.threads) {
                    thread.setStateListener(null);
                    thread.close();
                }
                if (KafkaStreams.this.globalStreamThread != null) {
                    KafkaStreams.this.globalStreamThread.close();
                    if (!KafkaStreams.this.globalStreamThread.stillRunning()) {
                        try {
                            KafkaStreams.this.globalStreamThread.join();
                        } catch (InterruptedException e) {
                            // Clear the flag so the joins below still wait for their threads.
                            Thread.interrupted();
                        }
                    }
                }
                for (StreamThread thread : KafkaStreams.this.threads) {
                    try {
                        if (!thread.stillRunning()) {
                            thread.join();
                        }
                    } catch (InterruptedException ex) {
                        // Clear the flag so the remaining threads are still joined.
                        Thread.interrupted();
                    }
                }
                KafkaStreams.this.metrics.close();
                log.info("{} Stopped Kafka Streams process.", KafkaStreams.this.logPrefix);
            }
        }, "kafka-streams-close-thread");
        shutdown.setDaemon(true);
        shutdown.start();

        try {
            shutdown.join(TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt instead of silently discarding it.
            Thread.currentThread().interrupt();
        }
        setState(State.NOT_RUNNING);
        return !shutdown.isAlive();
    }

    /**
     * @return the same text as {@link #toString(String)} with an empty indent
     */
    @Override
    public String toString() {
        return toString("");
    }

    /**
     * Builds a textual description consisting of the process id followed by the description
     * of every stream thread.
     *
     * @param indent prefix for the header line; each thread is rendered with {@code indent + "\t"}
     * @return the description, terminated by a newline
     */
    public String toString(String indent) {
        StringBuilder sb = new StringBuilder();
        sb.append(indent).append("KafkaStreams processID: ").append(processId).append("\n");
        String threadIndent = indent + "\t";
        for (StreamThread thread : threads) {
            sb.append(thread.toString(threadIndent));
        }
        sb.append("\n");
        return sb.toString();
    }

    /**
     * Asks a {@link StateDirectory} for this application's {@code state.dir} to clean up
     * removed tasks.
     *
     * @throws IllegalStateException if this instance is running or rebalancing
     */
    public void cleanUp() {
        if (state.isRunning()) {
            throw new IllegalStateException("Cannot clean up while running.");
        }
        String appId = config.getString("application.id");
        String stateDir = config.getString("state.dir");
        String localApplicationDir = stateDir + File.separator + appId;
        log.debug("{} Removing local Kafka Streams application data in {} for application {}.", logPrefix, localApplicationDir, appId);
        StateDirectory stateDirectory = new StateDirectory(appId, stateDir);
        stateDirectory.cleanRemovedTasks();
    }

    /**
     * Installs the given handler on every stream thread and on the global stream thread, if any.
     *
     * @param eh the handler to install
     */
    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) {
        for (StreamThread thread : threads) {
            thread.setUncaughtExceptionHandler(eh);
        }
        if (globalStreamThread != null) {
            globalStreamThread.setUncaughtExceptionHandler(eh);
        }
    }

    /**
     * @return all metadata known to the metadata state
     * @throws IllegalStateException if this instance is not running
     */
    public Collection<StreamsMetadata> allMetadata() {
        validateIsRunning();
        return streamsMetadataState.getAllMetadata();
    }

    /**
     * @param storeName name of the store to look up
     * @return the metadata the metadata state holds for the given store
     * @throws IllegalStateException if this instance is not running
     */
    public Collection<StreamsMetadata> allMetadataForStore(String storeName) {
        validateIsRunning();
        return streamsMetadataState.getAllMetadataForStore(storeName);
    }

    /**
     * Looks up the metadata for a key in a store, using a serializer for the key.
     *
     * @param storeName     name of the store
     * @param key           the key to look up
     * @param keySerializer serializer for {@code key}
     * @return whatever the metadata state returns for this store and key
     * @throws IllegalStateException if this instance is not running
     */
    public <K> StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer) {
        validateIsRunning();
        return streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer);
    }

    /**
     * Looks up the metadata for a key in a store, using a custom partitioner.
     *
     * @param storeName   name of the store
     * @param key         the key to look up
     * @param partitioner partitioner used to locate {@code key}
     * @return whatever the metadata state returns for this store and key
     * @throws IllegalStateException if this instance is not running
     */
    public <K> StreamsMetadata metadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner) {
        validateIsRunning();
        return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
    }

    /**
     * Obtains a queryable view of a named store.
     *
     * @param storeName          name of the store
     * @param queryableStoreType the kind of store expected
     * @return the store as provided by the queryable store provider
     * @throws IllegalStateException if this instance is not running
     */
    public <T> T store(String storeName, QueryableStoreType<T> queryableStoreType) {
        validateIsRunning();
        return queryableStoreProvider.getStore(storeName, queryableStoreType);
    }

    /**
     * @throws IllegalStateException unless the current state is running or rebalancing
     */
    private void validateIsRunning() {
        if (!state.isRunning()) {
            throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
        }
    }

    /**
     * Translates per-thread state changes into instance-level state changes. A single instance
     * is shared by all stream threads so that {@code synchronized} guards {@code threadState}.
     */
    private class StreamStateListener
    implements StreamThread.StateListener {

        /**
         * Records the thread's new state, then switches the instance to REBALANCING when the
         * thread reports revoked or assigning partitions, or to RUNNING once every tracked
         * thread reports RUNNING.
         */
        @Override
        public synchronized void onChange(StreamThread thread, StreamThread.State newState, StreamThread.State oldState) {
            KafkaStreams.this.threadState.put(thread.getId(), newState);
            if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.ASSIGNING_PARTITIONS) {
                KafkaStreams.this.setState(State.REBALANCING);
                return;
            }
            if (newState != StreamThread.State.RUNNING) {
                return;
            }
            for (StreamThread.State current : KafkaStreams.this.threadState.values()) {
                if (current != StreamThread.State.RUNNING) {
                    // At least one thread is not running yet; keep the current instance state.
                    return;
                }
            }
            KafkaStreams.this.setState(State.RUNNING);
        }
    }

    /**
     * Callback for instance-level state changes.
     */
    public static interface StateListener {
        /**
         * Called after the state has changed; the first argument is the new state and the
         * second the previous one.
         */
        public void onChange(State var1, State var2);
    }

    /**
     * Instance-level states. The constructor arguments are the {@link #ordinal() ordinals} of
     * the states each constant may move to, so the declaration order below is significant:
     * CREATED=0, RUNNING=1, REBALANCING=2, PENDING_SHUTDOWN=3, NOT_RUNNING=4.
     */
    public static enum State {
        CREATED(1, 2, 3),
        RUNNING(2, 3),
        REBALANCING(1, 2, 3),
        PENDING_SHUTDOWN(4),
        NOT_RUNNING;

        private final Set<Integer> validTransitions = new HashSet<Integer>();

        private State(Integer ... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        /**
         * @return {@code true} for {@link #RUNNING} and {@link #REBALANCING}
         */
        public boolean isRunning() {
            return this == RUNNING || this == REBALANCING;
        }

        /**
         * @return {@code true} for {@link #CREATED}, {@link #RUNNING} and {@link #REBALANCING}
         */
        public boolean isCreatedOrRunning() {
            return isRunning() || this == CREATED;
        }

        /**
         * @param newState the candidate target state
         * @return {@code true} if the ordinal of {@code newState} is among this state's valid targets
         */
        public boolean isValidTransition(State newState) {
            return validTransitions.contains(newState.ordinal());
        }
    }
}

