/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.zookeeper;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class ZooKeeperStateHandleStoreTest
extends TestLogger {
    /**
     * ZooKeeper test environment shared by every test in this class. It is cleared by
     * {@code cleanUp()} before each test and shut down by {@code tearDown()} after the last one.
     * NOTE(review): the argument {@code 1} is presumably the number of ZooKeeper servers to
     * start - confirm against {@code ZooKeeperTestEnvironment}.
     */
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);

    /** Shuts down the shared ZooKeeper test environment after all tests of this class ran. */
    @AfterClass
    public static void tearDown() throws Exception {
        if (ZOOKEEPER == null) {
            return;
        }
        ZOOKEEPER.shutdown();
    }

    /**
     * Invokes {@code deleteAll()} on the shared ZooKeeper environment before every test,
     * presumably so that nodes created by one test are not visible to the next.
     */
    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }

    /**
     * Adds a state under a fresh path and verifies that the store reports it, that the state
     * node is persistent, that it has exactly one ephemeral child (presumably the lock node) and
     * that the bytes stored in ZooKeeper deserialize to a handle for the same state.
     */
    @Test
    public void testAddAndLock() throws Exception {
        final String path = "/testAdd";
        final Long expectedState = 1239712317L;
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        store.addAndLock(path, expectedState);

        // The store knows exactly one entry and it resolves to the state that was added.
        Assert.assertEquals(1L, (long) store.getAllAndLock().size());
        Assert.assertEquals(expectedState, store.getAndLock(path).retrieveState());

        // The state node has no ephemeral owner, i.e. it is persistent.
        Stat stateNodeStat = ZOOKEEPER.getClient().checkExists().forPath(path);
        Assert.assertNotNull(stateNodeStat);
        Assert.assertEquals(0L, stateNodeStat.getEphemeralOwner());

        // The state node has a single child, and that child is ephemeral.
        List<String> childNodes = ZOOKEEPER.getClient().getChildren().forPath(path);
        Assert.assertEquals(1L, (long) childNodes.size());
        Stat childStat =
                ZOOKEEPER.getClient().checkExists().forPath(path + "/" + childNodes.get(0));
        Assert.assertNotNull(childStat);
        Assert.assertNotEquals(0L, childStat.getEphemeralOwner());

        // The node data is a serialized handle pointing at the added state.
        byte[] serializedHandle = ZOOKEEPER.getClient().getData().forPath(path);
        RetrievableStateHandle<Long> storedHandle =
                InstantiationUtil.deserializeObject(
                        serializedHandle, ClassLoader.getSystemClassLoader());
        Long actualState = storedHandle.retrieveState();
        Assert.assertEquals(expectedState, actualState);
    }

    /**
     * Adding a state under a path that already exists in ZooKeeper must fail, and the state
     * handle that was written to the storage helper before the failure must be discarded again.
     *
     * <p>The exception is caught explicitly instead of using {@code @Test(expected = ...)}: with
     * the annotation the test ends as soon as {@code addAndLock} throws, so the assertions on
     * the storage helper would never be evaluated.
     */
    @Test
    public void testAddAlreadyExistingPath() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider);
        String pathInZooKeeper = "/testAddAlreadyExistingPath";

        // Occupy the path so that the subsequent add collides with an existing node.
        ZOOKEEPER.getClient().create().forPath(pathInZooKeeper);

        try {
            store.addAndLock(pathInZooKeeper, 1L);
            Assert.fail("Did not throw expected exception");
        } catch (Exception expected) {
            // Expected: the path already exists, so the add must be rejected.
        }

        // Exactly one handle was written to the storage helper ...
        Assert.assertEquals(1L, (long) stateHandleProvider.getStateHandles().size());
        // ... and it was discarded once because the add did not succeed.
        Assert.assertEquals(
                1L, (long) stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
    }

    /**
     * Verifies that a state handle created during {@code addAndLock} is discarded exactly once
     * when the ZooKeeper client fails with a {@link RuntimeException}.
     */
    @Test
    public void testAddDiscardStateHandleAfterFailure() throws Exception {
        final String path = "/testAddDiscardStateHandleAfterFailure";
        final Long expectedState = 81282227L;
        LongStateStorage storage = new LongStateStorage();

        // Spy on the real client and make the stubbed call chain throw; presumably this is the
        // call addAndLock relies on to write the node.
        CuratorFramework failingClient = Mockito.spy(ZOOKEEPER.getClient());
        Mockito.when(failingClient.inTransaction().create())
                .thenThrow(new RuntimeException("Expected test Exception."));

        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(failingClient, storage);

        try {
            store.addAndLock(path, expectedState);
            Assert.fail("Did not throw expected exception");
        } catch (Exception expected) {
            // Expected: the stubbed client throws.
        }

        // One handle was created for the state and it was discarded exactly once.
        List<LongRetrievableStateHandle> handles = storage.getStateHandles();
        Assert.assertEquals(1L, (long) handles.size());
        LongRetrievableStateHandle createdHandle = handles.get(0);
        Assert.assertEquals(expectedState, createdHandle.retrieveState());
        Assert.assertEquals(1L, (long) createdHandle.getNumberOfDiscardCalls());
    }

    /**
     * Replaces the state stored under an existing path and verifies that a handle was created
     * for both states, that the node is still persistent and that the node data now resolves to
     * the replacement state.
     */
    @Test
    public void testReplace() throws Exception {
        final String path = "/testReplace";
        final Long initialState = 30968470898L;
        final Long replaceState = 88383776661L;
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        store.addAndLock(path, initialState);
        // NOTE(review): 0 is presumably the expected node version - confirm against replace().
        store.replace(path, 0, replaceState);

        // Both the initial and the replacement state went through the storage helper, in order.
        List<LongRetrievableStateHandle> handles = storage.getStateHandles();
        Assert.assertEquals(2L, (long) handles.size());
        Assert.assertEquals(initialState, handles.get(0).retrieveState());
        Assert.assertEquals(replaceState, handles.get(1).retrieveState());

        // The state node still exists and has no ephemeral owner.
        Stat stateNodeStat = ZOOKEEPER.getClient().checkExists().forPath(path);
        Assert.assertNotNull(stateNodeStat);
        Assert.assertEquals(0L, stateNodeStat.getEphemeralOwner());

        // The node data now deserializes to a handle for the replacement state.
        byte[] serializedHandle = ZOOKEEPER.getClient().getData().forPath(path);
        RetrievableStateHandle<Long> storedHandle =
                InstantiationUtil.deserializeObject(
                        serializedHandle, ClassLoader.getSystemClassLoader());
        Long actualState = storedHandle.retrieveState();
        Assert.assertEquals(replaceState, actualState);
    }

    /** Replacing the state of a path that was never added must throw an exception. */
    @Test(expected = Exception.class)
    public void testReplaceNonExistingPath() throws Exception {
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        store.replace("/testReplaceNonExistingPath", 0, 1L);
    }

    /**
     * Verifies that the replacement handle is discarded exactly once when the replace operation
     * fails, and that ZooKeeper still holds the handle of the initial state afterwards.
     */
    @Test
    public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
        final String path = "/testReplaceDiscardStateHandleAfterFailure";
        final Long initialState = 30968470898L;
        final Long replaceState = 88383776661L;
        LongStateStorage storage = new LongStateStorage();

        // Spy on the real client and let setData() throw.
        CuratorFramework failingClient = Mockito.spy(ZOOKEEPER.getClient());
        Mockito.when(failingClient.setData())
                .thenThrow(new RuntimeException("Expected test Exception."));

        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(failingClient, storage);
        store.addAndLock(path, initialState);

        try {
            store.replace(path, 0, replaceState);
            Assert.fail("Did not throw expected exception");
        } catch (Exception expected) {
            // Expected: the stubbed setData() call throws.
        }

        // Both states were handed to the storage helper; only the replacement is checked for
        // having been discarded (once).
        List<LongRetrievableStateHandle> handles = storage.getStateHandles();
        Assert.assertEquals(2L, (long) handles.size());
        Assert.assertEquals(initialState, handles.get(0).retrieveState());
        LongRetrievableStateHandle replacementHandle = handles.get(1);
        Assert.assertEquals(replaceState, replacementHandle.retrieveState());
        Assert.assertEquals(1L, (long) replacementHandle.getNumberOfDiscardCalls());

        // Read through the real (unstubbed) client: the node still refers to the initial state.
        byte[] serializedHandle = ZOOKEEPER.getClient().getData().forPath(path);
        RetrievableStateHandle<Long> storedHandle =
                InstantiationUtil.deserializeObject(
                        serializedHandle, ClassLoader.getSystemClassLoader());
        Long actualState = storedHandle.retrieveState();
        Assert.assertEquals(initialState, actualState);
    }

    /**
     * Verifies that {@code exists} returns -1 for an unknown path and a non-negative value once
     * a state was added, and that {@code getAndLock} returns a handle for the added state.
     */
    @Test
    public void testGetAndExists() throws Exception {
        final String path = "/testGetAndExists";
        final Long expectedState = 311222268470898L;
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        // Nothing has been added yet.
        Assert.assertEquals(-1L, (long) store.exists(path));

        store.addAndLock(path, expectedState);

        RetrievableStateHandle<Long> handle = store.getAndLock(path);
        Assert.assertEquals(expectedState, handle.retrieveState());
        Assert.assertTrue(store.exists(path) >= 0);
    }

    /** Getting and locking a path that was never added must throw an exception. */
    @Test(expected = Exception.class)
    public void testGetNonExistingPath() throws Exception {
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        store.getAndLock("/testGetNonExistingPath");
    }

    /**
     * Adds several states under distinct paths and verifies that {@code getAllAndLock} returns
     * a handle for each of them exactly once.
     */
    @Test
    public void testGetAll() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider);
        String pathInZooKeeper = "/testGetAll";

        HashSet<Long> expected = new HashSet<>();
        expected.add(311222268470898L);
        expected.add(132812888L);
        expected.add(27255442L);
        expected.add(11122233124L);

        // Each state gets its own node: the base path followed by the state value.
        for (long val : expected) {
            store.addAndLock(pathInZooKeeper + val, val);
        }

        // Every returned state must be one of the expected values; removing it as we go also
        // catches duplicates, because a second remove of the same value returns false.
        for (Tuple2<RetrievableStateHandle<Long>, String> entry : store.getAllAndLock()) {
            Assert.assertTrue(expected.remove(entry.f0.retrieveState()));
        }

        // Nothing may be left over, otherwise a state was not returned.
        Assert.assertEquals(0L, (long) expected.size());
    }

    /**
     * Adds several states under zero-padded paths and verifies that, once the entries returned
     * by {@code getAllAndLock} are sorted by path name, their states are in ascending numeric
     * order.
     */
    @Test
    public void testGetAllSortedByName() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateHandleProvider);
        String basePath = "/testGetAllSortedByName";

        Long[] expected = {311222268470898L, 132812888L, 27255442L, 11122233124L};

        // Pad the value to 16 digits so that lexicographic path order equals numeric order.
        for (long val : expected) {
            String pathInZooKeeper = String.format("%s%016d", basePath, val);
            store.addAndLock(pathInZooKeeper, val);
        }

        List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllAndLock();
        Assert.assertEquals((long) expected.length, (long) actual.size());

        // Sort the expectation numerically and the result by path (Tuple2.f1), then compare
        // position by position.
        Arrays.sort(expected);
        Collections.sort(actual, Comparator.comparing(o -> o.f1));

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], actual.get(i).f0.retrieveState());
        }
    }

    /**
     * Verifies that {@code releaseAndTryRemove} deletes the state node from ZooKeeper and
     * triggers exactly one additional discard call.
     */
    @Test
    public void testRemove() throws Exception {
        final String path = "/testRemove";
        final Long stateToRemove = 27255442L;
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        store.addAndLock(path, stateToRemove);

        // The class-wide counter is compared before and after the removal.
        final int discardCallsBefore = LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();

        store.releaseAndTryRemove(path);

        // The root has no children left.
        List<String> rootChildren = ZOOKEEPER.getClient().getChildren().forPath("/");
        Assert.assertEquals(0L, (long) rootChildren.size());

        final int discardCallsAfter = LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();
        Assert.assertEquals((long) (discardCallsBefore + 1), (long) discardCallsAfter);
    }

    /**
     * Adds several states and verifies that {@code releaseAndTryRemoveAll} leaves no child
     * nodes under the ZooKeeper root.
     */
    @Test
    public void testReleaseAndTryRemoveAll() throws Exception {
        final String basePath = "/testDiscardAll";
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        HashSet<Long> states = new HashSet<>();
        states.add(311222268470898L);
        states.add(132812888L);
        states.add(27255442L);
        states.add(11122233124L);

        // One node per state: the base path followed by the state value.
        for (long state : states) {
            store.addAndLock(basePath + state, state);
        }

        store.releaseAndTryRemoveAll();

        List<String> rootChildren = ZOOKEEPER.getClient().getChildren().forPath("/");
        Assert.assertEquals(0L, (long) rootChildren.size());
    }

    /**
     * Overwrites the data of one state node with bytes that are not a serialized handle and
     * verifies that {@code getAllAndLock} still returns the remaining entries and omits the
     * corrupted one.
     */
    @Test
    public void testCorruptedData() throws Exception {
        LongStateStorage stateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> store =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), stateStorage);

        HashSet<Long> input = new HashSet<>();
        input.add(1L);
        input.add(2L);
        input.add(3L);

        // Each value is stored under a node named after the value itself.
        for (Long aLong : input) {
            store.addAndLock("/" + aLong, aLong);
        }

        // Corrupt the entry for 2 by replacing its data with two zero bytes.
        ZOOKEEPER.getClient().setData().forPath("/2", new byte[2]);

        List<Tuple2<RetrievableStateHandle<Long>, String>> allEntries = store.getAllAndLock();

        // Everything except the corrupted entry is expected to come back.
        HashSet<Long> expected = new HashSet<>(input);
        expected.remove(2L);

        HashSet<Long> actual = new HashSet<>(expected.size());
        for (Tuple2<RetrievableStateHandle<Long>, String> entry : allEntries) {
            actual.add(entry.f0.retrieveState());
        }

        Assert.assertEquals(expected, actual);
    }

    /**
     * Two stores hold the same state node. The node must survive a
     * {@code releaseAndTryRemove} by the first store and must be gone only after the second
     * store called {@code releaseAndTryRemove} as well.
     */
    @Test
    public void testConcurrentDeleteOperation() throws Exception {
        final String path = "/state";
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> firstStore =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);
        ZooKeeperStateHandleStore<Long> secondStore =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        firstStore.addAndLock(path, 42L);
        RetrievableStateHandle<Long> handle = secondStore.getAndLock(path);

        // The first store gives up the node while the second one has not released it yet.
        firstStore.releaseAndTryRemove(path);

        // The handle obtained by the second store is still usable and the node still exists.
        Assert.assertEquals(42L, (long) handle.retrieveState());
        Stat nodeStat = ZOOKEEPER.getClient().checkExists().forPath(path);
        Assert.assertNotNull(
                "NodeStat should not be null, otherwise the referenced node does not exist.",
                nodeStat);

        // Once the second store lets go as well, the node disappears.
        secondStore.releaseAndTryRemove(path);
        nodeStat = ZOOKEEPER.getClient().checkExists().forPath(path);
        Assert.assertNull(
                "NodeState should be null, because the referenced node should no longer exist.",
                nodeStat);
    }

    /**
     * Verifies that a failing {@code getAndLock} (the node data cannot be deserialized) leaves
     * no lock node of the failing store behind, so the node can still be removed by the store
     * that added it.
     */
    @Test
    public void testLockCleanupWhenGetAndLockFails() throws Exception {
        final String statePath = "/state";
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> owningStore =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);
        ZooKeeperStateHandleStore<Long> failingStore =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        owningStore.addAndLock(statePath, 42L);

        // Overwrite the serialized handle with two bytes that cannot be deserialized.
        final byte[] corruptedData = {1, 2};
        ZOOKEEPER.getClient().setData().forPath(statePath, corruptedData);

        try {
            failingStore.getAndLock(statePath);
            Assert.fail("Should fail because we cannot deserialize the node's data");
        } catch (IOException expected) {
            // Expected: the node's data is corrupted.
        }

        // The failing store's lock node must not exist ...
        String failingStoreLockPath = failingStore.getLockPath(statePath);
        Stat stat = ZOOKEEPER.getClient().checkExists().forPath(failingStoreLockPath);
        Assert.assertNull("zkStore2 should not have created a lock node.", stat);

        // ... so the state node has exactly one child left.
        Collection<String> remainingChildren =
                ZOOKEEPER.getClient().getChildren().forPath(statePath);
        Assert.assertEquals(1L, (long) remainingChildren.size());

        // The owning store can therefore remove the node.
        owningStore.releaseAndTryRemove(statePath);
        stat = ZOOKEEPER.getClient().checkExists().forPath(statePath);
        Assert.assertNull("The state node should have been removed.", stat);
    }

    /**
     * Verifies that closing the client that created a state node removes its children (the lock
     * nodes) while the state node itself stays in ZooKeeper. A second client is used to observe
     * the result.
     */
    @Test
    public void testLockCleanupWhenClientTimesOut() throws Exception {
        final String statePath = "/state";
        LongStateStorage storage = new LongStateStorage();

        Configuration config = new Configuration();
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOOKEEPER.getConnectString());
        config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 100);
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "timeout");

        try (CuratorFramework lockingClient = ZooKeeperUtils.startCuratorFramework(config);
                CuratorFramework observingClient = ZooKeeperUtils.startCuratorFramework(config)) {
            ZooKeeperStateHandleStore<Long> zkStore =
                    new ZooKeeperStateHandleStore<>(lockingClient, storage);
            zkStore.addAndLock(statePath, 42L);

            // Close the client that added the state before inspecting the node.
            lockingClient.close();

            // The state node is still there ...
            Stat stateNodeStat = observingClient.checkExists().forPath(statePath);
            Assert.assertNotNull(stateNodeStat);

            // ... but it no longer has any children.
            Collection<String> children = observingClient.getChildren().forPath(statePath);
            Assert.assertEquals(0L, (long) children.size());
        }
    }

    /**
     * Verifies that {@code release} removes the lock node of a state node without removing the
     * state node, and that a subsequent {@code releaseAndTryRemove} deletes the state node.
     */
    @Test
    public void testRelease() throws Exception {
        final String statePath = "/state";
        LongStateStorage storage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> zkStore =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), storage);

        zkStore.addAndLock(statePath, 42L);

        // Adding the state created a lock node.
        final String lockPath = zkStore.getLockPath(statePath);
        Stat lockStat = ZOOKEEPER.getClient().checkExists().forPath(lockPath);
        Assert.assertNotNull("Expected an existing lock", lockStat);

        // Releasing leaves the state node without children.
        zkStore.release(statePath);
        Stat stateStat = ZOOKEEPER.getClient().checkExists().forPath(statePath);
        Assert.assertEquals(
                "Expected no lock nodes as children", 0L, (long) stateStat.getNumChildren());

        // The state node itself can now be removed.
        zkStore.releaseAndTryRemove(statePath);
        stateStat = ZOOKEEPER.getClient().checkExists().forPath(statePath);
        Assert.assertNull("State node should have been removed.", stateStat);
    }

    /**
     * Verifies that {@code releaseAll} removes the lock nodes of all state nodes while keeping
     * the state nodes, and that {@code releaseAndTryRemoveAll} afterwards leaves the root
     * without children.
     */
    @Test
    public void testReleaseAll() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore<Long> zkStore =
                new ZooKeeperStateHandleStore<>(ZOOKEEPER.getClient(), longStateStorage);
        List<String> paths = Arrays.asList("/state1", "/state2", "/state3");

        for (String path : paths) {
            zkStore.addAndLock(path, 42L);
        }

        // Every added state has a lock node.
        for (String path : paths) {
            Stat lockStat = ZOOKEEPER.getClient().checkExists().forPath(zkStore.getLockPath(path));
            Assert.assertNotNull("Expected an existing lock.", lockStat);
        }

        zkStore.releaseAll();

        // After releasing, no state node has children any more.
        for (String path : paths) {
            Stat stateStat = ZOOKEEPER.getClient().checkExists().forPath(path);
            Assert.assertEquals(0L, (long) stateStat.getNumChildren());
        }

        zkStore.releaseAndTryRemoveAll();

        // All state nodes are gone: the root has no children.
        Stat rootStat = ZOOKEEPER.getClient().checkExists().forPath("/");
        Assert.assertEquals(0L, (long) rootStat.getNumChildren());
    }

    /**
     * Verifies that {@code deleteChildren} on a store whose client is namespaced to
     * {@code /path} leaves {@code getAllPaths} empty.
     */
    @Test
    public void testDeleteAllShouldRemoveAllPaths() throws Exception {
        CuratorFramework namespacedClient =
                ZooKeeperUtils.useNamespaceAndEnsurePath(ZOOKEEPER.getClient(), "/path");
        ZooKeeperStateHandleStore<Long> zkStore =
                new ZooKeeperStateHandleStore<>(namespacedClient, new LongStateStorage());

        zkStore.addAndLock("/state", 1L);
        zkStore.deleteChildren();

        Assert.assertThat(zkStore.getAllPaths(), Matchers.is(Matchers.empty()));
    }

    /**
     * {@link RetrievableStateHandle} for tests that wraps a {@code Long} and counts how often
     * {@link #discardState()} was called, both on this instance and across all instances.
     */
    private static class LongRetrievableStateHandle
            implements RetrievableStateHandle<Long> {
        private static final long serialVersionUID = -3555329254423838912L;

        /**
         * Discard calls summed over all instances. Presumably needed because discards performed
         * on deserialized copies are not visible on the instance that was originally created.
         */
        private static int numberOfGlobalDiscardCalls;

        /** The wrapped state. */
        private final Long state;

        /** Discard calls performed on this particular instance. */
        private int numberOfDiscardCalls;

        public LongRetrievableStateHandle(Long state) {
            this.state = state;
        }

        /** Returns the wrapped state. */
        public Long retrieveState() {
            return state;
        }

        /** Releases nothing; only bumps the global and the per-instance discard counters. */
        public void discardState() throws Exception {
            numberOfGlobalDiscardCalls += 1;
            numberOfDiscardCalls += 1;
        }

        /** Always reports a size of zero. */
        public long getStateSize() {
            return 0L;
        }

        /** Returns how often {@link #discardState()} was called on this instance. */
        int getNumberOfDiscardCalls() {
            return numberOfDiscardCalls;
        }

        /** Returns how often {@link #discardState()} was called on any instance. */
        public static int getNumberOfGlobalDiscardCalls() {
            return numberOfGlobalDiscardCalls;
        }
    }

    /**
     * {@link RetrievableStateStorageHelper} for tests that wraps every stored {@code Long} in a
     * {@link LongRetrievableStateHandle} and remembers the created handles in creation order.
     */
    private static class LongStateStorage
            implements RetrievableStateStorageHelper<Long> {
        /** All handles created by {@link #store(Long)}, oldest first. */
        private final List<LongRetrievableStateHandle> stateHandles = new ArrayList<>();

        private LongStateStorage() {
        }

        /** Creates a handle for {@code state}, records it and returns it. */
        public RetrievableStateHandle<Long> store(Long state) throws Exception {
            LongRetrievableStateHandle createdHandle = new LongRetrievableStateHandle(state);
            stateHandles.add(createdHandle);
            return createdHandle;
        }

        /** Returns the live list of recorded handles (not a copy). */
        List<LongRetrievableStateHandle> getStateHandles() {
            return stateHandles;
        }
    }
}

