/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.marker;

import java.io.IOException;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hudi.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.marker.MarkerOperation;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.org.apache.http.client.fluent.Request;
import org.apache.hudi.org.apache.http.client.fluent.Response;
import org.apache.hudi.org.apache.http.client.utils.URIBuilder;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * {@link WriteMarkers} implementation that performs every marker operation by
 * sending an HTTP request to a timeline server identified by host and port.
 *
 * <p>Each operation builds a URL from one of the {@link MarkerOperation} paths plus
 * query parameters, issues a GET or POST, and decodes the response body as JSON.
 * Any {@link IOException} raised while talking to the server is rethrown as a
 * {@link HoodieRemoteException}.
 */
public class TimelineServerBasedWriteMarkers
extends WriteMarkers {
    private static final Logger LOG = LogManager.getLogger(TimelineServerBasedWriteMarkers.class);

    /** Query parameter carrying the marker directory path. */
    private static final String MARKER_DIR_PATH_PARAM = "markerdirpath";
    /** Query parameter carrying the marker name (optionally prefixed by the partition path). */
    private static final String MARKER_NAME_PARAM = "markername";
    /** Query parameter carrying the table base path; only sent for early conflict detection. */
    private static final String BASE_PATH_PARAM = "basepath";

    private final ObjectMapper mapper = new ObjectMapper();
    private final String timelineServerHost;
    private final int timelineServerPort;
    private final int timeoutSecs;

    /**
     * Builds an instance from a table, reading the base path and marker folder from its
     * meta client and the server host, port and client timeout from its view storage config.
     *
     * @param table       table whose meta client and write config supply the settings
     * @param instantTime instant the markers belong to
     */
    public TimelineServerBasedWriteMarkers(HoodieTable table, String instantTime) {
        this(table.getMetaClient().getBasePath(), table.getMetaClient().getMarkerFolderPath(instantTime), instantTime, table.getConfig().getViewStorageConfig().getRemoteViewServerHost(), table.getConfig().getViewStorageConfig().getRemoteViewServerPort(), table.getConfig().getViewStorageConfig().getRemoteTimelineClientTimeoutSecs());
    }

    /**
     * Builds an instance from explicit settings.
     *
     * @param basePath           table base path
     * @param markerFolderPath   marker directory for the instant
     * @param instantTime        instant the markers belong to
     * @param timelineServerHost host of the timeline server
     * @param timelineServerPort port of the timeline server
     * @param timeoutSecs        connect and socket timeout, in seconds, applied to each request
     */
    TimelineServerBasedWriteMarkers(String basePath, String markerFolderPath, String instantTime, String timelineServerHost, int timelineServerPort, int timeoutSecs) {
        super(basePath, markerFolderPath, instantTime);
        this.timelineServerHost = timelineServerHost;
        this.timelineServerPort = timelineServerPort;
        this.timeoutSecs = timeoutSecs;
    }

    /**
     * Asks the timeline server (POST) to delete the marker directory.
     *
     * @param context     not used by this implementation
     * @param parallelism not used by this implementation
     * @return the boolean decoded from the server response
     * @throws HoodieRemoteException if the request fails with an {@link IOException}
     */
    @Override
    public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
        try {
            return this.executeRequestToTimelineServer(MarkerOperation.DELETE_MARKER_DIR_URL, this.markerDirParams(), new TypeReference<Boolean>(){}, RequestMethod.POST);
        }
        catch (IOException e) {
            throw new HoodieRemoteException("Failed to delete marker directory " + this.markerDirPath.toString(), e);
        }
    }

    /**
     * Asks the timeline server (GET) whether the marker directory exists.
     *
     * @return the boolean decoded from the server response
     * @throws HoodieRemoteException if the request fails with an {@link IOException}
     */
    @Override
    public boolean doesMarkerDirExist() {
        try {
            return this.executeRequestToTimelineServer(MarkerOperation.MARKERS_DIR_EXISTS_URL, this.markerDirParams(), new TypeReference<Boolean>(){}, RequestMethod.GET);
        }
        catch (IOException e) {
            throw new HoodieRemoteException("Failed to check marker directory " + this.markerDirPath.toString(), e);
        }
    }

    /**
     * Fetches (GET) the CREATE and MERGE marker paths from the timeline server and maps
     * each through {@link WriteMarkers#stripMarkerSuffix}.
     *
     * @param context     not used by this implementation
     * @param parallelism not used by this implementation
     * @return the set of paths after suffix stripping
     * @throws IOException           declared by the signature; request failures are
     *                               rethrown as {@link HoodieRemoteException} instead
     * @throws HoodieRemoteException if the request fails with an {@link IOException}
     */
    @Override
    public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int parallelism) throws IOException {
        try {
            Set<String> markerPaths = this.executeRequestToTimelineServer(MarkerOperation.CREATE_AND_MERGE_MARKERS_URL, this.markerDirParams(), new TypeReference<Set<String>>(){}, RequestMethod.GET);
            return markerPaths.stream().map(WriteMarkers::stripMarkerSuffix).collect(Collectors.toSet());
        }
        catch (IOException e) {
            throw new HoodieRemoteException("Failed to get CREATE and MERGE data file paths in " + this.markerDirPath.toString(), e);
        }
    }

    /**
     * Fetches (GET) all marker paths from the timeline server, returned as decoded.
     *
     * @return the set of marker paths decoded from the server response
     * @throws HoodieRemoteException if the request fails with an {@link IOException}
     */
    @Override
    public Set<String> allMarkerFilePaths() {
        try {
            return this.executeRequestToTimelineServer(MarkerOperation.ALL_MARKERS_URL, this.markerDirParams(), new TypeReference<Set<String>>(){}, RequestMethod.GET);
        }
        catch (IOException e) {
            throw new HoodieRemoteException("Failed to get all markers in " + this.markerDirPath.toString(), e);
        }
    }

    /**
     * Requests creation of a marker on the timeline server, without sending the
     * early-conflict-detection parameter.
     *
     * @param partitionPath partition the data file belongs to; may be null or empty
     * @param dataFileName  data file the marker is for
     * @param type          IO type used to derive the marker file name
     * @param checkIfExists not used by this implementation
     * @return the marker path if the server reported success, otherwise an empty option
     * @throws HoodieRemoteException if the request fails with an {@link IOException}
     */
    @Override
    protected Option<Path> create(String partitionPath, String dataFileName, IOType type, boolean checkIfExists) {
        HoodieTimer timer = HoodieTimer.start();
        String markerFileName = this.getMarkerFileName(dataFileName, type);
        Map<String, String> paramsMap = this.getConfigMap(partitionPath, markerFileName, false);
        boolean success = this.executeCreateMarkerRequest(paramsMap, partitionPath, markerFileName);
        // Logged regardless of the outcome; the message records the elapsed request time.
        LOG.info("[timeline-server-based] Created marker file " + partitionPath + "/" + markerFileName + " in " + timer.endTimer() + " ms");
        if (success) {
            return Option.of(this.markerPath(partitionPath, markerFileName));
        }
        return Option.empty();
    }

    /**
     * Requests creation of a marker on the timeline server, additionally sending the
     * table base path as a request parameter.
     *
     * @param partitionPath  partition the data file belongs to; may be null or empty
     * @param dataFileName   data file the marker is for
     * @param type           IO type used to derive the marker file name
     * @param checkIfExists  not used by this implementation
     * @param config         not used by this implementation
     * @param fileId         not used by this implementation
     * @param activeTimeline not used by this implementation
     * @return the marker path when the server reported success
     * @throws HoodieEarlyConflictDetectionException if the server reported failure
     * @throws HoodieRemoteException                 if the request fails with an {@link IOException}
     */
    @Override
    public Option<Path> createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists, HoodieWriteConfig config, String fileId, HoodieActiveTimeline activeTimeline) {
        HoodieTimer timer = HoodieTimer.start();
        String markerFileName = this.getMarkerFileName(dataFileName, type);
        Map<String, String> paramsMap = this.getConfigMap(partitionPath, markerFileName, true);
        boolean success = this.executeCreateMarkerRequest(paramsMap, partitionPath, markerFileName);
        // Logged regardless of the outcome; the message records the elapsed request time.
        LOG.info("[timeline-server-based] Created marker file with early conflict detection " + partitionPath + "/" + markerFileName + " in " + timer.endTimer() + " ms");
        if (success) {
            return Option.of(this.markerPath(partitionPath, markerFileName));
        }
        throw new HoodieEarlyConflictDetectionException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes"));
    }

    /** Builds the single-entry parameter map shared by the directory-level requests. */
    private Map<String, String> markerDirParams() {
        return Collections.singletonMap(MARKER_DIR_PATH_PARAM, this.markerDirPath.toString());
    }

    /** Resolves the marker file's path under the marker directory for the given partition. */
    private Path markerPath(String partitionPath, String markerFileName) {
        return new Path(FSUtils.getPartitionPath(this.markerDirPath, partitionPath), markerFileName);
    }

    /**
     * Sends the create-marker POST and returns the boolean decoded from the response.
     *
     * @throws HoodieRemoteException if the request fails with an {@link IOException}
     */
    private boolean executeCreateMarkerRequest(Map<String, String> paramsMap, String partitionPath, String markerFileName) {
        try {
            return this.executeRequestToTimelineServer(MarkerOperation.CREATE_MARKER_URL, paramsMap, new TypeReference<Boolean>(){}, RequestMethod.POST);
        }
        catch (IOException e) {
            throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e);
        }
    }

    /**
     * Builds the query parameters for a create-marker request.
     *
     * @param partitionPath                    partition path; when null or empty the marker
     *                                         name is sent without a partition prefix
     * @param markerFileName                   marker file name
     * @param initEarlyConflictDetectionConfigs whether to also send the table base path
     * @return a mutable map of query parameters
     */
    private Map<String, String> getConfigMap(String partitionPath, String markerFileName, boolean initEarlyConflictDetectionConfigs) {
        Map<String, String> paramsMap = new HashMap<>();
        paramsMap.put(MARKER_DIR_PATH_PARAM, this.markerDirPath.toString());
        String markerName = StringUtils.isNullOrEmpty(partitionPath) ? markerFileName : partitionPath + "/" + markerFileName;
        paramsMap.put(MARKER_NAME_PARAM, markerName);
        if (initEarlyConflictDetectionConfigs) {
            paramsMap.put(BASE_PATH_PARAM, this.basePath);
        }
        return paramsMap;
    }

    /**
     * Issues one HTTP request to the timeline server and decodes the JSON response body.
     *
     * @param requestPath     URL path of the operation
     * @param queryParameters query parameters appended to the URL
     * @param reference       type token describing the expected response type
     * @param method          GET, or POST for anything else
     * @param <T>             response type, fixed by {@code reference}
     * @return the decoded response
     * @throws IOException if the request or the decoding fails
     */
    private <T> T executeRequestToTimelineServer(String requestPath, Map<String, String> queryParameters, TypeReference<T> reference, RequestMethod method) throws IOException {
        URIBuilder builder = new URIBuilder().setHost(this.timelineServerHost).setPort(this.timelineServerPort).setPath(requestPath).setScheme("http");
        queryParameters.forEach(builder::addParameter);
        String url = builder.toString();
        LOG.info("Sending request : (" + url + ")");
        // The same timeout, converted to milliseconds, is applied to connect and socket reads.
        int timeoutMillis = this.timeoutSecs * 1000;
        Request request = method == RequestMethod.GET ? Request.Get(url) : Request.Post(url);
        Response response = request.connectTimeout(timeoutMillis).socketTimeout(timeoutMillis).execute();
        String content = response.returnContent().asString();
        return this.mapper.readValue(content, reference);
    }

    /** HTTP methods used when talking to the timeline server. */
    private static enum RequestMethod {
        GET,
        POST;

    }
}

