/*
 * Decompiled with CFR 0.152.
 */
package com.bxm.warcar.mq.kafka;

import com.bxm.warcar.mq.Message;
import com.bxm.warcar.mq.Producer;
import com.bxm.warcar.mq.SendException;
import com.bxm.warcar.mq.SendResult;
import com.bxm.warcar.mq.kafka.KafkaMsgIdUtils;
import com.bxm.warcar.utils.LifeCycle;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * {@link Producer} implementation that publishes messages through the Apache Kafka client.
 *
 * <p>The underlying Kafka client is created eagerly in the constructor, so
 * {@link #doInit()} and {@link #start()} have nothing to do and
 * {@link #isStarted()} always reports {@code true}.
 */
public class KafkaProducer
extends LifeCycle
implements Producer {
    /** Kafka client used for every send; keys are strings and payloads are raw bytes. */
    private final org.apache.kafka.clients.producer.Producer<String, byte[]> producer;

    /**
     * Creates the underlying Kafka client from the given configuration.
     *
     * @param properties configuration handed unchanged to the Kafka client constructor
     */
    public KafkaProducer(Properties properties) {
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
    }

    /** No-op: the Kafka client is already created by the constructor. */
    @Override
    protected void doInit() {
    }

    /** Releases the Kafka client by delegating to {@link #close()}. */
    @Override
    protected void doDestroy() {
        this.close();
    }

    /**
     * Sends the message and blocks until the Kafka client reports the record metadata.
     *
     * @param message message supplying the topic, key and body of the record
     * @return a result carrying an id generated from the topic, partition and offset
     *         of the written record
     * @throws SendException if the send fails, or if the calling thread is interrupted
     *         while waiting for the result (the interrupt flag is restored first)
     */
    @Override
    public SendResult send(Message message) throws SendException {
        String topic = message.getTopic();
        String key = message.getKey();
        byte[] data = message.getBody();
        try {
            Future<RecordMetadata> future = this.producer.send(new ProducerRecord<>(topic, key, data));
            RecordMetadata metadata = future.get();
            String msgId = KafkaMsgIdUtils.generate(topic, metadata.partition(), metadata.offset());
            return new SendResult(msgId);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it;
            // catching the exception clears the flag.
            Thread.currentThread().interrupt();
            throw new SendException(e);
        }
        catch (Exception e) {
            throw new SendException(e);
        }
    }

    /** No-op: the Kafka client is ready for use as soon as this object is constructed. */
    @Override
    public void start() {
    }

    /** Closes the underlying Kafka client. */
    @Override
    public void close() {
        this.producer.close();
    }

    /**
     * Always returns {@code true}; this class does not track a started/stopped state.
     *
     * @return {@code true}
     */
    @Override
    public boolean isStarted() {
        return true;
    }
}

