package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWisePartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.class */
public class StreamingFileSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
    private static final long serialVersionUID = 1;
    private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC = new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);
    private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC = new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);
    private final long bucketCheckInterval;
    private final BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder;
    private transient Buckets<IN, ?> buckets;
    private transient ProcessingTimeService processingTimeService;
    private transient ListState<byte[]> bucketStates;
    private transient ListState<Long> maxPartCountersState;

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink$BucketsBuilder.class */
    private static abstract class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>> implements Serializable {
        private static final long serialVersionUID = 1;
        protected static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60000;

        private BucketsBuilder() {
        }

        protected T self() {
            return this;
        }

        abstract Buckets<IN, BucketID> createBuckets(int i) throws IOException;
    }

    @PublicEvolving
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink$BulkFormatBuilder.class */
    public static class BulkFormatBuilder<IN, BucketID, T extends BulkFormatBuilder<IN, BucketID, T>> extends BucketsBuilder<IN, BucketID, T> {
        private static final long serialVersionUID = 1;
        private long bucketCheckInterval;
        private final Path basePath;
        private BulkWriter.Factory<IN> writerFactory;
        private BucketAssigner<IN, BucketID> bucketAssigner;
        private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
        private BucketFactory<IN, BucketID> bucketFactory;
        private OutputFileConfig outputFileConfig;

        protected BulkFormatBuilder(Path path, BulkWriter.Factory<IN> factory, BucketAssigner<IN, BucketID> bucketAssigner) {
            this(path, factory, bucketAssigner, OnCheckpointRollingPolicy.build(), 60000L, new DefaultBucketFactoryImpl(), OutputFileConfig.builder().build());
        }

        protected BulkFormatBuilder(Path path, BulkWriter.Factory<IN> factory, BucketAssigner<IN, BucketID> bucketAssigner, CheckpointRollingPolicy<IN, BucketID> checkpointRollingPolicy, long j, BucketFactory<IN, BucketID> bucketFactory, OutputFileConfig outputFileConfig) {
            super();
            this.basePath = (Path) Preconditions.checkNotNull(path);
            this.writerFactory = factory;
            this.bucketAssigner = (BucketAssigner) Preconditions.checkNotNull(bucketAssigner);
            this.rollingPolicy = (CheckpointRollingPolicy) Preconditions.checkNotNull(checkpointRollingPolicy);
            this.bucketCheckInterval = j;
            this.bucketFactory = (BucketFactory) Preconditions.checkNotNull(bucketFactory);
            this.outputFileConfig = (OutputFileConfig) Preconditions.checkNotNull(outputFileConfig);
        }

        public long getBucketCheckInterval() {
            return this.bucketCheckInterval;
        }

        public T withBucketCheckInterval(long j) {
            this.bucketCheckInterval = j;
            return (T) self();
        }

        public T withBucketAssigner(BucketAssigner<IN, BucketID> bucketAssigner) {
            this.bucketAssigner = (BucketAssigner) Preconditions.checkNotNull(bucketAssigner);
            return (T) self();
        }

        public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> checkpointRollingPolicy) {
            this.rollingPolicy = (CheckpointRollingPolicy) Preconditions.checkNotNull(checkpointRollingPolicy);
            return (T) self();
        }

        @VisibleForTesting
        T withBucketFactory(BucketFactory<IN, BucketID> bucketFactory) {
            this.bucketFactory = (BucketFactory) Preconditions.checkNotNull(bucketFactory);
            return (T) self();
        }

        public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
            this.outputFileConfig = outputFileConfig;
            return (T) self();
        }

        public <ID> BulkFormatBuilder<IN, ID, ? extends BulkFormatBuilder<IN, ID, ?>> withNewBucketAssigner(BucketAssigner<IN, ID> bucketAssigner) {
            Preconditions.checkState(this.bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssigner() cannot be called after specifying a customized bucket factory");
            return new BulkFormatBuilder<>(this.basePath, this.writerFactory, (BucketAssigner) Preconditions.checkNotNull(bucketAssigner), this.rollingPolicy, this.bucketCheckInterval, new DefaultBucketFactoryImpl(), this.outputFileConfig);
        }

        public StreamingFileSink<IN> build() {
            return new StreamingFileSink<>(this, this.bucketCheckInterval);
        }

        @Override // org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder
        Buckets<IN, BucketID> createBuckets(int i) throws IOException {
            return new Buckets<>(this.basePath, this.bucketAssigner, this.bucketFactory, new BulkPartWriter.Factory(this.writerFactory), this.rollingPolicy, i, this.outputFileConfig);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink$DefaultBulkFormatBuilder.class */
    public static final class DefaultBulkFormatBuilder<IN> extends BulkFormatBuilder<IN, String, DefaultBulkFormatBuilder<IN>> {
        private static final long serialVersionUID = 7493169281036370228L;

        private DefaultBulkFormatBuilder(Path path, BulkWriter.Factory<IN> factory, BucketAssigner<IN, String> bucketAssigner) {
            super(path, factory, bucketAssigner);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink$DefaultRowFormatBuilder.class */
    public static final class DefaultRowFormatBuilder<IN> extends RowFormatBuilder<IN, String, DefaultRowFormatBuilder<IN>> {
        private static final long serialVersionUID = -8503344257202146718L;

        private DefaultRowFormatBuilder(Path path, Encoder<IN> encoder, BucketAssigner<IN, String> bucketAssigner) {
            super(path, encoder, bucketAssigner);
        }
    }

    @PublicEvolving
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink$RowFormatBuilder.class */
    public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>> extends BucketsBuilder<IN, BucketID, T> {
        private static final long serialVersionUID = 1;
        private long bucketCheckInterval;
        private final Path basePath;
        private Encoder<IN> encoder;
        private BucketAssigner<IN, BucketID> bucketAssigner;
        private RollingPolicy<IN, BucketID> rollingPolicy;
        private BucketFactory<IN, BucketID> bucketFactory;
        private OutputFileConfig outputFileConfig;

        protected RowFormatBuilder(Path path, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
            this(path, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), 60000L, new DefaultBucketFactoryImpl(), OutputFileConfig.builder().build());
        }

        protected RowFormatBuilder(Path path, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner, RollingPolicy<IN, BucketID> rollingPolicy, long j, BucketFactory<IN, BucketID> bucketFactory, OutputFileConfig outputFileConfig) {
            super();
            this.basePath = (Path) Preconditions.checkNotNull(path);
            this.encoder = (Encoder) Preconditions.checkNotNull(encoder);
            this.bucketAssigner = (BucketAssigner) Preconditions.checkNotNull(bucketAssigner);
            this.rollingPolicy = (RollingPolicy) Preconditions.checkNotNull(rollingPolicy);
            this.bucketCheckInterval = j;
            this.bucketFactory = (BucketFactory) Preconditions.checkNotNull(bucketFactory);
            this.outputFileConfig = (OutputFileConfig) Preconditions.checkNotNull(outputFileConfig);
        }

        public long getBucketCheckInterval() {
            return this.bucketCheckInterval;
        }

        public T withBucketCheckInterval(long j) {
            this.bucketCheckInterval = j;
            return (T) self();
        }

        public T withBucketAssigner(BucketAssigner<IN, BucketID> bucketAssigner) {
            this.bucketAssigner = (BucketAssigner) Preconditions.checkNotNull(bucketAssigner);
            return (T) self();
        }

        public T withRollingPolicy(RollingPolicy<IN, BucketID> rollingPolicy) {
            this.rollingPolicy = (RollingPolicy) Preconditions.checkNotNull(rollingPolicy);
            return (T) self();
        }

        public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
            this.outputFileConfig = outputFileConfig;
            return (T) self();
        }

        public <ID> RowFormatBuilder<IN, ID, ? extends RowFormatBuilder<IN, ID, ?>> withNewBucketAssignerAndPolicy(BucketAssigner<IN, ID> bucketAssigner, RollingPolicy<IN, ID> rollingPolicy) {
            Preconditions.checkState(this.bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssignerAndPolicy() cannot be called after specifying a customized bucket factory");
            return new RowFormatBuilder<>(this.basePath, this.encoder, (BucketAssigner) Preconditions.checkNotNull(bucketAssigner), (RollingPolicy) Preconditions.checkNotNull(rollingPolicy), this.bucketCheckInterval, new DefaultBucketFactoryImpl(), this.outputFileConfig);
        }

        public StreamingFileSink<IN> build() {
            return new StreamingFileSink<>(this, this.bucketCheckInterval);
        }

        @VisibleForTesting
        T withBucketFactory(BucketFactory<IN, BucketID> bucketFactory) {
            this.bucketFactory = (BucketFactory) Preconditions.checkNotNull(bucketFactory);
            return (T) self();
        }

        @Override // org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder
        Buckets<IN, BucketID> createBuckets(int i) throws IOException {
            return new Buckets<>(this.basePath, this.bucketAssigner, this.bucketFactory, new RowWisePartWriter.Factory(this.encoder), this.rollingPolicy, i, this.outputFileConfig);
        }
    }

    protected StreamingFileSink(RowFormatBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> rowFormatBuilder, long j) {
        Preconditions.checkArgument(j > 0);
        this.bucketsBuilder = (BucketsBuilder) Preconditions.checkNotNull(rowFormatBuilder);
        this.bucketCheckInterval = j;
    }

    protected StreamingFileSink(BulkFormatBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bulkFormatBuilder, long j) {
        Preconditions.checkArgument(j > 0);
        this.bucketsBuilder = (BucketsBuilder) Preconditions.checkNotNull(bulkFormatBuilder);
        this.bucketCheckInterval = j;
    }

    public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(Path path, Encoder<IN> encoder) {
        return new DefaultRowFormatBuilder<>(path, encoder, new DateTimeBucketAssigner());
    }

    public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(Path path, BulkWriter.Factory<IN> factory) {
        return new DefaultBulkFormatBuilder<>(path, factory, new DateTimeBucketAssigner());
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.buckets = this.bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
        OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();
        this.bucketStates = operatorStateStore.getListState(BUCKET_STATE_DESC);
        this.maxPartCountersState = operatorStateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
        if (functionInitializationContext.isRestored()) {
            this.buckets.initializeState(this.bucketStates, this.maxPartCountersState);
        }
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.buckets.commitUpToCheckpoint(j);
    }

    @Override // org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Preconditions.checkState((this.bucketStates == null || this.maxPartCountersState == null) ? false : true, "sink has not been initialized");
        this.buckets.snapshotState(functionSnapshotContext.getCheckpointId(), this.bucketStates, this.maxPartCountersState);
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.processingTimeService = getRuntimeContext().getProcessingTimeService();
        this.processingTimeService.registerTimer(this.processingTimeService.getCurrentProcessingTime() + this.bucketCheckInterval, this);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback
    public void onProcessingTime(long j) throws Exception {
        long currentProcessingTime = this.processingTimeService.getCurrentProcessingTime();
        this.buckets.onProcessingTime(currentProcessingTime);
        this.processingTimeService.registerTimer(currentProcessingTime + this.bucketCheckInterval, this);
    }

    @Override // org.apache.flink.streaming.api.functions.sink.SinkFunction
    public void invoke(IN in, SinkFunction.Context context) throws Exception {
        this.buckets.onElement(in, context);
    }

    public void close() throws Exception {
        if (this.buckets != null) {
            this.buckets.close();
        }
    }
}
