/*
 * Decompiled with CFR 0.152.
 */
package org.frameworkset.plugin.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.frameworkset.plugin.kafka.BaseKafkaConsumerThread;
import org.frameworkset.plugin.kafka.KafkaListener;
import org.frameworkset.plugin.kafka.StoreService;
import org.frameworkset.spi.BaseApplicationContext;
import org.frameworkset.spi.support.ApplicationObjectSupport;

/**
 * Configurable launcher for a pool of {@link BaseKafkaConsumerThread} workers.
 *
 * <p>The bean-style properties on this class are copied onto every worker that
 * {@link #buildRunnable(int, String[])} creates. {@link #run()} starts
 * {@link #getThreads()} workers on a fixed-size thread pool and registers a
 * shutdown hook that calls {@link #shutdown()}.
 *
 * <p>This class performs no synchronization of its own; it is expected to be
 * configured before {@link #run()} is invoked.
 */
public class BaseKafkaConsumer
extends ApplicationObjectSupport
implements KafkaListener {
    /** Workers created by {@link #run()}, kept so {@link #shutdown()} can stop them. */
    protected List<BaseKafkaConsumerThread> baseKafkaConsumerThreadList = new ArrayList<BaseKafkaConsumerThread>();
    /** Comma-separated list of topic names handed to every worker. */
    protected String topic;
    /** Consumer properties; {@link #init()} reads {@code enable.auto.commit} from them. */
    protected Properties consumerPropes;
    /** Auto-commit flag, derived by {@link #init()} or set via {@link #setAutoCommit(boolean)}. */
    private boolean autoCommit;
    /** Poll timeout forwarded to each worker (presumably milliseconds - confirm against the worker). */
    protected long pollTimeout = 1000L;
    /** Number of workers, and therefore pool threads, started by {@link #run()}. */
    protected int threads = 4;
    /** Batch flag forwarded to each worker. */
    protected Boolean batch = true;
    /** Pool running the workers; {@code null} until {@link #run()} has been called. */
    protected ExecutorService executor;
    /** Key deserializer setting forwarded to each worker. */
    protected String keyDeserializer;
    /** Value deserializer setting forwarded to each worker. */
    protected String valueDeserializer;
    /** Max-poll-records setting forwarded to each worker; may be {@code null}. */
    protected Integer maxPollRecords;
    /** Work-thread count forwarded to each worker; may be {@code null}. */
    protected Integer workThreads;
    /** Blocked-wait timeout forwarded to each worker. */
    protected long blockedWaitTimeout = -1L;
    /** Reject-warning threshold forwarded to each worker. */
    protected int warnMultsRejects = 500;
    /** Work-queue size forwarded to each worker. */
    protected Integer workQueue = 100;
    /** Consumer group id forwarded to each worker. */
    private String groupId;
    /** Discard-reject-message setting forwarded to each worker. */
    protected String discardRejectMessage;
    /** Store service passed to each worker's constructor; {@code null} unless a subclass assigns it. */
    protected StoreService storeService = null;

    /** @return the consumer properties, or {@code null} if none were set */
    public Properties getConsumerPropes() {
        return this.consumerPropes;
    }

    /** @return the blocked-wait timeout forwarded to workers */
    public long getBlockedWaitTimeout() {
        return this.blockedWaitTimeout;
    }

    /** @param blockedWaitTimeout the blocked-wait timeout to forward to workers */
    public void setBlockedWaitTimeout(long blockedWaitTimeout) {
        this.blockedWaitTimeout = blockedWaitTimeout;
    }

    /** @return the reject-warning threshold forwarded to workers */
    public int getWarnMultsRejects() {
        return this.warnMultsRejects;
    }

    /** @param warnMultsRejects the reject-warning threshold to forward to workers */
    public void setWarnMultsRejects(int warnMultsRejects) {
        this.warnMultsRejects = warnMultsRejects;
    }

    /** @param threads the number of workers {@link #run()} should start */
    public void setThreads(int threads) {
        this.threads = threads;
    }

    /** @return the discard-reject-message setting forwarded to workers */
    public String getDiscardRejectMessage() {
        return this.discardRejectMessage;
    }

    /** @param discardRejectMessage the discard-reject-message setting to forward to workers */
    public void setDiscardRejectMessage(String discardRejectMessage) {
        this.discardRejectMessage = discardRejectMessage;
    }

    /** @return the number of workers {@link #run()} starts */
    public int getThreads() {
        return this.threads;
    }

    /** @return the comma-separated topic list as configured */
    public String getTopic() {
        return this.topic;
    }

    /** @param batch the batch flag to forward to workers */
    public void setBatch(Boolean batch) {
        this.batch = batch;
    }

    /** @return the batch flag forwarded to workers */
    public Boolean getBatch() {
        return this.batch;
    }

    /** @param workQueue the work-queue size to forward to workers */
    public void setWorkQueue(Integer workQueue) {
        this.workQueue = workQueue;
    }

    /** @param workThreads the work-thread count to forward to workers */
    public void setWorkThreads(Integer workThreads) {
        this.workThreads = workThreads;
    }

    /** @return the work-queue size forwarded to workers */
    public Integer getWorkQueue() {
        return this.workQueue;
    }

    /** @return the work-thread count forwarded to workers, possibly {@code null} */
    public Integer getWorkThreads() {
        return this.workThreads;
    }

    /** @param keyDeserializer the key deserializer setting to forward to workers */
    public void setKeyDeserializer(String keyDeserializer) {
        this.keyDeserializer = keyDeserializer;
    }

    /** @return the key deserializer setting forwarded to workers */
    public String getKeyDeserializer() {
        return this.keyDeserializer;
    }

    /** @param valueDeserializer the value deserializer setting to forward to workers */
    public void setValueDeserializer(String valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    /** @return the value deserializer setting forwarded to workers */
    public String getValueDeserializer() {
        return this.valueDeserializer;
    }

    /** @return the poll timeout forwarded to workers */
    public long getPollTimeout() {
        return this.pollTimeout;
    }

    /** @param maxPollRecords the max-poll-records setting to forward to workers */
    public void setMaxPollRecords(Integer maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    /** @return the max-poll-records setting forwarded to workers, possibly {@code null} */
    public Integer getMaxPollRecords() {
        return this.maxPollRecords;
    }

    /** @param topic comma-separated list of topic names */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @param pollTimeout the poll timeout to forward to workers */
    public void setPollTimeout(long pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    /** @param consumerPropes the consumer properties read by {@link #init()} */
    public void setConsumerPropes(Properties consumerPropes) {
        this.consumerPropes = consumerPropes;
    }

    /**
     * Asks every worker created by {@link #run()} to shut down, then initiates
     * an orderly shutdown of the thread pool if one was created.
     */
    public void shutdown() {
        // Iterating an empty list is a no-op, so no size check is needed.
        for (BaseKafkaConsumerThread worker : this.baseKafkaConsumerThreadList) {
            worker.shutdown();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    /**
     * Derives the auto-commit flag from the {@code enable.auto.commit} consumer
     * property.
     *
     * <p>An absent property - or absent consumer properties altogether - yields
     * {@code true}. The value is compared case-insensitively and with
     * surrounding whitespace ignored, so settings such as {@code "True"} or
     * {@code " true "} are not silently read as {@code false}.
     */
    public void init() {
        String configured = this.consumerPropes == null
                ? null
                : this.consumerPropes.getProperty("enable.auto.commit");
        this.autoCommit = configured == null || "true".equalsIgnoreCase(configured.trim());
    }

    /**
     * Starts the consumer workers.
     *
     * <p>Creates a fixed pool of {@link #getThreads()} threads named
     * {@code BaseKafkaConsumer-N}, builds one worker per thread through
     * {@link #buildRunnable(int, String[])}, submits each of them, and finally
     * registers a shutdown hook that calls {@link #shutdown()}. Every
     * invocation creates a new pool and registers another hook.
     *
     * @throws IllegalStateException if no topic has been configured
     */
    @Override
    public void run() {
        String[] topics = BaseKafkaConsumer.parseTopics(this.topic);
        int threadCount = this.threads;
        this.executor = Executors.newFixedThreadPool(threadCount, new ThreadFactory(){
            private int i = 0;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "BaseKafkaConsumer-" + this.i++);
            }
        });
        for (int index = 0; index < threadCount; ++index) {
            BaseKafkaConsumerThread worker = this.buildRunnable(index, topics);
            this.baseKafkaConsumerThreadList.add(worker);
            this.executor.submit(worker);
        }
        Runnable shutdownHook = new Runnable(){

            @Override
            public void run() {
                BaseKafkaConsumer.this.shutdown();
            }
        };
        BaseApplicationContext.addShutdownHook(shutdownHook);
    }

    /**
     * Splits a comma-separated topic list into individual topic names,
     * trimming whitespace around each name and skipping blank entries.
     *
     * @param topicList the configured topic list
     * @return the non-blank topic names in their original order
     * @throws IllegalStateException if {@code topicList} is {@code null} or
     *         contains no topic name
     */
    private static String[] parseTopics(String topicList) {
        if (topicList == null) {
            throw new IllegalStateException("topic must be set before run() is called");
        }
        List<String> names = new ArrayList<String>();
        for (String candidate : topicList.split(",")) {
            String name = candidate.trim();
            if (name.length() > 0) {
                names.add(name);
            }
        }
        if (names.isEmpty()) {
            throw new IllegalStateException("topic contains no topic name: '" + topicList + "'");
        }
        return names.toArray(new String[names.size()]);
    }

    /**
     * Creates one worker and copies this consumer's settings onto it.
     * Subclasses may override this to supply a specialised worker.
     *
     * @param partition the zero-based index of the worker being created
     * @param topic the topic names the worker is constructed with
     * @return the configured worker, not yet started
     */
    protected BaseKafkaConsumerThread buildRunnable(int partition, String[] topic) {
        BaseKafkaConsumerThread baseKafkaConsumerThread = new BaseKafkaConsumerThread(partition, this, topic, this.storeService);
        baseKafkaConsumerThread.setKeyDeserializer(this.keyDeserializer);
        baseKafkaConsumerThread.setValueDeserializer(this.valueDeserializer);
        baseKafkaConsumerThread.setMaxPollRecords(this.maxPollRecords);
        baseKafkaConsumerThread.setPollTimeout(this.pollTimeout);
        baseKafkaConsumerThread.setWorkThreads(this.workThreads);
        baseKafkaConsumerThread.setWorkQueue(this.workQueue);
        baseKafkaConsumerThread.setBatch(this.batch);
        baseKafkaConsumerThread.setBlockedWaitTimeout(this.blockedWaitTimeout);
        baseKafkaConsumerThread.setDiscardRejectMessage(this.discardRejectMessage);
        baseKafkaConsumerThread.setWarnMultsRejects(this.warnMultsRejects);
        baseKafkaConsumerThread.setGroupId(this.groupId);
        return baseKafkaConsumerThread;
    }

    /** @return the auto-commit flag as derived by {@link #init()} or set explicitly */
    public boolean isAutoCommit() {
        return this.autoCommit;
    }

    /** @param autoCommit the auto-commit flag; overwritten by a later {@link #init()} */
    public void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    /** @return the consumer group id forwarded to workers */
    public String getGroupId() {
        return this.groupId;
    }

    /** @param groupId the consumer group id to forward to workers */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }
}

