package org.apache.spark.streaming.aliyun.datahub;

import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.exception.DatahubClientException;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.model.GetCursorRequest;
import com.aliyun.datahub.model.GetRecordsResult;
import com.aliyun.datahub.model.GetTopicResult;
import com.aliyun.datahub.model.OffsetContext;
import com.aliyun.datahub.model.RecordEntry;
import java.util.List;
import org.apache.spark.internal.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.JavaConversions$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;

/* compiled from: DatahubConsumer.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ma\u0001B\u0001\u0003\u0001=\u0011q\u0002R1uC\",(mQ8ogVlWM\u001d\u0006\u0003\u0007\u0011\tq\u0001Z1uC\",(M\u0003\u0002\u0006\r\u00051\u0011\r\\5zk:T!a\u0002\u0005\u0002\u0013M$(/Z1nS:<'BA\u0005\u000b\u0003\u0015\u0019\b/\u0019:l\u0015\tYA\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001b\u0005\u0019qN]4\u0004\u0001U\u0011\u0001CR\n\u0005\u0001EIB\u0004\u0005\u0002\u0013/5\t1C\u0003\u0002\u0015+\u0005!A.\u00198h\u0015\u00051\u0012\u0001\u00026bm\u0006L!\u0001G\n\u0003\r=\u0013'.Z2u!\t\u0011\"$\u0003\u0002\u001c'\tA!+\u001e8oC\ndW\r\u0005\u0002\u001eA5\taD\u0003\u0002 \u0011\u0005A\u0011N\u001c;fe:\fG.\u0003\u0002\"=\t9Aj\\4hS:<\u0007\u0002C\u0012\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0013\u0002\u0017A\u0014xN[3di:\u000bW.\u001a\t\u0003K-r!AJ\u0015\u000e\u0003\u001dR\u0011\u0001K\u0001\u0006g\u000e\fG.Y\u0005\u0003U\u001d\na\u0001\u0015:fI\u00164\u0017B\u0001\u0017.\u0005\u0019\u0019FO]5oO*\u0011!f\n\u0005\t_\u0001\u0011\t\u0011)A\u0005I\u0005IAo\u001c9jG:\u000bW.\u001a\u0005\tc\u0001\u0011\t\u0011)A\u0005I\u000591\u000f[1sI&#\u0007\u0002C\u001a\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0013\u0002\u000bM,(-\u00133\t\u0011U\u0002!\u0011!Q\u0001\nY\nAaY8oMB\u0011q\u0007P\u0007\u0002q)\u00111!\u000f\u0006\u0003\u000biR\u0011aO\u0001\u0004G>l\u0017BA\u001f9\u0005Q!\u0015\r^1ik\n\u001cuN\u001c4jOV\u0014\u0018\r^5p]\"Aq\b\u0001B\u0001B\u0003%\u0001)\u0001\u0005sK\u000e,\u0017N^3s!\r\t%\tR\u0007\u0002\u0005%\u00111I\u0001\u0002\u0010\t\u0006$\u0018\r[;c%\u0016\u001cW-\u001b<feB\u0011QI\u0012\u0007\u0001\t\u00159\u0005A1\u0001I\u0005\u0005!\u0016CA%M!\t1#*\u0003\u0002LO\t9aj\u001c;iS:<\u0007C\u0001\u0014N\u0013\tquEA\u0002B]fD\u0001\u0002\u0015\u0001\u0003\u0002\u0003\u0006I!U\u0001\u0005MVt7\r\u0005\u0003'%R#\u0015BA*(\u0005%1UO\\2uS>t\u0017\u0007\u0005\u0002V16\taK\u0003\u0002Xq\u0005)Qn\u001c3fY&\u0011\u0011L\u0016\u0002\f%\u0016\u001cwN\u001d3F]R\u0014\u0018\u0010\u0003\u0005\\\u0001\t\u0005\t\u0015!\u0003]\u0003A\u0019\b/\u0019:l\t\u0006$\u0018\r[;c\u0007>tg\r\u0005\u0003&;\u0012\"\u0013B\u00010.\u0005\ri\u0015\r\u001d\u0005\u0006A\u0002!\t!Y\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0013\t\u001cG-\u001a4hQ&T\u0007cA!\u0001\t\")1e\u0018a\u0001I!)qf\u0018a\u0001I!)\u0011g\u0018a\u0001I!)1g\u0018a\u0001I!)Qg\u0018a\u0001m!)qh\u0018a\u0001\u0001\")\u0001k\u0018a\u0001#\")1l\u0018a\u00019\"9A\u000e\u0001b\u0001\n\u0013i\u0017aD4fiJ+7m\u001c:e\u0019&l\u0017\u000e^:\u0016\u00039\u0004\"AJ8\n\u0005A<#aA%oi\"1!\u000f\u0001Q\u0001\n9\f\u0001cZ3u%\u0016\u001cwN\u001d3MS6LGo\u001d\u0011\t\u000fQ\u0004!\u0019!C\u0005k\u0006\u0011r-\u001a;SK\u000e|'\u000fZ*mK\u0016\u0004H+[7f+\u00051\bC\u0001\u0014x\u0013\tAxE\u0001\u0003M_:<\u0007B\u0002>\u0001A\u0003%a/A\nhKR\u0014VmY8sINcW-\u001a9US6,\u0007\u0005C\u0004}\u0001\t\u0007I\u0011B7\u0002\u0015\r|W.\\5u%><8\u000f\u0003\u0004\u007f\u0001\u0001\u0006IA\\\u0001\fG>lW.\u001b;S_^\u001c\b\u0005C\u0005\u0002\u0002\u0001\u0011\r\u0011\"\u0001\u0002\u0004\u000511\r\\5f]R,\"!!\u0002\u0011\u0007\u0005\u000b9!C\u0002\u0002\n\t\u0011\u0001\u0003R1uC\",(m\u00117jK:$x\n\u001d;\t\u0011\u00055\u0001\u0001)A\u0005\u0003\u000b\tqa\u00197jK:$\b\u0005C\u0004\u0002\u0012\u0001!\t%a\u0005\u0002\u0007I,h\u000e\u0006\u0002\u0002\u0016A\u0019a%a\u0006\n\u0007\u0005eqE\u0001\u0003V]&$\b")
/* loaded from: input_file:org/apache/spark/streaming/aliyun/datahub/DatahubConsumer.class */
public class DatahubConsumer<T> implements Runnable, Logging {
    private final String projectName;
    private final String topicName;
    public final String org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$shardId;
    private final String subId;
    public final DatahubReceiver<T> org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$receiver;
    public final Function1<RecordEntry, T> org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$func;
    private final int org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordLimits;
    private final long org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordSleepTime;
    private final int org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$commitRows;
    private final DatahubClientOpt client;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.class.initializeLogIfNecessary(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2(this);
    }

    public int org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordLimits() {
        return this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordLimits;
    }

    public long org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordSleepTime() {
        return this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordSleepTime;
    }

    public int org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$commitRows() {
        return this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$commitRows;
    }

    public DatahubClientOpt client() {
        return this.client;
    }

    @Override // java.lang.Runnable
    public void run() {
        LongRef create = LongRef.create(0L);
        GetTopicResult topic = client().getTopic(this.projectName, this.topicName);
        OffsetContext initOffsetContext = client().initOffsetContext(this.projectName, this.topicName, this.subId, this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$shardId);
        ObjectRef create2 = ObjectRef.create((Object) null);
        if (initOffsetContext.hasOffset()) {
            try {
                create2.elem = client().getNextOffsetCursor(initOffsetContext).getCursor();
            } catch (DatahubClientException e) {
                create2.elem = client().getCursor(this.projectName, this.topicName, this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$shardId, GetCursorRequest.CursorType.OLDEST).getCursor();
            }
        } else {
            create2.elem = client().getCursor(this.projectName, this.topicName, this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$shardId, GetCursorRequest.CursorType.OLDEST).getCursor();
        }
        logInfo(new DatahubConsumer$$anonfun$run$1(this, initOffsetContext, create2));
        logInfo(new DatahubConsumer$$anonfun$run$2(this));
        while (true) {
            try {
                GetRecordsResult records = client().getRecords(this.projectName, this.topicName, this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$shardId, (String) create2.elem, org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordLimits(), topic.getRecordSchema());
                List records2 = records.getRecords();
                if (records2.size() == 0) {
                    Thread.sleep(org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordSleepTime());
                } else {
                    JavaConversions$.MODULE$.asScalaBuffer(records2).foreach(new DatahubConsumer$$anonfun$run$3(this, create, initOffsetContext));
                    create2.elem = records.getNextCursor();
                }
            } catch (Exception e2) {
                throw e2;
            } catch (OffsetResetedException unused) {
                client().updateOffsetContext(initOffsetContext);
                create2.elem = client().getNextOffsetCursor(initOffsetContext).getCursor();
                logInfo(new DatahubConsumer$$anonfun$run$4(this, initOffsetContext, create2));
            }
        }
    }

    public DatahubConsumer(String str, String str2, String str3, String str4, DatahubConfiguration datahubConfiguration, DatahubReceiver<T> datahubReceiver, Function1<RecordEntry, T> function1, Map<String, String> map) {
        this.projectName = str;
        this.topicName = str2;
        this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$shardId = str3;
        this.subId = str4;
        this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$receiver = datahubReceiver;
        this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$func = function1;
        Logging.class.$init$(this);
        this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordLimits = new StringOps(Predef$.MODULE$.augmentString(map.getOrElse("spark.datahub.batch.getrecord.limits", new DatahubConsumer$$anonfun$1(this)).toString())).toInt();
        this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$getRecordSleepTime = new StringOps(Predef$.MODULE$.augmentString(map.getOrElse("spark.datahub.norecord.waittimes", new DatahubConsumer$$anonfun$2(this)).toString())).toLong();
        this.org$apache$spark$streaming$aliyun$datahub$DatahubConsumer$$commitRows = new StringOps(Predef$.MODULE$.augmentString(map.getOrElse("spark.datahub.commit.perrows", new DatahubConsumer$$anonfun$3(this)).toString())).toInt();
        this.client = new DatahubClientOpt(datahubConfiguration);
    }
}
