/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;

/**
 * Tests for {@link TaskLocalStateStoreImpl}.
 *
 * <p>Each test runs against a fresh store that is backed by two temporary allocation base
 * directories, a direct executor, and a snapshot map and lock object owned by this test.
 * Checkpoint ids used by the tests are the zero-based indexes of the stored snapshots.
 */
public class TaskLocalStateStoreImplTest {
    private SortedMap<Long, TaskStateSnapshot> internalSnapshotMap;
    private Object internalLock;
    private TemporaryFolder temporaryFolder;
    private File[] allocationBaseDirs;
    private TaskLocalStateStoreImpl taskLocalStateStore;

    /**
     * Creates the temporary directories and the store under test.
     *
     * @throws Exception if the temporary folder or one of its sub-folders cannot be created
     */
    @Before
    public void before() throws Exception {
        JobID jobID = new JobID();
        AllocationID allocationID = new AllocationID();
        JobVertexID jobVertexID = new JobVertexID();
        int subtaskIdx = 0;

        // The folder is managed by hand (not as a JUnit rule), so it is created here and
        // removed again in after().
        this.temporaryFolder = new TemporaryFolder();
        this.temporaryFolder.create();
        this.allocationBaseDirs =
                new File[] {this.temporaryFolder.newFolder(), this.temporaryFolder.newFolder()};

        // Map and lock are handed to the store by the test so they stay reachable from here.
        this.internalSnapshotMap = new TreeMap<>();
        this.internalLock = new Object();

        LocalRecoveryDirectoryProviderImpl directoryProvider =
                new LocalRecoveryDirectoryProviderImpl(
                        this.allocationBaseDirs, jobID, jobVertexID, subtaskIdx);
        LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(false, directoryProvider);

        this.taskLocalStateStore =
                new TaskLocalStateStoreImpl(
                        jobID,
                        allocationID,
                        jobVertexID,
                        subtaskIdx,
                        localRecoveryConfig,
                        Executors.directExecutor(),
                        this.internalSnapshotMap,
                        this.internalLock);
    }

    /** Removes the temporary directories created in {@link #before()}. */
    @After
    public void after() {
        // before() may have failed before the folder was assigned; avoid an NPE that would
        // only add noise next to the original failure.
        if (this.temporaryFolder != null) {
            this.temporaryFolder.delete();
        }
    }

    /**
     * Checks that the directory provider reachable through the store's recovery config reports
     * the same number of allocation base directories, and the same directory at every index,
     * as were passed in during setup.
     */
    @Test
    public void getLocalRecoveryRootDirectoryProvider() {
        LocalRecoveryConfig recoveryConfig = this.taskLocalStateStore.getLocalRecoveryConfig();
        LocalRecoveryDirectoryProvider provider = recoveryConfig.getLocalStateDirectoryProvider();

        Assert.assertEquals(this.allocationBaseDirs.length, provider.allocationBaseDirsCount());
        for (int i = 0; i < this.allocationBaseDirs.length; ++i) {
            Assert.assertEquals(
                    this.allocationBaseDirs[i], provider.selectAllocationBaseDirectory(i));
        }
    }

    /**
     * Checks that nothing is retrievable before storing, that every stored snapshot is returned
     * for its checkpoint id afterwards, and that an id that was never stored yields {@code null}.
     */
    @Test
    public void storeAndRetrieve() throws Exception {
        final int chkCount = 3;

        for (int i = 0; i < chkCount; ++i) {
            Assert.assertNull(this.taskLocalStateStore.retrieveLocalState(i));
        }

        List<TaskStateSnapshot> taskStateSnapshots = this.storeStates(chkCount);

        this.checkStoredAsExpected(taskStateSnapshots, 0, chkCount);
        Assert.assertNull(this.taskLocalStateStore.retrieveLocalState(chkCount + 1));
    }

    /**
     * Prunes every checkpoint except the last stored one and checks that only the last one is
     * still retrievable.
     */
    @Test
    public void pruneCheckpoints() throws Exception {
        final int chkCount = 3;
        final int lastChk = chkCount - 1;

        List<TaskStateSnapshot> taskStateSnapshots = this.storeStates(chkCount);

        this.taskLocalStateStore.pruneMatchingCheckpoints(chk -> chk != lastChk);

        for (int i = 0; i < lastChk; ++i) {
            Assert.assertNull(this.taskLocalStateStore.retrieveLocalState(i));
        }
        this.checkStoredAsExpected(taskStateSnapshots, lastChk, chkCount);
    }

    /**
     * Confirms a checkpoint and checks that all snapshots with a smaller id are gone and
     * discarded, while the confirmed one is still stored.
     */
    @Test
    public void confirmCheckpoint() throws Exception {
        final int chkCount = 3;
        final int confirmed = chkCount - 1;

        List<TaskStateSnapshot> taskStateSnapshots = this.storeStates(chkCount);

        this.taskLocalStateStore.confirmCheckpoint(confirmed);

        this.checkPrunedAndDiscarded(taskStateSnapshots, 0, confirmed);
        this.checkStoredAsExpected(taskStateSnapshots, confirmed, chkCount);
    }

    /**
     * Aborts one checkpoint in the middle of the history and checks that only that snapshot is
     * gone and discarded, while the ones before and after it are still stored.
     */
    @Test
    public void abortCheckpoint() throws Exception {
        final int chkCount = 4;
        final int aborted = chkCount - 2;

        List<TaskStateSnapshot> taskStateSnapshots = this.storeStates(chkCount);

        this.taskLocalStateStore.abortCheckpoint(aborted);

        this.checkPrunedAndDiscarded(taskStateSnapshots, aborted, aborted + 1);
        this.checkStoredAsExpected(taskStateSnapshots, 0, aborted);
        this.checkStoredAsExpected(taskStateSnapshots, aborted + 1, chkCount);
    }

    /**
     * Confirms a checkpoint, disposes the store, and checks that afterwards every snapshot is
     * gone and discarded.
     */
    @Test
    public void dispose() throws Exception {
        final int chkCount = 3;
        final int confirmed = chkCount - 1;

        List<TaskStateSnapshot> taskStateSnapshots = this.storeStates(chkCount);

        this.taskLocalStateStore.confirmCheckpoint(confirmed);
        this.taskLocalStateStore.dispose();

        this.checkPrunedAndDiscarded(taskStateSnapshots, 0, chkCount);
    }

    /**
     * Asserts that for every index in {@code [start, end)} the store returns the very same
     * snapshot instance that is recorded in {@code history}, and that {@code discardState()} was
     * never invoked on it.
     *
     * @param history snapshots in the order they were stored; the index is the checkpoint id
     * @param start first checkpoint id to check (inclusive)
     * @param end last checkpoint id to check (exclusive)
     */
    private void checkStoredAsExpected(List<TaskStateSnapshot> history, int start, int end)
            throws Exception {
        for (int i = start; i < end; ++i) {
            TaskStateSnapshot expected = history.get(i);
            // Identity, not equality: the store must hand back the exact spy that was stored.
            Assert.assertSame(expected, this.taskLocalStateStore.retrieveLocalState(i));
            Mockito.verify(expected, Mockito.never()).discardState();
        }
    }

    /**
     * Asserts that for every index in {@code [start, end)} the store no longer returns a
     * snapshot, and that {@code discardState()} was invoked on the snapshot recorded in
     * {@code history}.
     *
     * @param history snapshots in the order they were stored; the index is the checkpoint id
     * @param start first checkpoint id to check (inclusive)
     * @param end last checkpoint id to check (exclusive)
     */
    private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, int start, int end)
            throws Exception {
        for (int i = start; i < end; ++i) {
            Assert.assertNull(this.taskLocalStateStore.retrieveLocalState(i));
            Mockito.verify(history.get(i)).discardState();
        }
    }

    /**
     * Stores {@code count} spied snapshots under the checkpoint ids {@code 0 .. count - 1}.
     *
     * @param count number of snapshots to create and store
     * @return the stored spies, positioned at the index equal to their checkpoint id
     */
    private List<TaskStateSnapshot> storeStates(int count) {
        List<TaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count);
        for (int i = 0; i < count; ++i) {
            OperatorID operatorID = new OperatorID();
            // A spy keeps the real behaviour but lets the checks verify discardState() calls.
            TaskStateSnapshot taskStateSnapshot = PowerMockito.spy(new TaskStateSnapshot());
            OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState();
            taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
            this.taskLocalStateStore.storeLocalState(i, taskStateSnapshot);
            taskStateSnapshots.add(taskStateSnapshot);
        }
        return taskStateSnapshots;
    }
}

