package org.apache.flink.table.filesystem;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/filesystem/FileSystemLookupFunction.class */
public class FileSystemLookupFunction<T extends InputSplit> extends TableFunction<RowData> {
    private static final long serialVersionUID = 1;
    private static final int MAX_RETRIES = 3;
    private final InputFormat<RowData, T> inputFormat;
    private final String[] producedNames;
    private final DataType[] producedTypes;
    private final Duration cacheTTL;
    private final int[] lookupCols;
    private transient Map<Row, List<RowData>> cache;
    private transient long nextLoadTime;
    private transient TypeSerializer<RowData> serializer;
    private final DataFormatConverters.DataFormatConverter[] converters;
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
    private static final Duration RETRY_INTERVAL = Duration.ofSeconds(10);

    public FileSystemLookupFunction(InputFormat<RowData, T> inputFormat, String[] strArr, String[] strArr2, DataType[] dataTypeArr, Duration duration) {
        this.lookupCols = new int[strArr.length];
        this.converters = new DataFormatConverters.DataFormatConverter[strArr.length];
        Map map = (Map) IntStream.range(0, strArr2.length).boxed().collect(Collectors.toMap(num -> {
            return strArr2[num.intValue()];
        }, num2 -> {
            return num2;
        }));
        for (int i = 0; i < strArr.length; i++) {
            Integer num3 = (Integer) map.get(strArr[i]);
            Preconditions.checkArgument(num3 != null, "Lookup keys %s not selected", new Object[]{Arrays.toString(strArr)});
            this.converters[i] = DataFormatConverters.getConverterForDataType(dataTypeArr[num3.intValue()]);
            this.lookupCols[i] = num3.intValue();
        }
        this.inputFormat = inputFormat;
        this.producedNames = strArr2;
        this.producedTypes = dataTypeArr;
        this.cacheTTL = duration;
    }

    public TypeInformation<RowData> getResultType() {
        return new RowDataTypeInfo((LogicalType[]) Arrays.stream(this.producedTypes).map((v0) -> {
            return v0.getLogicalType();
        }).toArray(i -> {
            return new LogicalType[i];
        }), this.producedNames);
    }

    public void open(FunctionContext functionContext) throws Exception {
        super.open(functionContext);
        this.cache = new HashMap();
        this.nextLoadTime = -1L;
        this.serializer = getResultType().createSerializer(new ExecutionConfig());
    }

    public void eval(Object... objArr) {
        Preconditions.checkArgument(objArr.length == this.lookupCols.length, "Number of values and lookup keys mismatch");
        checkCacheReload();
        for (int i = 0; i < objArr.length; i++) {
            objArr[i] = this.converters[i].toExternal(objArr[i]);
        }
        List<RowData> list = this.cache.get(Row.of(objArr));
        if (list != null) {
            Iterator<RowData> it = list.iterator();
            while (it.hasNext()) {
                collect(it.next());
            }
        }
    }

    @VisibleForTesting
    public Duration getCacheTTL() {
        return this.cacheTTL;
    }

    private void checkCacheReload() {
        if (this.nextLoadTime > System.currentTimeMillis()) {
            return;
        }
        if (this.nextLoadTime > 0) {
            LOG.info("Lookup join cache has expired after {} minute(s), reloading", Long.valueOf(getCacheTTL().toMinutes()));
        } else {
            LOG.info("Populating lookup join cache");
        }
        int i = 0;
        while (true) {
            this.cache.clear();
            try {
                InputSplit[] createInputSplits = this.inputFormat.createInputSplits(1);
                GenericRowData genericRowData = new GenericRowData(this.producedNames.length);
                long j = 0;
                for (InputSplit inputSplit : createInputSplits) {
                    this.inputFormat.open(inputSplit);
                    while (!this.inputFormat.reachedEnd()) {
                        RowData rowData = (RowData) this.inputFormat.nextRecord(genericRowData);
                        j += serialVersionUID;
                        this.cache.computeIfAbsent(extractKey(rowData), row -> {
                            return new ArrayList();
                        }).add(this.serializer.copy(rowData));
                    }
                    this.inputFormat.close();
                }
                this.nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
                LOG.info("Loaded {} row(s) into lookup join cache", Long.valueOf(j));
                return;
            } catch (IOException e) {
                if (i >= MAX_RETRIES) {
                    throw new FlinkRuntimeException(String.format("Failed to load table into cache after %d retries", Integer.valueOf(i)), e);
                }
                i++;
                long millis = i * RETRY_INTERVAL.toMillis();
                LOG.warn(String.format("Failed to load table into cache, will retry in %d seconds", Long.valueOf(millis / 1000)), e);
                try {
                    Thread.sleep(millis);
                } catch (InterruptedException e2) {
                    LOG.warn("Interrupted while waiting to retry failed cache load, aborting");
                    throw new FlinkRuntimeException(e2);
                }
            }
        }
    }

    private Row extractKey(RowData rowData) {
        Row row = new Row(this.lookupCols.length);
        for (int i = 0; i < this.lookupCols.length; i++) {
            row.setField(i, this.converters[i].toExternal(rowData, this.lookupCols[i]));
        }
        return row;
    }
}
