/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem.stream;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
import org.apache.flink.table.filesystem.PartitionTimeExtractor;
import org.apache.flink.table.filesystem.stream.PartitionCommitPredicate;
import org.apache.flink.table.utils.PartitionPathUtils;

public class PartitionTimeCommitPredicate
implements PartitionCommitPredicate {
    private final PartitionTimeExtractor extractor;
    private final long commitDelay;
    private final List<String> partitionKeys;
    private final ZoneId watermarkTimeZone;

    public PartitionTimeCommitPredicate(Configuration conf, ClassLoader cl, List<String> partitionKeys) {
        this.partitionKeys = partitionKeys;
        this.commitDelay = ((Duration)conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY)).toMillis();
        this.extractor = PartitionTimeExtractor.create(cl, (String)conf.get(FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_KIND), (String)conf.get(FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_CLASS), (String)conf.get(FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN));
        this.watermarkTimeZone = ZoneId.of(conf.getString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE));
    }

    @Override
    public boolean isPartitionCommittable(PartitionCommitPredicate.PredicateContext predicateContext) {
        LocalDateTime partitionTime = this.extractor.extract(this.partitionKeys, PartitionPathUtils.extractPartitionValues(new Path(predicateContext.partition())));
        return this.watermarkHasPassedWithDelay(predicateContext.currentWatermark(), partitionTime, this.commitDelay);
    }

    private boolean watermarkHasPassedWithDelay(long watermark, LocalDateTime partitionTime, long commitDelay) {
        long epochPartTime = partitionTime.atZone(this.watermarkTimeZone).toInstant().toEpochMilli();
        return watermark > epochPartTime + commitDelay;
    }
}

