package org.apache.flink.streaming.connectors.kafka.table;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSinkFunction.class */
public class BufferedUpsertSinkFunction extends RichSinkFunction<RowData> implements CheckpointedFunction, CheckpointListener {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(BufferedUpsertSinkFunction.class);
    // Wrapped sink that buffered rows are forwarded to on flush; lifecycle and checkpoint calls are delegated to it.
    private final RichSinkFunction<RowData> producer;
    // A flush is triggered once batchCount reaches this value.
    private final int batchMaxRowNums;
    // Initial delay and fixed delay (milliseconds) of the background flush task.
    private final long batchIntervalMs;
    // Row type whose child logical types are used to build the key field getters.
    private final DataType physicalDataType;
    // Positions of the key fields within the physical row.
    private final int[] keyProjection;
    // Used in open() to create the serializer that copies rows when object reuse is enabled.
    private final TypeInformation<RowData> consumedRowDataTypeInfo;
    // Reset to false in open(), set to true in close(); the background task skips flushing once true.
    private boolean closed;
    // Rows added since the last flush (counts every row, including ones that overwrite the same key).
    private int batchCount = 0;
    // Latest (row, timestamp) per projected key; emptied by flush().
    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
    // Reusable context handed to the producer; carries the buffered row's timestamp.
    private transient WrappedContext wrappedContext;
    // Projects a row onto its key fields.
    private transient Function<RowData, RowData> keyExtractor;
    // Either a serializer-based copy or identity, chosen in open().
    private transient Function<RowData, RowData> valueCopier;
    // Single-thread scheduler running the periodic flush.
    private transient ScheduledExecutorService scheduler;
    // Handle of the periodic flush task; cancelled in close().
    private transient ScheduledFuture<?> scheduledFuture;
    // Failure captured by the background flush task; rethrown by checkFlushException().
    private volatile transient Exception flushException;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.connectors.kafka.table.BufferedUpsertSinkFunction$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSinkFunction$1.class */
    /**
     * Lookup table mapping {@link RowKind} ordinals to small integer labels usable in a
     * {@code switch} statement.
     *
     * <p>NOTE(review): the synthetic markers and the {@code $SwitchMap$} field name suggest this is
     * compiler-generated code recovered by a decompiler rather than hand-written logic — confirm
     * before editing by hand.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per RowKind constant; a slot that is never assigned below keeps the int default 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$types$RowKind = new int[RowKind.values().length];

        static {
            // Each assignment is guarded separately, so a NoSuchFieldError raised for one
            // constant is ignored and does not prevent the remaining slots from being filled.
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_AFTER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.UPDATE_BEFORE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$types$RowKind[RowKind.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSinkFunction$WrappedContext.class */
    public static class WrappedContext implements SinkFunction.Context {
        private Long timestamp;
        private SinkFunction.Context context;

        private WrappedContext() {
        }

        void setTimestamp(Long l) {
            this.timestamp = l;
        }

        void setContext(SinkFunction.Context context) {
            this.context = context;
        }

        public long currentProcessingTime() {
            return this.context.currentProcessingTime();
        }

        public long currentWatermark() {
            return this.context.currentWatermark();
        }

        public Long timestamp() {
            return this.timestamp;
        }

        /* synthetic */ WrappedContext(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /**
     * Creates a buffering wrapper around {@code richSinkFunction}.
     *
     * @param richSinkFunction sink that receives the buffered rows on flush; must not be null
     * @param dataType physical row type used to derive the key field getters; must not be null
     * @param iArr positions of the key fields within the physical row; must not be null
     * @param typeInformation type information of the consumed rows, used to create the copying
     *     serializer in {@code open}; must not be null
     * @param sinkBufferFlushMode flush settings supplying batch size and flush interval; must be
     *     non-null and enabled
     * @throws IllegalArgumentException if {@code sinkBufferFlushMode} is null or not enabled
     * @throws NullPointerException if any other argument is null
     */
    public BufferedUpsertSinkFunction(RichSinkFunction<RowData> richSinkFunction, DataType dataType, int[] iArr, TypeInformation<RowData> typeInformation, SinkBufferFlushMode sinkBufferFlushMode) {
        Preconditions.checkArgument(
                sinkBufferFlushMode != null && sinkBufferFlushMode.isEnabled(),
                "Sink buffer flush mode must be non-null and enabled.");
        this.producer = Preconditions.checkNotNull(richSinkFunction, "Producer must not be null.");
        this.physicalDataType = Preconditions.checkNotNull(dataType, "Physical data type must not be null.");
        this.keyProjection = Preconditions.checkNotNull(iArr, "key projection must not be null.");
        // Fail fast here instead of with an anonymous NullPointerException later in open().
        this.consumedRowDataTypeInfo =
                Preconditions.checkNotNull(typeInformation, "Consumed row data type info must not be null.");
        this.batchMaxRowNums = sinkBufferFlushMode.getBatchSize();
        this.batchIntervalMs = sinkBufferFlushMode.getBatchIntervalMs();
    }

    /**
     * Initializes the buffer, the key extractor and the value copier, starts the periodic flush
     * task, and finally opens the wrapped producer.
     *
     * @param configuration configuration passed through to the wrapped producer
     * @throws Exception if the wrapped producer fails to open
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        this.reduceBuffer = new HashMap<>();
        this.wrappedContext = new WrappedContext();
        this.closed = false;

        // One field getter per key position, typed by the matching child of the physical row type.
        final List<LogicalType> children = this.physicalDataType.getLogicalType().getChildren();
        final RowData.FieldGetter[] keyFieldGetters =
                Arrays.stream(this.keyProjection)
                        .mapToObj(pos -> RowData.createFieldGetter(children.get(pos), pos))
                        .toArray(RowData.FieldGetter[]::new);
        this.keyExtractor =
                row -> DynamicKafkaSerializationSchema.createProjectedRow(row, RowKind.INSERT, keyFieldGetters);

        // With object reuse enabled the incoming row is copied through the serializer before it
        // is stored in the buffer; otherwise the instance is buffered as-is.
        final TypeSerializer<RowData> serializer =
                this.consumedRowDataTypeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        if (getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()) {
            this.valueCopier = serializer::copy;
        } else {
            this.valueCopier = Function.identity();
        }

        this.scheduler =
                Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("upsert-kafka-sink-function"));
        this.scheduledFuture =
                this.scheduler.scheduleWithFixedDelay(
                        () -> {
                            synchronized (this) {
                                if (!this.closed) {
                                    try {
                                        flush();
                                    } catch (Exception e) {
                                        // Remember the failure; checkFlushException() rethrows it
                                        // on the next buffer/flush/close call.
                                        this.flushException = e;
                                    }
                                }
                            }
                        },
                        this.batchIntervalMs,
                        this.batchIntervalMs,
                        TimeUnit.MILLISECONDS);

        this.producer.open(configuration);
    }

    /** Hands the runtime context to the wrapped producer, which is the single holder of it. */
    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        producer.setRuntimeContext(runtimeContext);
    }

    /** Returns the runtime context held by the wrapped producer. */
    @Override
    public RuntimeContext getRuntimeContext() {
        return producer.getRuntimeContext();
    }

    /**
     * Buffers the row together with its timestamp instead of writing it immediately.
     *
     * @param rowData incoming row
     * @param context sink context of the incoming row; kept as the delegate for later flushes
     * @throws Exception if buffering triggers a flush that fails, or an earlier flush failed
     */
    @Override
    public void invoke(RowData rowData, SinkFunction.Context context) throws Exception {
        final WrappedContext forwardingContext = this.wrappedContext;
        forwardingContext.setContext(context);
        final Long recordTimestamp = context.timestamp();
        addToBuffer(rowData, recordTimestamp);
    }

    /**
     * Forwards the checkpoint-complete notification to the wrapped producer when it is a
     * {@link CheckpointListener}; otherwise does nothing.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if the wrapped producer's callback fails
     */
    @Override
    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.producer instanceof CheckpointListener) {
            // The field's static type does not expose the listener method, so a cast is required.
            ((CheckpointListener) this.producer).notifyCheckpointComplete(j);
        }
    }

    /**
     * Forwards the checkpoint-aborted notification to the wrapped producer when it is a
     * {@link CheckpointListener}; otherwise does nothing.
     *
     * @param j id of the aborted checkpoint
     * @throws Exception if the wrapped producer's callback fails
     */
    @Override
    public void notifyCheckpointAborted(long j) throws Exception {
        if (this.producer instanceof CheckpointListener) {
            // The field's static type does not expose the listener method, so a cast is required.
            ((CheckpointListener) this.producer).notifyCheckpointAborted(j);
        }
    }

    /**
     * Stops the periodic flush task, writes any rows still buffered, and closes the wrapped
     * producer. Only the first call performs this work.
     *
     * <p>The wrapped producer is closed even when the final flush fails, so it is not leaked on
     * the error path. If both steps fail, the flush failure is thrown and the close failure is
     * attached to it as a suppressed exception.
     *
     * @throws RuntimeException if the final flush fails, or if an earlier background flush failed
     * @throws Exception if closing the wrapped producer fails
     */
    @Override
    public synchronized void close() throws Exception {
        if (!this.closed) {
            this.closed = true;
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }

            Exception failure = null;
            if (this.batchCount > 0) {
                try {
                    flush();
                } catch (Exception e) {
                    LOG.warn("Writing records to kafka failed.", e);
                    failure = new RuntimeException("Writing records to kafka failed.", e);
                }
            }

            // Always release the wrapped producer, regardless of the flush outcome.
            try {
                this.producer.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }

            if (failure != null) {
                throw failure;
            }
        }
        super.close();
        checkFlushException();
    }

    /**
     * Flushes all buffered rows to the wrapped producer, then lets the producer snapshot its own
     * state when it is a {@link CheckpointedFunction}.
     *
     * @param functionSnapshotContext snapshot context passed through to the wrapped producer
     * @throws Exception if the flush or the producer's snapshot fails
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        // Flush first so the buffered rows have reached the producer before it snapshots.
        flush();
        if (this.producer instanceof CheckpointedFunction) {
            ((CheckpointedFunction) this.producer).snapshotState(functionSnapshotContext);
        }
    }

    /**
     * Delegates state initialization to the wrapped producer when it is a
     * {@link CheckpointedFunction}; this class initializes no state of its own here.
     *
     * @param functionInitializationContext initialization context passed through to the producer
     * @throws Exception if the producer's initialization fails
     */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        if (this.producer instanceof CheckpointedFunction) {
            ((CheckpointedFunction) this.producer).initializeState(functionInitializationContext);
        }
    }

    /**
     * Stores the row under its projected key, replacing any row previously buffered for that key,
     * and flushes once the number of rows added since the last flush reaches the batch limit.
     *
     * @param rowData row to buffer
     * @param l timestamp associated with the row; may be {@code null}
     * @throws Exception if an earlier flush failed or the triggered flush fails
     */
    private synchronized void addToBuffer(RowData rowData, Long l) throws Exception {
        checkFlushException();

        // Extract the key before the value is re-flagged.
        final RowData key = this.keyExtractor.apply(rowData);
        final RowData upsertRow = changeFlag(this.valueCopier.apply(rowData));
        this.reduceBuffer.put(key, Tuple2.of(upsertRow, l));

        this.batchCount += 1;
        if (this.batchCount < this.batchMaxRowNums) {
            return;
        }
        flush();
    }

    /**
     * Sends every buffered row to the wrapped producer with its recorded timestamp, then empties
     * the buffer and resets the row counter.
     *
     * @throws Exception if an earlier flush failed or the producer rejects a row
     */
    private synchronized void flush() throws Exception {
        checkFlushException();

        for (Tuple2<RowData, Long> buffered : this.reduceBuffer.values()) {
            final RowData row = buffered.f0;
            final Long rowTimestamp = buffered.f1;
            // The shared context is re-pointed at each row's timestamp before forwarding.
            this.wrappedContext.setTimestamp(rowTimestamp);
            this.producer.invoke(row, this.wrappedContext);
        }

        this.batchCount = 0;
        this.reduceBuffer.clear();
    }

    /**
     * Normalizes the row kind in place to one of the two upsert kinds:
     * {@code INSERT}/{@code UPDATE_AFTER} become {@code UPDATE_AFTER}, and
     * {@code UPDATE_BEFORE}/{@code DELETE} become {@code DELETE}.
     *
     * @param rowData row whose kind is rewritten; the same instance is returned
     * @return {@code rowData}, with its row kind normalized
     */
    private RowData changeFlag(RowData rowData) {
        // Switch on the enum itself rather than on ordinal-derived integers, so the mapping
        // does not depend on constant values defined by unrelated classes.
        switch (rowData.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                rowData.setRowKind(RowKind.UPDATE_AFTER);
                break;
            case UPDATE_BEFORE:
            case DELETE:
                rowData.setRowKind(RowKind.DELETE);
                break;
            default:
                // Any other kind is left untouched.
                break;
        }
        return rowData;
    }

    /**
     * Rethrows a failure recorded by the background flush task, if there is one.
     *
     * @throws RuntimeException wrapping the recorded flush failure
     */
    private void checkFlushException() {
        final Exception recorded = this.flushException;
        if (recorded != null) {
            // This sink writes to Kafka; the message matches the one used in close().
            throw new RuntimeException("Writing records to kafka failed.", recorded);
        }
    }
}
