/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sort;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.sort.BaseTemporalSortOperator;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcTimeSortOperator
extends BaseTemporalSortOperator {
    private static final long serialVersionUID = -2028983921907321193L;
    private static final Logger LOG = LoggerFactory.getLogger(ProcTimeSortOperator.class);
    private final BaseRowTypeInfo inputRowType;
    private GeneratedRecordComparator gComparator;
    private transient RecordComparator comparator;
    private transient List<BaseRow> sortBuffer;
    private transient ListState<BaseRow> dataState;

    public ProcTimeSortOperator(BaseRowTypeInfo inputRowType, GeneratedRecordComparator gComparator) {
        this.inputRowType = inputRowType;
        this.gComparator = gComparator;
    }

    @Override
    public void open() throws Exception {
        super.open();
        LOG.info("Opening ProcTimeSortOperator");
        this.comparator = (RecordComparator)this.gComparator.newInstance(this.getContainingTask().getUserCodeClassLoader());
        this.gComparator = null;
        this.sortBuffer = new ArrayList<BaseRow>();
        ListStateDescriptor sortDescriptor = new ListStateDescriptor("sortState", (TypeInformation)this.inputRowType);
        this.dataState = this.getRuntimeContext().getListState(sortDescriptor);
    }

    public void processElement(StreamRecord<BaseRow> element) throws Exception {
        BaseRow input = (BaseRow)element.getValue();
        long currentTime = this.timerService.currentProcessingTime();
        this.dataState.add((Object)input);
        this.timerService.registerProcessingTimeTimer(currentTime + 1L);
    }

    public void onProcessingTime(InternalTimer<BaseRow, VoidNamespace> timer) throws Exception {
        Iterable inputs = (Iterable)this.dataState.get();
        this.sortBuffer.clear();
        inputs.forEach(this.sortBuffer::add);
        this.sortBuffer.sort(this.comparator);
        this.sortBuffer.forEach(row -> this.collector.collect(row));
        this.dataState.clear();
    }

    public void onEventTime(InternalTimer<BaseRow, VoidNamespace> timer) throws Exception {
        throw new UnsupportedOperationException("Now Sort only is supported based processing time here!");
    }
}

