/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.history;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.HistoryServer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class HistoryServerArchiveFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);
    private static final JsonFactory jacksonFactory = new JsonFactory();
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final String JSON_FILE_ENDING = ".json";
    private final List<HistoryServer.RefreshLocation> refreshDirs;
    private final Consumer<ArchiveEvent> jobArchiveEventListener;
    private final boolean processExpiredArchiveDeletion;
    private final boolean processBeyondLimitArchiveDeletion;
    private final int maxHistorySize;
    private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
    private final File webDir;
    private final File webJobDir;
    private final File webOverviewDir;

    HistoryServerArchiveFetcher(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, Consumer<ArchiveEvent> jobArchiveEventListener, boolean cleanupExpiredArchives, int maxHistorySize) throws IOException {
        this.refreshDirs = (List)Preconditions.checkNotNull(refreshDirs);
        this.jobArchiveEventListener = jobArchiveEventListener;
        this.processExpiredArchiveDeletion = cleanupExpiredArchives;
        this.maxHistorySize = maxHistorySize;
        this.processBeyondLimitArchiveDeletion = this.maxHistorySize > 0;
        this.cachedArchivesPerRefreshDirectory = new HashMap<Path, Set<String>>();
        for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
            this.cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet());
        }
        this.webDir = (File)Preconditions.checkNotNull((Object)webDir);
        this.webJobDir = new File(webDir, "jobs");
        Files.createDirectories(this.webJobDir.toPath(), new FileAttribute[0]);
        this.webOverviewDir = new File(webDir, "overviews");
        Files.createDirectories(this.webOverviewDir.toPath(), new FileAttribute[0]);
        HistoryServerArchiveFetcher.updateJobOverview(this.webOverviewDir, webDir);
        if (LOG.isInfoEnabled()) {
            for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
                LOG.info("Monitoring directory {} for archived jobs.", (Object)refreshDir.getPath());
            }
        }
    }

    void fetchArchives() {
        try {
            LOG.debug("Starting archive fetching.");
            ArrayList<ArchiveEvent> events = new ArrayList<ArchiveEvent>();
            HashMap<Path, Set<String>> jobsToRemove = new HashMap<Path, Set<String>>();
            this.cachedArchivesPerRefreshDirectory.forEach((path, archives) -> {
                Set cfr_ignored_0 = jobsToRemove.put((Path)path, new HashSet(archives));
            });
            HashMap<Path, Set<Path>> archivesBeyondSizeLimit = new HashMap<Path, Set<Path>>();
            for (HistoryServer.RefreshLocation refreshLocation : this.refreshDirs) {
                FileStatus[] jobArchives;
                Path refreshDir = refreshLocation.getPath();
                LOG.debug("Checking archive directory {}.", (Object)refreshDir);
                try {
                    jobArchives = HistoryServerArchiveFetcher.listArchives(refreshLocation.getFs(), refreshDir);
                }
                catch (IOException e) {
                    LOG.error("Failed to access job archive location for path {}.", (Object)refreshDir, (Object)e);
                    jobsToRemove.remove(refreshDir);
                    continue;
                }
                int historySize = 0;
                for (FileStatus jobArchive : jobArchives) {
                    Path jobArchivePath = jobArchive.getPath();
                    String jobID = jobArchivePath.getName();
                    if (!HistoryServerArchiveFetcher.isValidJobID(jobID, refreshDir)) continue;
                    ((Set)jobsToRemove.get(refreshDir)).remove(jobID);
                    if (++historySize > this.maxHistorySize && this.processBeyondLimitArchiveDeletion) {
                        archivesBeyondSizeLimit.computeIfAbsent(refreshDir, ignored -> new HashSet()).add(jobArchivePath);
                        continue;
                    }
                    if (this.cachedArchivesPerRefreshDirectory.get(refreshDir).contains(jobID)) {
                        LOG.trace("Ignoring archive {} because it was already fetched.", (Object)jobArchivePath);
                        continue;
                    }
                    LOG.info("Processing archive {}.", (Object)jobArchivePath);
                    try {
                        this.processArchive(jobID, jobArchivePath);
                        events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED));
                        this.cachedArchivesPerRefreshDirectory.get(refreshDir).add(jobID);
                        LOG.info("Processing archive {} finished.", (Object)jobArchivePath);
                    }
                    catch (IOException e) {
                        LOG.error("Failure while fetching/processing job archive for job {}.", (Object)jobID, (Object)e);
                        this.deleteJobFiles(jobID);
                    }
                }
            }
            if (jobsToRemove.values().stream().flatMap(Collection::stream).findAny().isPresent() && this.processExpiredArchiveDeletion) {
                events.addAll(this.cleanupExpiredJobs(jobsToRemove));
            }
            if (!archivesBeyondSizeLimit.isEmpty() && this.processBeyondLimitArchiveDeletion) {
                events.addAll(this.cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit));
            }
            if (!events.isEmpty()) {
                HistoryServerArchiveFetcher.updateJobOverview(this.webOverviewDir, this.webDir);
            }
            events.forEach(this.jobArchiveEventListener::accept);
            LOG.debug("Finished archive fetching.");
        }
        catch (Exception e) {
            LOG.error("Critical failure while fetching/processing job archives.", (Throwable)e);
        }
    }

    private static FileStatus[] listArchives(FileSystem refreshFS, Path refreshDir) throws IOException {
        FileStatus[] jobArchives = refreshFS.listStatus(refreshDir);
        if (jobArchives == null) {
            return new FileStatus[0];
        }
        Arrays.sort(jobArchives, Comparator.comparingLong(FileStatus::getModificationTime).reversed());
        return jobArchives;
    }

    private static boolean isValidJobID(String jobId, Path refreshDir) {
        try {
            JobID.fromHexString((String)jobId);
            return true;
        }
        catch (IllegalArgumentException iae) {
            LOG.debug("Archive directory {} contained file with unexpected name {}. Ignoring file.", new Object[]{refreshDir, jobId, iae});
            return false;
        }
    }

    private void processArchive(String jobID, Path jobArchive) throws IOException {
        for (ArchivedJson archive : FsJobArchivist.getArchivedJsons((Path)jobArchive)) {
            File target;
            String path = archive.getPath();
            String json = archive.getJson();
            if (path.equals("/jobs/overview")) {
                target = new File(this.webOverviewDir, jobID + JSON_FILE_ENDING);
            } else if (path.equals("/joboverview")) {
                LOG.debug("Migrating legacy archive {}", (Object)jobArchive);
                json = HistoryServerArchiveFetcher.convertLegacyJobOverview(json);
                target = new File(this.webOverviewDir, jobID + JSON_FILE_ENDING);
            } else {
                target = new File(this.webDir, path + JSON_FILE_ENDING);
            }
            java.nio.file.Path parent = target.getParentFile().toPath();
            try {
                Files.createDirectories(parent, new FileAttribute[0]);
            }
            catch (FileAlreadyExistsException fileAlreadyExistsException) {
                // empty catch block
            }
            java.nio.file.Path targetPath = target.toPath();
            Files.deleteIfExists(targetPath);
            Files.createFile(target.toPath(), new FileAttribute[0]);
            FileWriter fw = new FileWriter(target);
            Throwable throwable = null;
            try {
                fw.write(json);
                fw.flush();
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (fw == null) continue;
                if (throwable != null) {
                    try {
                        fw.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                fw.close();
            }
        }
    }

    private List<ArchiveEvent> cleanupJobsBeyondSizeLimit(Map<Path, Set<Path>> jobArchivesToRemove) {
        HashMap<Path, Set<String>> allJobIdsToRemoveFromOverview = new HashMap<Path, Set<String>>();
        for (Map.Entry<Path, Set<Path>> pathSetEntry : jobArchivesToRemove.entrySet()) {
            HashSet<String> jobIdsToRemoveFromOverview = new HashSet<String>();
            for (Path archive : pathSetEntry.getValue()) {
                jobIdsToRemoveFromOverview.add(archive.getName());
                try {
                    archive.getFileSystem().delete(archive, false);
                }
                catch (IOException ioe) {
                    LOG.warn("Could not delete old archive " + archive, (Throwable)ioe);
                }
            }
            allJobIdsToRemoveFromOverview.put(pathSetEntry.getKey(), jobIdsToRemoveFromOverview);
        }
        return this.cleanupExpiredJobs(allJobIdsToRemoveFromOverview);
    }

    private List<ArchiveEvent> cleanupExpiredJobs(Map<Path, Set<String>> jobsToRemove) {
        ArrayList<ArchiveEvent> deleteLog = new ArrayList<ArchiveEvent>();
        LOG.info("Archive directories for jobs {} were deleted.", jobsToRemove);
        jobsToRemove.forEach((refreshDir, archivesToRemove) -> this.cachedArchivesPerRefreshDirectory.get(refreshDir).removeAll((Collection<?>)archivesToRemove));
        jobsToRemove.values().stream().flatMap(Collection::stream).forEach(removedJobID -> {
            this.deleteJobFiles((String)removedJobID);
            deleteLog.add(new ArchiveEvent((String)removedJobID, ArchiveEventType.DELETED));
        });
        return deleteLog;
    }

    private void deleteJobFiles(String jobID) {
        try {
            Files.deleteIfExists(new File(this.webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
        }
        catch (IOException ioe) {
            LOG.warn("Could not delete file from overview directory.", (Throwable)ioe);
        }
        File jobDirectory = new File(this.webJobDir, jobID);
        try {
            FileUtils.deleteDirectory((File)jobDirectory);
        }
        catch (IOException ioe) {
            LOG.warn("Could not clean up job directory.", (Throwable)ioe);
        }
        try {
            Files.deleteIfExists(new File(this.webJobDir, jobID + JSON_FILE_ENDING).toPath());
        }
        catch (IOException ioe) {
            LOG.warn("Could not delete file from job directory.", (Throwable)ioe);
        }
    }

    private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
        int scheduled;
        JsonNode root = mapper.readTree(legacyOverview);
        JsonNode finishedJobs = root.get("finished");
        JsonNode job = finishedJobs.get(0);
        JobID jobId = JobID.fromHexString((String)job.get("jid").asText());
        String name = job.get("name").asText();
        JobStatus state = JobStatus.valueOf((String)job.get("state").asText());
        long startTime = job.get("start-time").asLong();
        long endTime = job.get("end-time").asLong();
        long duration = job.get("duration").asLong();
        long lastMod = job.get("last-modification").asLong();
        JsonNode tasks = job.get("tasks");
        int numTasks = tasks.get("total").asInt();
        JsonNode pendingNode = tasks.get("pending");
        boolean versionLessThan14 = pendingNode != null;
        int created = 0;
        int deploying = 0;
        if (versionLessThan14) {
            scheduled = pendingNode.asInt();
        } else {
            created = tasks.get("created").asInt();
            scheduled = tasks.get("scheduled").asInt();
            deploying = tasks.get("deploying").asInt();
        }
        int running = tasks.get("running").asInt();
        int finished = tasks.get("finished").asInt();
        int canceling = tasks.get("canceling").asInt();
        int canceled = tasks.get("canceled").asInt();
        int failed = tasks.get("failed").asInt();
        int[] tasksPerState = new int[ExecutionState.values().length];
        tasksPerState[ExecutionState.CREATED.ordinal()] = created;
        tasksPerState[ExecutionState.SCHEDULED.ordinal()] = scheduled;
        tasksPerState[ExecutionState.DEPLOYING.ordinal()] = deploying;
        tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
        tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
        tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
        tasksPerState[ExecutionState.CANCELED.ordinal()] = canceled;
        tasksPerState[ExecutionState.FAILED.ordinal()] = failed;
        JobDetails jobDetails = new JobDetails(jobId, name, startTime, endTime, duration, state, lastMod, tasksPerState, numTasks);
        MultipleJobsDetails multipleJobsDetails = new MultipleJobsDetails(Collections.singleton(jobDetails));
        StringWriter sw = new StringWriter();
        mapper.writeValue((Writer)sw, (Object)multipleJobsDetails);
        return sw.toString();
    }

    private static void updateJobOverview(File webOverviewDir, File webDir) {
        try (JsonGenerator gen = jacksonFactory.createGenerator((Writer)HistoryServer.createOrGetFile(webDir, "/jobs/overview"));){
            File[] overviews = new File(webOverviewDir.getPath()).listFiles();
            if (overviews != null) {
                ArrayList allJobs = new ArrayList(overviews.length);
                for (File overview : overviews) {
                    MultipleJobsDetails subJobs = (MultipleJobsDetails)mapper.readValue(overview, MultipleJobsDetails.class);
                    allJobs.addAll(subJobs.getJobs());
                }
                mapper.writeValue(gen, (Object)new MultipleJobsDetails(allJobs));
            }
        }
        catch (IOException ioe) {
            LOG.error("Failed to update job overview.", (Throwable)ioe);
        }
    }

    public static class ArchiveEvent {
        private final String jobID;
        private final ArchiveEventType operation;

        ArchiveEvent(String jobID, ArchiveEventType operation) {
            this.jobID = jobID;
            this.operation = operation;
        }

        public String getJobID() {
            return this.jobID;
        }

        public ArchiveEventType getType() {
            return this.operation;
        }
    }

    public static enum ArchiveEventType {
        CREATED,
        DELETED;

    }
}

