/*
 * Decompiled with CFR 0.152.
 */
package org.frameworkset.plugin.kafka;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.frameworkset.plugin.kafka.BaseKafkaConsumer;
import org.frameworkset.plugin.kafka.ShutdownException;
import org.frameworkset.plugin.kafka.StoreService;
import org.frameworkset.spi.BaseApplicationContext;
import org.frameworkset.util.concurrent.ThreadPoolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseKafkaConsumerThread
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerThread.class);
    protected StoreService storeService;
    protected String name;
    protected boolean shutdown;
    protected BaseKafkaConsumer consumer;
    private KafkaConsumer kafkaConsumer;
    protected String[] topics;
    protected long pollTimeout;
    protected String keyDeserializer;
    private String groupId;
    protected String valueDeserializer;
    protected Integer maxPollRecords;
    protected Integer workThreads;
    protected Integer workQueue = 100;
    protected ExecutorService executor;
    protected Boolean batch = true;
    protected int partition;
    protected String discardRejectMessage;
    protected long blockedWaitTimeout = -1L;
    protected int warnMultsRejects = 500;

    public long getBlockedWaitTimeout() {
        return this.blockedWaitTimeout;
    }

    public void setBlockedWaitTimeout(long blockedWaitTimeout) {
        this.blockedWaitTimeout = blockedWaitTimeout;
    }

    public int getWarnMultsRejects() {
        return this.warnMultsRejects;
    }

    public void setWarnMultsRejects(int warnMultsRejects) {
        this.warnMultsRejects = warnMultsRejects;
    }

    public String getDiscardRejectMessage() {
        return this.discardRejectMessage;
    }

    public void setDiscardRejectMessage(String discardRejectMessage) {
        this.discardRejectMessage = discardRejectMessage;
    }

    public BaseKafkaConsumerThread(int partition, BaseKafkaConsumer consumer, String[] topics, StoreService storeService) {
        this.storeService = storeService;
        this.partition = partition;
        this.name = "KafkaBatchConsumer-" + BaseKafkaConsumerThread.topicsStr(topics) + "-p" + partition;
        this.consumer = consumer;
        this.topics = topics;
        BaseApplicationContext.addShutdownHook((Runnable)new Runnable(){

            @Override
            public void run() {
                BaseKafkaConsumerThread.this.shutdown();
            }
        });
    }

    private static String topicsStr(String[] topics) {
        StringBuilder builder = new StringBuilder();
        boolean s = false;
        for (String topic : topics) {
            if (!s) {
                builder.append(topic);
                s = true;
                continue;
            }
            builder.append(",").append(topic);
        }
        return builder.toString();
    }

    public void setPollTimeout(long pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    public long getPollTimeout() {
        return this.pollTimeout;
    }

    public void setWorkQueue(Integer workQueue) {
        this.workQueue = workQueue;
    }

    public void setWorkThreads(Integer workThreads) {
        this.workThreads = workThreads;
    }

    public Integer getWorkQueue() {
        return this.workQueue;
    }

    public Integer getWorkThreads() {
        return this.workThreads;
    }

    public void setBatch(Boolean batch) {
        this.batch = batch;
    }

    public Boolean getBatch() {
        return this.batch;
    }

    public void setMaxPollRecords(Integer maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    public Integer getMaxPollRecords() {
        return this.maxPollRecords;
    }

    public void setValueDeserializer(String valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    public String getValueDeserializer() {
        return this.valueDeserializer;
    }

    public void setKeyDeserializer(String keyDeserializer) {
        this.keyDeserializer = keyDeserializer;
    }

    public String getKeyDeserializer() {
        return this.keyDeserializer;
    }

    public void shutdown() {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        if (this.executor != null) {
            try {
                this.executor.shutdown();
            }
            catch (Exception e) {
                logger.warn("", (Throwable)e);
            }
        }
        try {
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            logger.warn("", (Throwable)e);
        }
    }

    private void buildConsumerAndSubscribe() {
        Properties properties = this.consumer.getConsumerPropes();
        Properties threadProperties = new Properties();
        threadProperties.putAll((Map<?, ?>)properties);
        if (this.keyDeserializer != null && !this.keyDeserializer.equals("")) {
            threadProperties.put("key.deserializer", this.keyDeserializer);
        }
        if (this.valueDeserializer != null && !this.valueDeserializer.equals("")) {
            threadProperties.put("value.deserializer", this.valueDeserializer);
        }
        if (this.maxPollRecords != null) {
            threadProperties.put("max.poll.records", this.maxPollRecords + "");
        }
        if (this.groupId != null && !this.groupId.equals("")) {
            threadProperties.put("group.id", this.groupId);
        }
        if (this.workThreads != null) {
            this.executor = ThreadPoolFactory.buildThreadPool((String)this.name, (String)(this.discardRejectMessage == null ? "Kafka consumer message handle" : this.discardRejectMessage), (int)this.workThreads, (int)this.workQueue, (long)this.blockedWaitTimeout, (int)this.warnMultsRejects, (boolean)true, (Boolean)true);
        }
        this.kafkaConsumer = new KafkaConsumer(threadProperties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topics));
    }

    @Override
    public void run() {
        block3: {
            try {
                this.buildConsumerAndSubscribe();
                while (!this.shutdown) {
                    ConsumerRecords records = this.kafkaConsumer.poll(this.pollTimeout);
                    if (records == null || records.isEmpty()) continue;
                    this.handleDatas(this.executor, this.kafkaConsumer, this.consumer, (ConsumerRecords<Object, Object>)records);
                }
            }
            catch (Throwable e) {
                if (!logger.isErrorEnabled()) break block3;
                logger.error("", e);
            }
        }
    }

    private void doHandle(ConsumerRecords<Object, Object> records) {
        try {
            if (this.batch != null && this.batch.booleanValue()) {
                this.storeService.store(records);
            } else {
                for (ConsumerRecord consumerRecord : records) {
                    this.storeService.store((ConsumerRecord<Object, Object>)consumerRecord);
                }
            }
        }
        catch (ShutdownException e) {
            throw e;
        }
        catch (Exception e) {
            logger.error("", (Throwable)e);
        }
        catch (Throwable e) {
            logger.error("", e);
        }
    }

    protected void handleDatas(ExecutorService executor, KafkaConsumer kafkaConsumer, BaseKafkaConsumer consumer, final ConsumerRecords<Object, Object> records) {
        if (executor != null) {
            executor.submit(new Runnable(){

                @Override
                public void run() {
                    BaseKafkaConsumerThread.this.doHandle((ConsumerRecords<Object, Object>)records);
                }
            });
        } else {
            this.doHandle(records);
        }
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGroupId() {
        return this.groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }
}

