package org.frameworkset.plugin.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.frameworkset.spi.BaseApplicationContext;
import org.frameworkset.util.concurrent.ThreadPoolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/frameworkset/plugin/kafka/BaseKafkaConsumerThread.class */
/**
 * Runnable that creates one {@link KafkaConsumer}, subscribes it to a fixed set of topics and
 * hands every non-empty poll result to a {@link StoreService}.
 *
 * <p>Threading: {@link #run()} is the only method that polls and closes the Kafka consumer.
 * {@link #shutdown()} may be called from any other thread (the constructor registers it as a
 * shutdown hook); it only sets a volatile flag, calls {@code KafkaConsumer.wakeup()} and shuts
 * the optional worker pool down.
 *
 * <p>When {@code workThreads} is configured, records are processed on a worker pool built by
 * {@code ThreadPoolFactory}; otherwise they are processed inline on the polling thread.
 */
public class BaseKafkaConsumerThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(BaseKafkaConsumerThread.class);

    /** Worker queue size used when none (or {@code null}) has been configured. */
    private static final int DEFAULT_WORK_QUEUE = 100;

    protected StoreService storeService;
    protected String name;

    /** Stop flag; volatile because it is set by {@link #shutdown()} on a different thread than the poll loop. */
    protected volatile boolean shutdown;

    protected BaseKafkaConsumer consumer;

    /** Created in {@link #run()}; volatile so that {@link #shutdown()} can see it and wake it up. */
    private volatile KafkaConsumer<Object, Object> kafkaConsumer;

    protected String[] topics;
    protected long pollTimeout;
    protected String keyDeserializer;
    private String groupId;
    protected String valueDeserializer;
    protected Integer maxPollRecords;
    protected Integer workThreads;

    /** Optional worker pool; volatile because it is created on the polling thread and shut down from another. */
    protected volatile ExecutorService executor;

    protected int partition;
    protected String discardRejectMessage;
    protected Integer workQueue = DEFAULT_WORK_QUEUE;
    protected Boolean batch = true;
    protected long blockedWaitTimeout = -1;
    protected int warnMultsRejects = 500;

    /**
     * Creates the consumer runnable and registers {@link #shutdown()} as an application shutdown hook.
     *
     * @param i                 partition/index number, used only to build the thread name here
     * @param baseKafkaConsumer supplier of the base consumer properties
     * @param strArr            topics to subscribe to
     * @param storeService      sink that receives the polled records
     */
    public BaseKafkaConsumerThread(int i, BaseKafkaConsumer baseKafkaConsumer, String[] strArr, StoreService storeService) {
        this.storeService = storeService;
        this.partition = i;
        this.name = "KafkaBatchConsumer-" + topicsStr(strArr) + "-p" + i;
        this.consumer = baseKafkaConsumer;
        this.topics = strArr;
        BaseApplicationContext.addShutdownHook(new Runnable() {
            @Override
            public void run() {
                BaseKafkaConsumerThread.this.shutdown();
            }
        });
    }

    /** Joins the topic names with commas. */
    private static String topicsStr(String[] strArr) {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String topic : strArr) {
            if (!first) {
                sb.append(",");
            }
            sb.append(topic);
            first = false;
        }
        return sb.toString();
    }

    /** @return {@code true} when the string is neither {@code null} nor empty */
    private static boolean hasText(String value) {
        return value != null && !value.equals("");
    }

    /** @return the value handed to {@code ThreadPoolFactory.buildThreadPool} as blocked-wait timeout */
    public long getBlockedWaitTimeout() {
        return this.blockedWaitTimeout;
    }

    public void setBlockedWaitTimeout(long j) {
        this.blockedWaitTimeout = j;
    }

    /** @return the value handed to {@code ThreadPoolFactory.buildThreadPool} as reject-warning multiple */
    public int getWarnMultsRejects() {
        return this.warnMultsRejects;
    }

    public void setWarnMultsRejects(int i) {
        this.warnMultsRejects = i;
    }

    /** @return the custom reject message for the worker pool, or {@code null} to use the built-in default */
    public String getDiscardRejectMessage() {
        return this.discardRejectMessage;
    }

    public void setDiscardRejectMessage(String str) {
        this.discardRejectMessage = str;
    }

    public void setPollTimeout(long j) {
        this.pollTimeout = j;
    }

    /** @return the timeout passed to every {@code KafkaConsumer.poll} call */
    public long getPollTimeout() {
        return this.pollTimeout;
    }

    public void setWorkQueue(Integer num) {
        this.workQueue = num;
    }

    public void setWorkThreads(Integer num) {
        this.workThreads = num;
    }

    /** @return the configured worker queue size; {@code null} falls back to the default of 100 */
    public Integer getWorkQueue() {
        return this.workQueue;
    }

    /** @return the worker thread count, or {@code null} when records are handled on the polling thread */
    public Integer getWorkThreads() {
        return this.workThreads;
    }

    public void setBatch(Boolean bool) {
        this.batch = bool;
    }

    /** @return {@code true} to store a whole poll result at once; {@code false}/{@code null} to store record by record */
    public Boolean getBatch() {
        return this.batch;
    }

    public void setMaxPollRecords(Integer num) {
        this.maxPollRecords = num;
    }

    /** @return the override for {@code max.poll.records}, or {@code null} to keep the base property */
    public Integer getMaxPollRecords() {
        return this.maxPollRecords;
    }

    public void setValueDeserializer(String str) {
        this.valueDeserializer = str;
    }

    /** @return the override for {@code value.deserializer}, or {@code null}/empty to keep the base property */
    public String getValueDeserializer() {
        return this.valueDeserializer;
    }

    public void setKeyDeserializer(String str) {
        this.keyDeserializer = str;
    }

    /** @return the override for {@code key.deserializer}, or {@code null}/empty to keep the base property */
    public String getKeyDeserializer() {
        return this.keyDeserializer;
    }

    /**
     * Requests the poll loop to stop. Safe to call from any thread and more than once.
     *
     * <p>The consumer is woken up rather than closed here: {@code wakeup()} is the only
     * {@code KafkaConsumer} method that may be called from a foreign thread. The actual
     * {@code close()} happens on the polling thread when {@link #run()} exits.
     */
    public void shutdown() {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;

        // Abort a poll() that may currently be blocked on the polling thread.
        KafkaConsumer<Object, Object> active = this.kafkaConsumer;
        if (active != null) {
            try {
                active.wakeup();
            } catch (Exception e) {
                logger.warn("Failed to wake up Kafka consumer of thread " + this.name, e);
            }
        }

        ExecutorService pool = this.executor;
        if (pool != null) {
            try {
                pool.shutdown();
            } catch (Exception e) {
                logger.warn("Failed to shut down worker pool of Kafka consumer thread " + this.name, e);
            }
        }
    }

    /**
     * Builds the effective consumer properties (base properties plus the per-thread overrides),
     * creates the optional worker pool, then creates the consumer and subscribes it to the topics.
     */
    private void buildConsumerAndSubscribe() {
        Properties properties = new Properties();
        properties.putAll(this.consumer.getConsumerPropes());
        if (hasText(this.keyDeserializer)) {
            properties.put("key.deserializer", this.keyDeserializer);
        }
        if (hasText(this.valueDeserializer)) {
            properties.put("value.deserializer", this.valueDeserializer);
        }
        if (this.maxPollRecords != null) {
            properties.put("max.poll.records", String.valueOf(this.maxPollRecords));
        }
        if (hasText(this.groupId)) {
            properties.put("group.id", this.groupId);
        }
        if (this.workThreads != null) {
            String rejectMessage = this.discardRejectMessage == null
                    ? "Kafka consumer message handle"
                    : this.discardRejectMessage;
            // setWorkQueue(null) must not break start-up; fall back to the field's default size.
            int queueSize = this.workQueue != null ? this.workQueue.intValue() : DEFAULT_WORK_QUEUE;
            this.executor = ThreadPoolFactory.buildThreadPool(this.name, rejectMessage,
                    this.workThreads.intValue(), queueSize, this.blockedWaitTimeout, this.warnMultsRejects, true, true);
        }
        // Publish the consumer before subscribing so that run() can close it even if subscribe() fails.
        this.kafkaConsumer = new KafkaConsumer<Object, Object>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topics));
    }

    /**
     * Poll loop: builds the consumer, polls until {@link #shutdown()} is requested or an error
     * occurs, and always closes the consumer before returning.
     */
    @Override
    public void run() {
        try {
            buildConsumerAndSubscribe();
            KafkaConsumer<Object, Object> active = this.kafkaConsumer;
            while (!this.shutdown) {
                ConsumerRecords<Object, Object> records = active.poll(this.pollTimeout);
                if (records != null && !records.isEmpty()) {
                    handleDatas(this.executor, active, this.consumer, records);
                }
            }
        } catch (Throwable th) {
            if (this.shutdown) {
                // Expected path: shutdown() woke the consumer up (or closed the pool) to end the loop.
                logger.info("Kafka consumer thread {} stopped after shutdown request: {}", this.name, th.toString());
            } else if (logger.isErrorEnabled()) {
                logger.error("Kafka consumer thread " + this.name + " terminated unexpectedly", th);
            }
        } finally {
            closeConsumer();
        }
    }

    /** Closes the Kafka consumer on the polling thread; failures are logged, never propagated. */
    private void closeConsumer() {
        KafkaConsumer<Object, Object> active = this.kafkaConsumer;
        if (active == null) {
            return;
        }
        try {
            active.close();
        } catch (Throwable th) {
            logger.warn("Failed to close Kafka consumer of thread " + this.name, th);
        } finally {
            this.kafkaConsumer = null;
        }
    }

    /**
     * Stores one poll result, either as a whole batch or record by record depending on {@code batch}.
     *
     * <p>A {@code ShutdownException} raised by the store service is propagated to the caller;
     * every other failure is logged and swallowed so that one bad batch does not stop consumption.
     *
     * @param consumerRecords the records returned by one poll
     */
    public void doHandle(ConsumerRecords<Object, Object> consumerRecords) {
        try {
            if (this.batch != null && this.batch.booleanValue()) {
                this.storeService.store(consumerRecords);
            } else {
                for (ConsumerRecord<Object, Object> record : consumerRecords) {
                    this.storeService.store(record);
                }
            }
        } catch (ShutdownException e) {
            throw e;
        } catch (Throwable th) {
            logger.error("Failed to store Kafka records in consumer thread " + this.name, th);
        }
    }

    /**
     * Dispatches one poll result: submitted to {@code executorService} when one is given,
     * otherwise handled synchronously on the calling thread.
     *
     * @param executorService   worker pool, or {@code null} for inline handling
     * @param kafkaConsumer     the polling consumer (unused here; available to overriding subclasses)
     * @param baseKafkaConsumer the owning consumer configuration (unused here; available to overriding subclasses)
     * @param consumerRecords   the records returned by one poll
     */
    protected void handleDatas(ExecutorService executorService, KafkaConsumer kafkaConsumer, BaseKafkaConsumer baseKafkaConsumer, final ConsumerRecords<Object, Object> consumerRecords) {
        if (executorService == null) {
            doHandle(consumerRecords);
            return;
        }
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                BaseKafkaConsumerThread.this.doHandle(consumerRecords);
            }
        });
    }

    /** @return the thread/pool name; defaults to {@code KafkaBatchConsumer-<topics>-p<partition>} */
    public String getName() {
        return this.name;
    }

    public void setName(String str) {
        this.name = str;
    }

    /** @return the override for {@code group.id}, or {@code null}/empty to keep the base property */
    public String getGroupId() {
        return this.groupId;
    }

    public void setGroupId(String str) {
        this.groupId = str;
    }
}
