/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ReplicationTests.class, MediumTests.class})
public class TestBulkLoadReplication
extends TestReplicationBase {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
    protected static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadReplication.class);
    private static final String PEER1_CLUSTER_ID = "peer1";
    private static final String PEER4_CLUSTER_ID = "peer4";
    private static final String PEER3_CLUSTER_ID = "peer3";
    private static final String PEER_ID1 = "1";
    private static final String PEER_ID3 = "3";
    private static final String PEER_ID4 = "4";
    private static AtomicInteger BULK_LOADS_COUNT;
    private static CountDownLatch BULK_LOAD_LATCH;
    private static final Path BULK_LOAD_BASE_DIR;
    private static HBaseTestingUtility utility3;
    private static HBaseTestingUtility utility4;
    private static Configuration conf3;
    private static Configuration conf4;
    private static Table htable3;
    private static Table htable4;
    @Rule
    public TestName name = new TestName();
    @ClassRule
    public static TemporaryFolder testFolder;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TestBulkLoadReplication.setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
        conf3 = HBaseConfiguration.create((Configuration)conf1);
        conf3.set("zookeeper.znode.parent", "/3");
        utility3 = new HBaseTestingUtility(conf3);
        conf4 = HBaseConfiguration.create((Configuration)conf1);
        conf4.set("zookeeper.znode.parent", "/4");
        utility3 = new HBaseTestingUtility(conf3);
        utility4 = new HBaseTestingUtility(conf4);
        TestReplicationBase.setUpBeforeClass();
        TestBulkLoadReplication.setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
        TestBulkLoadReplication.setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
        TestBulkLoadReplication.startCluster(utility3, conf3);
        TestBulkLoadReplication.startCluster(utility4, conf4);
    }

    private static void startCluster(HBaseTestingUtility util, Configuration configuration) throws Exception {
        LOG.info("Setup Zk to same one from utility1 and utility4");
        util.setZkCluster(utility1.getZkCluster());
        util.startMiniCluster(2);
        TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder((TableName)tableName).setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder((byte[])famName).setMobEnabled(true).setMobThreshold(4000L).setScope(1).build()).setColumnFamily(ColumnFamilyDescriptorBuilder.of((byte[])noRepfamName)).build();
        Connection connection = ConnectionFactory.createConnection((Configuration)configuration);
        try (Admin admin = connection.getAdmin();){
            admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
        }
        util.waitUntilAllRegionsAssigned(tableName);
    }

    @Override
    @Before
    public void setUpBase() throws Exception {
        super.setUpBase();
        ReplicationPeerConfig peer1Config = this.getPeerConfigForCluster(utility1);
        ReplicationPeerConfig peer4Config = this.getPeerConfigForCluster(utility4);
        ReplicationPeerConfig peer3Config = this.getPeerConfigForCluster(utility3);
        utility1.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
        utility4.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
        utility4.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
        utility3.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
        this.setupCoprocessor(utility1);
        this.setupCoprocessor(utility4);
        this.setupCoprocessor(utility3);
        BULK_LOADS_COUNT = new AtomicInteger(0);
    }

    private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
        return ReplicationPeerConfig.newBuilder().setClusterKey(util.getClusterKey()).setSerial(this.isSerialPeer()).build();
    }

    private void setupCoprocessor(HBaseTestingUtility cluster) {
        cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
            try {
                BulkReplicationTestObserver cp = (BulkReplicationTestObserver)r.getCoprocessorHost().findCoprocessor(BulkReplicationTestObserver.class);
                if (cp == null) {
                    r.getCoprocessorHost().load(BulkReplicationTestObserver.class, 0, cluster.getConfiguration());
                    cp = (BulkReplicationTestObserver)r.getCoprocessorHost().findCoprocessor(BulkReplicationTestObserver.class);
                    cp.clusterName = cluster.getClusterKey();
                }
            }
            catch (Exception e) {
                LOG.error(e.getMessage(), (Throwable)e);
            }
        });
    }

    @Override
    @After
    public void tearDownBase() throws Exception {
        super.tearDownBase();
        utility4.getAdmin().removeReplicationPeer(PEER_ID1);
        utility4.getAdmin().removeReplicationPeer(PEER_ID3);
        utility3.getAdmin().removeReplicationPeer(PEER_ID4);
        utility1.getAdmin().removeReplicationPeer(PEER_ID4);
    }

    private static void setupBulkLoadConfigsForCluster(Configuration config, String clusterReplicationId) throws Exception {
        config.setBoolean("hbase.replication.bulkload.enabled", true);
        config.set("hbase.replication.cluster.id", clusterReplicationId);
        File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
        File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml");
        config.writeXml((OutputStream)new FileOutputStream(sourceConfigFile));
        config.set("hbase.replication.conf.dir", testFolder.getRoot().getAbsolutePath());
    }

    @Test
    public void testBulkLoadReplicationActiveActive() throws Exception {
        Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName);
        Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName);
        Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName);
        byte[] row = Bytes.toBytes((String)"001");
        byte[] value = Bytes.toBytes((String)"v1");
        this.assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable);
        row = Bytes.toBytes((String)"002");
        value = Bytes.toBytes((String)"v2");
        this.assertBulkLoadConditions(row, value, utility4, peer1TestTable, peer4TestTable, peer3TestTable);
        row = Bytes.toBytes((String)"003");
        value = Bytes.toBytes((String)"v3");
        this.assertBulkLoadConditions(row, value, utility3, peer1TestTable, peer4TestTable, peer3TestTable);
        Thread.sleep(400L);
        Assert.assertEquals((long)9L, (long)BULK_LOADS_COUNT.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
        Path path = this.createMobFiles(utility3);
        ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor descriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
        ExecutorService pool = null;
        try {
            pool = Executors.newFixedThreadPool(1);
            PartitionedMobCompactor compactor = new PartitionedMobCompactor(utility3.getConfiguration(), utility3.getTestFileSystem(), tableName, (ColumnFamilyDescriptor)descriptor, pool);
            BULK_LOAD_LATCH = new CountDownLatch(1);
            BULK_LOADS_COUNT.set(0);
            compactor.compact(Arrays.asList(utility3.getTestFileSystem().listStatus(path)), true);
            Assert.assertTrue((boolean)BULK_LOAD_LATCH.await(1L, TimeUnit.SECONDS));
            Thread.sleep(400L);
            Assert.assertEquals((long)1L, (long)BULK_LOADS_COUNT.get());
        }
        finally {
            if (pool != null && !pool.isTerminated()) {
                pool.shutdownNow();
            }
        }
    }

    private void assertBulkLoadConditions(byte[] row, byte[] value, HBaseTestingUtility utility, Table ... tables) throws Exception {
        BULK_LOAD_LATCH = new CountDownLatch(3);
        this.bulkLoadOnCluster(row, value, utility);
        Assert.assertTrue((boolean)BULK_LOAD_LATCH.await(1L, TimeUnit.MINUTES));
        this.assertTableHasValue(tables[0], row, value);
        this.assertTableHasValue(tables[1], row, value);
        this.assertTableHasValue(tables[2], row, value);
    }

    private void bulkLoadOnCluster(byte[] row, byte[] value, HBaseTestingUtility cluster) throws Exception {
        String bulkLoadFile = this.createHFileForFamilies(row, value, cluster.getConfiguration());
        Path bulkLoadFilePath = new Path(bulkLoadFile);
        this.copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
        LoadIncrementalHFiles bulkLoadHFilesTool = new LoadIncrementalHFiles(cluster.getConfiguration());
        HashMap family2Files = new HashMap();
        ArrayList<Path> files = new ArrayList<Path>();
        files.add(new Path(BULK_LOAD_BASE_DIR + "/f/" + bulkLoadFilePath.getName()));
        family2Files.put(Bytes.toBytes((String)"f"), files);
        bulkLoadHFilesTool.run(family2Files, tableName);
    }

    private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
        Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/f/");
        cluster.getFileSystem().mkdirs(bulkLoadDir);
        cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
    }

    private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
        Get get = new Get(row);
        Result result = table.get(get);
        Assert.assertTrue((boolean)result.advance());
        Assert.assertEquals((Object)Bytes.toString((byte[])value), (Object)Bytes.toString((byte[])result.value()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private String createHFileForFamilies(byte[] row, byte[] value, Configuration clusterConfig) throws IOException {
        CellBuilder cellBuilder = CellBuilderFactory.create((CellBuilderType)CellBuilderType.DEEP_COPY);
        cellBuilder.setRow(row).setFamily(TestReplicationBase.famName).setQualifier(Bytes.toBytes((String)PEER_ID1)).setValue(value).setType(Cell.Type.Put);
        HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache((Configuration)clusterConfig);
        File hFileLocation = testFolder.newFile();
        try (FSDataOutputStream out = new FSDataOutputStream((OutputStream)new FileOutputStream(hFileLocation), null);){
            hFileFactory.withOutputStream(out);
            hFileFactory.withFileContext(new HFileContext());
            try (HFile.Writer writer = hFileFactory.create();){
                writer.append((Cell)new KeyValue(cellBuilder.build()));
            }
        }
        return hFileLocation.getAbsoluteFile().getAbsolutePath();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Path createMobFiles(HBaseTestingUtility util) throws IOException {
        Path testDir = FSUtils.getRootDir((Configuration)util.getConfiguration());
        Path mobTestDir = new Path(testDir, "mobdir");
        Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
        HFileContext meta = new HFileContextBuilder().withBlockSize(8192).build();
        MobFileName mobFileName = null;
        byte[] mobFileStartRow = new byte[32];
        for (byte rowKey : Bytes.toBytes((String)"01234")) {
            mobFileName = MobFileName.create((byte[])mobFileStartRow, (String)MobUtils.formatDate((Date)new Date()), (String)UUID.randomUUID().toString().replaceAll("-", ""));
            long now = System.currentTimeMillis();
            try (StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(util.getConfiguration(), new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();){
                for (int i = 0; i < 10; ++i) {
                    byte[] key = Bytes.add((byte[])Bytes.toBytes((short)rowKey), (byte[])Bytes.toBytes((int)i));
                    byte[] dummyData = new byte[5000];
                    new Random().nextBytes(dummyData);
                    mobFileWriter.append((Cell)new KeyValue(key, famName, Bytes.toBytes((String)PEER_ID1), now, KeyValue.Type.Put, dummyData));
                }
            }
        }
        return basePath;
    }

    static {
        BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
        testFolder = new TemporaryFolder();
    }

    public static class BulkReplicationTestObserver
    implements RegionCoprocessor {
        String clusterName;
        AtomicInteger bulkLoadCounts = new AtomicInteger();

        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(new RegionObserver(){

                public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths) throws IOException {
                    BULK_LOAD_LATCH.countDown();
                    BULK_LOADS_COUNT.incrementAndGet();
                    LOG.debug("Another file bulk loaded. Total for {}: {}", (Object)clusterName, (Object)bulkLoadCounts.addAndGet(1));
                }
            });
        }
    }
}

