/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.state;

import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.execution.streaming.CheckpointFileManager;
import org.apache.spark.sql.execution.streaming.CheckpointFileManager$;
import org.apache.spark.sql.execution.streaming.MetadataVersionUtil$;
import org.apache.spark.sql.execution.streaming.state.StateSchemaCompatibilityChecker$;
import org.apache.spark.sql.execution.streaming.state.StateSchemaNotCompatible;
import org.apache.spark.sql.execution.streaming.state.StateStoreProviderId;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataType$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005%a\u0001\u0002\u000b\u0016\u0001\u0011B\u0001\"\r\u0001\u0003\u0002\u0003\u0006IA\r\u0005\tm\u0001\u0011\t\u0011)A\u0005o!)q\b\u0001C\u0001\u0001\"9A\t\u0001b\u0001\n\u0013)\u0005B\u0002'\u0001A\u0003%a\tC\u0004N\u0001\t\u0007I\u0011\u0002(\t\rM\u0003\u0001\u0015!\u0003P\u0011\u001d!\u0006A1A\u0005\n\u0015Ca!\u0016\u0001!\u0002\u00131\u0005\"\u0002,\u0001\t\u00039\u0006\"B3\u0001\t\u00131\u0007\"\u00028\u0001\t\u0013y\u0007\"B:\u0001\t\u0013!\b\"B<\u0001\t\u0013Ax!\u0002>\u0016\u0011\u0003Yh!\u0002\u000b\u0016\u0011\u0003a\b\"B \u0011\t\u0003i\bb\u0002@\u0011\u0005\u0004%\ta \u0005\t\u0003\u000f\u0001\u0002\u0015!\u0003\u0002\u0002\ty2\u000b^1uKN\u001b\u0007.Z7b\u0007>l\u0007/\u0019;jE&d\u0017\u000e^=DQ\u0016\u001c7.\u001a:\u000b\u0005Y9\u0012!B:uCR,'B\u0001\r\u001a\u0003%\u0019HO]3b[&twM\u0003\u0002\u001b7\u0005IQ\r_3dkRLwN\u001c\u0006\u00039u\t1a]9m\u0015\tqr$A\u0003ta\u0006\u00148N\u0003\u0002!C\u00051\u0011\r]1dQ\u0016T\u0011AI\u0001\u0004_J<7\u0001A\n\u0004\u0001\u0015Z\u0003C\u0001\u0014*\u001b\u00059#\"\u0001\u0015\u0002\u000bM\u001c\u0017\r\\1\n\u0005):#AB!osJ+g\r\u0005\u0002-_5\tQF\u0003\u0002/;\u0005A\u0011N\u001c;fe:\fG.\u0003\u00021[\t9Aj\\4hS:<\u0017A\u00039s_ZLG-\u001a:JIB\u00111\u0007N\u0007\u0002+%\u0011Q'\u0006\u0002\u0015'R\fG/Z*u_J,\u0007K]8wS\u0012,'/\u00133\u0002\u0015!\fGm\\8q\u0007>tg\r\u0005\u00029{5\t\u0011H\u0003\u0002;w\u0005!1m\u001c8g\u0015\tat$\u0001\u0004iC\u0012|w\u000e]\u0005\u0003}e\u0012QbQ8oM&<WO]1uS>t\u0017A\u0002\u001fj]&$h\bF\u0002B\u0005\u000e\u0003\"a\r\u0001\t\u000bE\u001a\u0001\u0019\u0001\u001a\t\u000bY\u001a\u0001\u0019A\u001c\u0002\u001fM$xN]3Da2{7-\u0019;j_:,\u0012A\u0012\t\u0003\u000f*k\u0011\u0001\u0013\u0006\u0003\u0013n\n!AZ:\n\u0005-C%\u0001\u0002)bi\"\f\u0001c\u001d;pe\u0016\u001c\u0005\u000fT8dCRLwN\u001c\u0011\u0002\u0005\u0019lW#A(\u0011\u0005A\u000bV\"A\f\n\u0005I;\"!F\"iK\u000e\\\u0007o\\5oi\u001aKG.Z'b]\u0006<WM]\u0001\u0004M6\u0004\u0013AE:dQ\u0016l\u0017MR5mK2{7-\u0
019;j_:\f1c]2iK6\fg)\u001b7f\u0019>\u001c\u0017\r^5p]\u0002\nQa\u00195fG.$2\u0001W.d!\t1\u0013,\u0003\u0002[O\t!QK\\5u\u0011\u0015a&\u00021\u0001^\u0003%YW-_*dQ\u0016l\u0017\r\u0005\u0002_C6\tqL\u0003\u0002a7\u0005)A/\u001f9fg&\u0011!m\u0018\u0002\u000b'R\u0014Xo\u0019;UsB,\u0007\"\u00023\u000b\u0001\u0004i\u0016a\u0003<bYV,7k\u00195f[\u0006\f\u0011c]2iK6\f7oQ8na\u0006$\u0018N\u00197f)\r9'\u000e\u001c\t\u0003M!L!![\u0014\u0003\u000f\t{w\u000e\\3b]\")1n\u0003a\u0001;\u0006a1\u000f^8sK\u0012\u001c6\r[3nC\")Qn\u0003a\u0001;\u000611o\u00195f[\u0006\faB]3bIN\u001b\u0007.Z7b\r&dW\rF\u0001q!\u00111\u0013/X/\n\u0005I<#A\u0002+va2,''\u0001\tde\u0016\fG/Z*dQ\u0016l\u0017MR5mKR\u0019\u0001,\u001e<\t\u000bqk\u0001\u0019A/\t\u000b\u0011l\u0001\u0019A/\u0002\u0015M\u001c\u0007.Z7b\r&dW\r\u0006\u0002Gs\")AI\u0004a\u0001\r\u0006y2\u000b^1uKN\u001b\u0007.Z7b\u0007>l\u0007/\u0019;jE&d\u0017\u000e^=DQ\u0016\u001c7.\u001a:\u0011\u0005M\u00022C\u0001\t&)\u0005Y\u0018a\u0002,F%NKuJT\u000b\u0003\u0003\u0003\u00012AJA\u0002\u0013\r\t)a\n\u0002\u0004\u0013:$\u0018\u0001\u0003,F%NKuJ\u0014\u0011")
public class StateSchemaCompatibilityChecker
implements Logging {
    // Identifier of the state store provider; used in log messages and to derive the checkpoint location.
    private final StateStoreProviderId providerId;
    // Store checkpoint location, taken from providerId.storeId().storeCheckpointLocation() in the constructor.
    private final Path storeCpLocation;
    // File manager created in the constructor for storeCpLocation with the supplied Hadoop configuration.
    private final CheckpointFileManager fm;
    // Path of the schema file: <storeCpLocation>/_metadata/schema (see schemaFile(Path)).
    private final Path schemaFileLocation;
    // Backing field for the Logging log_ getter/setter pair; transient, so excluded from Java serialization.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Static forwarder that returns {@code VERSION()} from the
     * {@code StateSchemaCompatibilityChecker$.MODULE$} singleton.
     *
     * <p>Within this class the value is used both as the expected version when reading the
     * schema file and as the version written (prefixed with {@code "v"}) when creating it.
     *
     * @return the version number reported by the module singleton
     */
    public static int VERSION() {
        return StateSchemaCompatibilityChecker$.MODULE$.VERSION();
    }

    /**
     * Forwards to the static {@code Logging.logName$} helper with this instance as receiver.
     *
     * @return the name produced by the {@code Logging} helper
     */
    public String logName() {
        return Logging.logName$((Logging)this);
    }

    /**
     * Forwards to the static {@code Logging.log$} helper with this instance as receiver.
     *
     * @return the SLF4J {@link Logger} supplied by the {@code Logging} helper
     */
    public Logger log() {
        return Logging.log$((Logging)this);
    }

    /**
     * Forwards the message thunk to the static {@code Logging.logInfo$} helper.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    /**
     * Forwards the message thunk to the static {@code Logging.logDebug$} helper.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    /**
     * Forwards the message thunk to the static {@code Logging.logTrace$} helper.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    /**
     * Forwards the message thunk to the static {@code Logging.logWarning$} helper.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    /**
     * Forwards the message thunk to the static {@code Logging.logError$} helper.
     *
     * @param msg zero-argument function producing the message text
     */
    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    /**
     * Forwards the message thunk and an associated throwable to the static
     * {@code Logging.logInfo$} helper.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards the message thunk and an associated throwable to the static
     * {@code Logging.logDebug$} helper.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards the message thunk and an associated throwable to the static
     * {@code Logging.logTrace$} helper.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards the message thunk and an associated throwable to the static
     * {@code Logging.logWarning$} helper.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards the message thunk and an associated throwable to the static
     * {@code Logging.logError$} helper.
     *
     * @param msg       zero-argument function producing the message text
     * @param throwable throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards to the static {@code Logging.isTraceEnabled$} helper with this instance as receiver.
     *
     * @return the boolean reported by the {@code Logging} helper
     */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    /**
     * Forwards to the single-flag overload of the static
     * {@code Logging.initializeLogIfNecessary$} helper.
     *
     * @param isInterpreter flag passed through unchanged to the {@code Logging} helper
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    /**
     * Forwards to the two-flag overload of the static
     * {@code Logging.initializeLogIfNecessary$} helper.
     *
     * @param isInterpreter flag passed through unchanged to the {@code Logging} helper
     * @param silent        flag passed through unchanged to the {@code Logging} helper
     * @return the boolean returned by the {@code Logging} helper
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    /**
     * Forwards to the static {@code Logging.initializeLogIfNecessary$default$2$} helper.
     * The name follows the Scala compiler's pattern for the default value of a method's
     * second parameter (here, of {@code initializeLogIfNecessary}).
     *
     * @return the value supplied by the {@code Logging} helper
     */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    /**
     * Forwards to the static {@code Logging.initializeForcefully$} helper.
     *
     * @param isInterpreter flag passed through unchanged to the {@code Logging} helper
     * @param silent        flag passed through unchanged to the {@code Logging} helper
     */
    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    /**
     * Getter for the transient logger field backing the {@code Logging} mix-in.
     *
     * @return the current value of the backing logger field (may be {@code null} if never set)
     */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Setter for the transient logger field backing the {@code Logging} mix-in.
     *
     * @param x$1 logger instance to store in the backing field
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /**
     * Accessor for the store checkpoint location captured in the constructor.
     *
     * @return the {@code storeCpLocation} field
     */
    private Path storeCpLocation() {
        return this.storeCpLocation;
    }

    /**
     * Accessor for the checkpoint file manager created in the constructor.
     *
     * @return the {@code fm} field
     */
    private CheckpointFileManager fm() {
        return this.fm;
    }

    /**
     * Accessor for the schema file path computed in the constructor.
     *
     * @return the {@code schemaFileLocation} field
     */
    private Path schemaFileLocation() {
        return this.schemaFileLocation;
    }

    /**
     * Verifies that the given key and value schemas can be used with the state already
     * recorded for this provider.
     *
     * <p>If no schema file exists at the schema file location, one is created from the given
     * schemas and the method returns. Otherwise the stored schemas are read back and compared:
     * <ul>
     *   <li>both equal to the provided schemas: nothing further happens;</li>
     *   <li>different, but {@code schemasCompatible} accepts both key and value: the change is
     *       reported through {@code logInfo} and allowed;</li>
     *   <li>otherwise: the mismatch is reported through {@code logError} and rejected.</li>
     * </ul>
     *
     * @param keySchema   key schema provided by the caller
     * @param valueSchema value schema provided by the caller
     * @throws StateSchemaNotCompatible if the stored key or value schema is not compatible with
     *         the provided one
     * @throws MatchError if {@code readSchemaFile()} returns {@code null}
     */
    public void check(StructType keySchema, StructType valueSchema) {
        if (!this.fm().exists(this.schemaFileLocation())) {
            // No schema file yet: record the provided schemas and stop.
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(54).append("Schema file for provider ").append(this.providerId).append(" doesn't exist. Creating one.").toString());
            this.createSchemaFile(keySchema, valueSchema);
            return;
        }

        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(65).append("Schema file for provider ").append(this.providerId).append(" exists. Comparing with provided schema.").toString());
        Tuple2<StructType, StructType> stored = this.readSchemaFile();
        if (stored == null) {
            // Mirrors the failure of a tuple pattern match on a null value.
            throw new MatchError(stored);
        }
        StructType storedKeySchema = stored._1();
        StructType storedValueSchema = stored._2();

        // Identical schemas need no compatibility analysis and no log output.
        if (storedKeySchema.equals((Object)keySchema) && storedValueSchema.equals((Object)valueSchema)) {
            return;
        }

        if (!this.schemasCompatible(storedKeySchema, keySchema) || !this.schemasCompatible(storedValueSchema, valueSchema)) {
            String errorMsg = new StringBuilder(442).append("Provided schema doesn't match to the schema for existing state! Please note that Spark allow difference of field name: check count of fields and data type of each field.\n").append("- Provided key schema: ").append(keySchema).append("\n").append("- Provided value schema: ").append(valueSchema).append("\n").append("- Existing key schema: ").append(storedKeySchema).append("\n").append("- Existing value schema: ").append(storedValueSchema).append("\n").append("If you want to force running query without schema validation, please set ").append(SQLConf$.MODULE$.STATE_SCHEMA_CHECK_ENABLED().key()).append(" to false.\n").append("Please note running query with incompatible schema could cause indeterministic").append(" behavior.").toString();
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> errorMsg);
            throw new StateSchemaNotCompatible(errorMsg);
        }

        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Detected schema change which is compatible. Allowing to put rows.");
    }

    /**
     * Decides whether a provided schema may be used in place of a stored one by delegating to
     * {@code DataType$.MODULE$.equalsIgnoreNameAndCompatibleNullability}, with the stored
     * schema as first argument and the provided schema as second.
     *
     * @param storedSchema schema read from the schema file
     * @param schema       schema provided by the caller
     * @return the result of the delegated comparison
     */
    private boolean schemasCompatible(StructType storedSchema, StructType schema) {
        return DataType$.MODULE$.equalsIgnoreNameAndCompatibleNullability((DataType)storedSchema, (DataType)schema);
    }

    /**
     * Loads the key and value schemas previously written to the schema file.
     *
     * <p>The file is read as three consecutive {@code readUTF} strings: a version string,
     * the key schema and the value schema. The version string is validated against
     * {@code VERSION()} and must resolve to {@code 1}.
     *
     * <p>Any failure while reading or parsing is logged via {@code logError} and rethrown
     * unchanged; the input stream is closed by the try-with-resources statement either way.
     *
     * @return a pair of (key schema, value schema) parsed from the file
     */
    private Tuple2<StructType, StructType> readSchemaFile() {
        try (FSDataInputStream in = this.fm().open(this.schemaFileLocation())) {
            try {
                // Header: the version string is checked before any schema text is consumed.
                String rawVersion = in.readUTF();
                int parsedVersion = MetadataVersionUtil$.MODULE$.validateVersion(rawVersion, StateSchemaCompatibilityChecker$.MODULE$.VERSION());
                Predef$.MODULE$.require(parsedVersion == 1);

                // Payload: key schema first, then value schema.
                String keyText = in.readUTF();
                String valueText = in.readUTF();
                StructType storedKey = StructType$.MODULE$.fromString(keyText);
                StructType storedValue = StructType$.MODULE$.fromString(valueText);
                return new Tuple2<StructType, StructType>(storedKey, storedValue);
            }
            catch (Throwable failure) {
                this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Fail to read schema file from " + this.schemaFileLocation(), failure);
                throw failure;
            }
        }
    }

    /**
     * Writes a new schema file containing, in order, the version tag
     * ({@code "v"} followed by {@code VERSION()}), the key schema JSON and the value schema
     * JSON, each via {@code writeUTF}.
     *
     * <p>The stream is obtained from {@code createAtomic} with the second argument
     * {@code false}. On success the stream is closed; on any failure the error is logged,
     * the stream is cancelled instead of closed, and the original throwable is rethrown.
     *
     * <p>NOTE(review): if this stream's {@code writeUTF} follows {@code java.io.DataOutput},
     * each string is limited to 65535 encoded bytes - confirm behavior for very large schemas.
     *
     * @param keySchema   key schema to persist
     * @param valueSchema value schema to persist
     */
    private void createSchemaFile(StructType keySchema, StructType valueSchema) {
        CheckpointFileManager.CancellableFSDataOutputStream out = this.fm().createAtomic(this.schemaFileLocation(), false);
        try {
            out.writeUTF("v" + StateSchemaCompatibilityChecker$.MODULE$.VERSION());
            out.writeUTF(keySchema.json());
            out.writeUTF(valueSchema.json());
            // close() is inside the try so that a failing close also triggers cancel().
            out.close();
        }
        catch (Throwable failure) {
            this.logError((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Fail to write schema file to " + this.schemaFileLocation(), failure);
            out.cancel();
            throw failure;
        }
    }

    /**
     * Builds the schema file path for a store checkpoint location:
     * {@code <storeCpLocation>/_metadata/schema}.
     *
     * @param storeCpLocation root checkpoint location of the state store
     * @return path of the {@code schema} file inside the {@code _metadata} directory
     */
    private Path schemaFile(Path storeCpLocation) {
        Path metadataDir = new Path(storeCpLocation, "_metadata");
        return new Path(metadataDir, "schema");
    }

    public StateSchemaCompatibilityChecker(StateStoreProviderId providerId, Configuration hadoopConf) {
        this.providerId = providerId;
        Logging.$init$((Logging)this);
        this.storeCpLocation = providerId.storeId().storeCheckpointLocation();
        this.fm = CheckpointFileManager$.MODULE$.create(this.storeCpLocation(), hadoopConf);
        this.schemaFileLocation = this.schemaFile(this.storeCpLocation());
        this.fm().mkdirs(this.schemaFileLocation().getParent());
    }
}

