/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.redis;

import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisSink<IN>
extends RichSinkFunction<IN> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
    private String additionalKey;
    private Integer additionalTTL;
    private RedisMapper<IN> redisSinkMapper;
    private RedisCommand redisCommand;
    private FlinkJedisConfigBase flinkJedisConfigBase;
    private RedisCommandsContainer redisCommandsContainer;

    public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
        Objects.requireNonNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
        Objects.requireNonNull(redisSinkMapper, "Redis Mapper can not be null");
        Objects.requireNonNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");
        this.flinkJedisConfigBase = flinkJedisConfigBase;
        this.redisSinkMapper = redisSinkMapper;
        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
        this.redisCommand = redisCommandDescription.getCommand();
        this.additionalTTL = redisCommandDescription.getAdditionalTTL();
        this.additionalKey = redisCommandDescription.getAdditionalKey();
    }

    public void invoke(IN input, SinkFunction.Context context) throws Exception {
        String key = this.redisSinkMapper.getKeyFromData(input);
        String value = this.redisSinkMapper.getValueFromData(input);
        Optional<String> optAdditionalKey = this.redisSinkMapper.getAdditionalKey(input);
        Optional<Integer> optAdditionalTTL = this.redisSinkMapper.getAdditionalTTL(input);
        switch (this.redisCommand) {
            case RPUSH: {
                this.redisCommandsContainer.rpush(key, value);
                break;
            }
            case LPUSH: {
                this.redisCommandsContainer.lpush(key, value);
                break;
            }
            case SADD: {
                this.redisCommandsContainer.sadd(key, value);
                break;
            }
            case SET: {
                this.redisCommandsContainer.set(key, value);
                break;
            }
            case SETEX: {
                this.redisCommandsContainer.setex(key, value, optAdditionalTTL.orElse(this.additionalTTL));
                break;
            }
            case PFADD: {
                this.redisCommandsContainer.pfadd(key, value);
                break;
            }
            case PUBLISH: {
                this.redisCommandsContainer.publish(key, value);
                break;
            }
            case ZADD: {
                this.redisCommandsContainer.zadd(optAdditionalKey.orElse(this.additionalKey), value, key);
                break;
            }
            case ZINCRBY: {
                this.redisCommandsContainer.zincrBy(optAdditionalKey.orElse(this.additionalKey), value, key);
                break;
            }
            case ZREM: {
                this.redisCommandsContainer.zrem(optAdditionalKey.orElse(this.additionalKey), key);
                break;
            }
            case HSET: {
                this.redisCommandsContainer.hset(optAdditionalKey.orElse(this.additionalKey), key, value, optAdditionalTTL.orElse(this.additionalTTL));
                break;
            }
            case HINCRBY: {
                this.redisCommandsContainer.hincrBy(optAdditionalKey.orElse(this.additionalKey), key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
                break;
            }
            case INCRBY: {
                this.redisCommandsContainer.incrBy(key, Long.valueOf(value));
                break;
            }
            case INCRBY_EX: {
                this.redisCommandsContainer.incrByEx(key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
                break;
            }
            case DECRBY: {
                this.redisCommandsContainer.decrBy(key, Long.valueOf(value));
                break;
            }
            case DESCRBY_EX: {
                this.redisCommandsContainer.decrByEx(key, Long.valueOf(value), optAdditionalTTL.orElse(this.additionalTTL));
                break;
            }
            default: {
                throw new IllegalArgumentException("Cannot process such data type: " + (Object)((Object)this.redisCommand));
            }
        }
    }

    public void open(Configuration parameters) throws Exception {
        try {
            this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
            this.redisCommandsContainer.open();
        }
        catch (Exception e) {
            LOG.error("Redis has not been properly initialized: ", (Throwable)e);
            throw e;
        }
    }

    public void close() throws IOException {
        if (this.redisCommandsContainer != null) {
            this.redisCommandsContainer.close();
        }
    }
}

