package org.apache.spark.streaming.aliyun.datahub;

import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.auth.AliyunAccount;
import com.aliyun.datahub.exception.DatahubClientException;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.model.GetCursorRequest;
import com.aliyun.datahub.model.GetRecordsResult;
import com.aliyun.datahub.model.GetTopicResult;
import com.aliyun.datahub.model.OffsetContext;
import com.aliyun.datahub.model.RecordEntry;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.spark.SparkException;
import org.apache.spark.internal.Logging;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.storage.StreamBlockId;
import org.apache.spark.streaming.receiver.BlockGenerator;
import org.apache.spark.streaming.receiver.BlockGeneratorListener;
import org.apache.spark.streaming.receiver.Receiver;
import org.apache.spark.util.ThreadUtils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Option$;
import scala.Predef$;
import scala.collection.JavaConversions$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.LongRef;

/* compiled from: ReliableDatahubReceiver.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\u0005h!B\u0001\u0003\u0001\tq!a\u0006*fY&\f'\r\\3ECR\f\u0007.\u001e2SK\u000e,\u0017N^3s\u0015\t\u0019A!A\u0004eCR\f\u0007.\u001e2\u000b\u0005\u00151\u0011AB1mSf,hN\u0003\u0002\b\u0011\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u0013)\tQa\u001d9be.T!a\u0003\u0007\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005i\u0011aA8sOV\u0011q\u0002G\n\u0004\u0001A)\u0003cA\t\u0015-5\t!C\u0003\u0002\u0014\r\u0005A!/Z2fSZ,'/\u0003\u0002\u0016%\tA!+Z2fSZ,'\u000f\u0005\u0002\u001811\u0001A!B\r\u0001\u0005\u0004Y\"!\u0001+\u0004\u0001E\u0011AD\t\t\u0003;\u0001j\u0011A\b\u0006\u0002?\u0005)1oY1mC&\u0011\u0011E\b\u0002\b\u001d>$\b.\u001b8h!\ti2%\u0003\u0002%=\t\u0019\u0011I\\=\u0011\u0005\u0019JS\"A\u0014\u000b\u0005!B\u0011\u0001C5oi\u0016\u0014h.\u00197\n\u0005):#a\u0002'pO\u001eLgn\u001a\u0005\tY\u0001\u0011\t\u0011)A\u0005[\u0005Y\u0001O]8kK\u000e$h*Y7f!\tq\u0013G\u0004\u0002\u001e_%\u0011\u0001GH\u0001\u0007!J,G-\u001a4\n\u0005I\u001a$AB*ue&twM\u0003\u00021=!AQ\u0007\u0001B\u0001B\u0003%Q&A\u0005u_BL7MT1nK\"Aq\u0007\u0001B\u0001B\u0003%Q&A\u0003tk\nLE\r\u0003\u0005:\u0001\t\u0005\t\u0015!\u0003.\u0003-\t7mY3tg.+\u00170\u00133\t\u0011m\u0002!\u0011!Q\u0001\n5\nq\"Y2dKN\u001c8*Z=TK\u000e\u0014X\r\u001e\u0005\t{\u0001\u0011\t\u0011)A\u0005[\u0005AQM\u001c3q_&tG\u000f\u0003\u0005@\u0001\t\u0005\t\u0015!\u0003.\u0003\u001d\u0019\b.\u0019:e\u0013\u0012D\u0001\"\u0011\u0001\u0003\u0002\u0003\u0006IAQ\u0001\u0005MVt7\r\u0005\u0003\u001e\u0007\u00163\u0012B\u0001#\u001f\u0005%1UO\\2uS>t\u0017\u0007\u0005\u0002G\u001b6\tqI\u0003\u0002I\u0013\u0006)Qn\u001c3fY*\u00111A\u0013\u0006\u0003\u000b-S\u0011\u0001T\u0001\u0004G>l\u0017B\u0001(H\u0005-\u0011VmY8sI\u0016sGO]=\t\u0013A\u0003!\u0011!Q\u0001\nE;\u0016\u0001D:u_J\fw-\u001a'fm\u0016d\u0007C\u0001*V\u001b\u0005\u0019&B\u0001+\t\u0003\u001d\u0019Ho\u001c:bO\u0016L!AV*\u0003\u0019M#xN]1hK2+g/\u001a7\n\u0005A#\u0002\u0002C-\u0001\u0005\u0003\u0005\u000b\u0011\u0002.\u0002!M\u0004\u0018M]6ECR\f\u0007.\u00
1e2D_:4\u0007\u0003\u0002\u0018\\[5J!\u0001X\u001a\u0003\u00075\u000b\u0007\u000f\u0003\u0005_\u0001\t\r\t\u0015a\u0003`\u0003))g/\u001b3f]\u000e,G%\r\t\u0004A\u000e4R\"A1\u000b\u0005\tt\u0012a\u0002:fM2,7\r^\u0005\u0003I\u0006\u0014\u0001b\u00117bgN$\u0016m\u001a\u0005\u0006M\u0002!\taZ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0017!dWN\\8qcJ\u001cH/\u001e\u000b\u0003S.\u00042A\u001b\u0001\u0017\u001b\u0005\u0011\u0001\"\u00020f\u0001\by\u0006\"\u0002\u0017f\u0001\u0004i\u0003\"B\u001bf\u0001\u0004i\u0003\"B\u001cf\u0001\u0004i\u0003\"B\u001df\u0001\u0004i\u0003\"B\u001ef\u0001\u0004i\u0003\"B\u001ff\u0001\u0004i\u0003\"B f\u0001\u0004i\u0003\"B!f\u0001\u0004\u0011\u0005\"\u0002)f\u0001\u0004\t\u0006\"B-f\u0001\u0004Q\u0006bB<\u0001\u0001\u0004%I\u0001_\u0001\r_\u001a47/\u001a;Ck\u001a4WM]\u000b\u0002sB!!p`A\u0002\u001b\u0005Y(B\u0001?~\u0003\u001diW\u000f^1cY\u0016T!A \u0010\u0002\u0015\r|G\u000e\\3di&|g.C\u0002\u0002\u0002m\u00141\"\u0011:sCf\u0014UO\u001a4feB!\u0011QAA\u0006\u001d\r1\u0015qA\u0005\u0004\u0003\u00139\u0015!D(gMN,GoQ8oi\u0016DH/\u0003\u0003\u0002\u000e\u0005=!AB(gMN,GOC\u0002\u0002\n\u001dC\u0011\"a\u0005\u0001\u0001\u0004%I!!\u0006\u0002!=4gm]3u\u0005V4g-\u001a:`I\u0015\fH\u0003BA\f\u0003;\u00012!HA\r\u0013\r\tYB\b\u0002\u0005+:LG\u000fC\u0005\u0002 
\u0005E\u0011\u0011!a\u0001s\u0006\u0019\u0001\u0010J\u0019\t\u000f\u0005\r\u0002\u0001)Q\u0005s\u0006iqN\u001a4tKR\u0014UO\u001a4fe\u0002B\u0011\"a\n\u0001\u0001\u0004%I!!\u000b\u0002\u001d\tdwnY6PM\u001a\u001cX\r^'baV\u0011\u00111\u0006\t\t\u0003[\tY$a\u0010\u0002F5\u0011\u0011q\u0006\u0006\u0005\u0003c\t\u0019$\u0001\u0006d_:\u001cWO\u001d:f]RTA!!\u000e\u00028\u0005!Q\u000f^5m\u0015\t\tI$\u0001\u0003kCZ\f\u0017\u0002BA\u001f\u0003_\u0011\u0011cQ8oGV\u0014(/\u001a8u\u0011\u0006\u001c\b.T1q!\r\u0011\u0016\u0011I\u0005\u0004\u0003\u0007\u001a&!D*ue\u0016\fWN\u00117pG.LE\rE\u0003\u001e\u0003\u000f\n\u0019!C\u0002\u0002Jy\u0011Q!\u0011:sCfD\u0011\"!\u0014\u0001\u0001\u0004%I!a\u0014\u0002%\tdwnY6PM\u001a\u001cX\r^'ba~#S-\u001d\u000b\u0005\u0003/\t\t\u0006\u0003\u0006\u0002 \u0005-\u0013\u0011!a\u0001\u0003WA\u0001\"!\u0016\u0001A\u0003&\u00111F\u0001\u0010E2|7m[(gMN,G/T1qA!I\u0011\u0011\f\u0001A\u0002\u0013%\u00111L\u0001\u000fE2|7m[$f]\u0016\u0014\u0018\r^8s+\t\ti\u0006E\u0002\u0012\u0003?J1!!\u0019\u0013\u00059\u0011En\\2l\u000f\u0016tWM]1u_JD\u0011\"!\u001a\u0001\u0001\u0004%I!a\u001a\u0002%\tdwnY6HK:,'/\u0019;pe~#S-\u001d\u000b\u0005\u0003/\tI\u0007\u0003\u0006\u0002 
\u0005\r\u0014\u0011!a\u0001\u0003;B\u0001\"!\u001c\u0001A\u0003&\u0011QL\u0001\u0010E2|7m[$f]\u0016\u0014\u0018\r^8sA!I\u0011\u0011\u000f\u0001A\u0002\u0013%\u00111O\u0001\u0019[\u0016\u001c8/Y4f\u0011\u0006tG\r\\3s)\"\u0014X-\u00193Q_>dWCAA;!\u0011\ti#a\u001e\n\t\u0005e\u0014q\u0006\u0002\u0013)\"\u0014X-\u00193Q_>dW\t_3dkR|'\u000fC\u0005\u0002~\u0001\u0001\r\u0011\"\u0003\u0002��\u0005aR.Z:tC\u001e,\u0007*\u00198eY\u0016\u0014H\u000b\u001b:fC\u0012\u0004vn\u001c7`I\u0015\fH\u0003BA\f\u0003\u0003C!\"a\b\u0002|\u0005\u0005\t\u0019AA;\u0011!\t)\t\u0001Q!\n\u0005U\u0014!G7fgN\fw-\u001a%b]\u0012dWM\u001d+ie\u0016\fG\rU8pY\u0002B\u0011\"!#\u0001\u0001\u0004%I!a#\u0002\r\rd\u0017.\u001a8u+\t\ti\tE\u0002k\u0003\u001fK1!!%\u0003\u0005A!\u0015\r^1ik\n\u001cE.[3oi>\u0003H\u000fC\u0005\u0002\u0016\u0002\u0001\r\u0011\"\u0003\u0002\u0018\u0006Q1\r\\5f]R|F%Z9\u0015\t\u0005]\u0011\u0011\u0014\u0005\u000b\u0003?\t\u0019*!AA\u0002\u00055\u0005\u0002CAO\u0001\u0001\u0006K!!$\u0002\u000f\rd\u0017.\u001a8uA!I\u0011\u0011\u0015\u0001A\u0002\u0013%\u00111U\u0001\n_\u001a47/\u001a;Dib,\"!!*\u0011\u0007\u0019\u000b9+C\u0002\u0002*\u001e\u0013Qb\u00144gg\u0016$8i\u001c8uKb$\b\"CAW\u0001\u0001\u0007I\u0011BAX\u00035ygMZ:fi\u000e#\bp\u0018\u0013fcR!\u0011qCAY\u0011)\ty\"a+\u0002\u0002\u0003\u0007\u0011Q\u0015\u0005\t\u0003k\u0003\u0001\u0015)\u0003\u0002&\u0006QqN\u001a4tKR\u001cE\u000f\u001f\u0011\t\u0013\u0005e\u0006\u00011A\u0005\n\u0005m\u0016a\u0003;pa&\u001c'+Z:vYR,\"!!0\u0011\u0007\u0019\u000by,C\u0002\u0002B\u001e\u0013abR3u)>\u0004\u0018n\u0019*fgVdG\u000fC\u0005\u0002F\u0002\u0001\r\u0011\"\u0003\u0002H\u0006yAo\u001c9jGJ+7/\u001e7u?\u0012*\u0017\u000f\u0006\u0003\u0002\u0018\u0005%\u0007BCA\u0010\u0003\u0007\f\t\u00111\u0001\u0002>\"A\u0011Q\u001a\u0001!B\u0013\ti,\u0001\u0007u_BL7MU3tk2$\b\u0005C\u0005\u0002R\u0002\u0001\r\u0011\"\u0003\u0002T\u000611-\u001e:t_J,\u0012!\f\u0005\n\u0003/\u0004\u0001\u0019!C\u0005\u00033\f!bY;sg>\u0014x\fJ3r)\u0011\t9\"a7\t\u0013\u0005}\u0011Q[A\
u0001\u0002\u0004i\u0003bBAp\u0001\u0001\u0006K!L\u0001\bGV\u00148o\u001c:!\u0011%\t\u0019\u000f\u0001b\u0001\n\u0013\t)/A\bhKR\u0014VmY8sI2KW.\u001b;t+\t\t9\u000fE\u0002\u001e\u0003SL1!a;\u001f\u0005\rIe\u000e\u001e\u0005\t\u0003_\u0004\u0001\u0015!\u0003\u0002h\u0006\u0001r-\u001a;SK\u000e|'\u000f\u001a'j[&$8\u000f\t\u0005\n\u0003g\u0004!\u0019!C\u0005\u0003k\f!cZ3u%\u0016\u001cwN\u001d3TY\u0016,\u0007\u000fV5nKV\u0011\u0011q\u001f\t\u0004;\u0005e\u0018bAA~=\t!Aj\u001c8h\u0011!\ty\u0010\u0001Q\u0001\n\u0005]\u0018aE4fiJ+7m\u001c:e'2,W\r\u001d+j[\u0016\u0004\u0003\"\u0003B\u0002\u0001\t\u0007I\u0011BAs\u0003)\u0019w.\\7jiJ{wo\u001d\u0005\t\u0005\u000f\u0001\u0001\u0015!\u0003\u0002h\u0006Y1m\\7nSR\u0014vn^:!\u0011\u001d\u0011Y\u0001\u0001C!\u0005\u001b\tqa\u001c8Ti\u0006\u0014H\u000f\u0006\u0002\u0002\u0018!9!\u0011\u0003\u0001\u0005B\t5\u0011AB8o'R|\u0007O\u0002\u0004\u0003\u0016\u00011!q\u0003\u0002\u000f\u001b\u0016\u001c8/Y4f\u0011\u0006tG\r\\3s'\u0019\u0011\u0019B!\u0007\u0003&A!!1\u0004B\u0011\u001b\t\u0011iB\u0003\u0003\u0003 
\u0005]\u0012\u0001\u00027b]\u001eLAAa\t\u0003\u001e\t1qJ\u00196fGR\u0004BAa\u0007\u0003(%!!\u0011\u0006B\u000f\u0005!\u0011VO\u001c8bE2,\u0007bCAE\u0005'\u0011\t\u0011)A\u0005\u0003\u001bCqA\u001aB\n\t\u0003\u0011y\u0003\u0006\u0003\u00032\tU\u0002\u0003\u0002B\u001a\u0005'i\u0011\u0001\u0001\u0005\t\u0003\u0013\u0013i\u00031\u0001\u0002\u000e\"A!\u0011\bB\n\t\u0003\u0012i!A\u0002sk:DqA!\u0010\u0001\t\u0013\u0011y$A\u000bti>\u0014X-T3tg\u0006<W-\u00118e\u001f\u001a47/\u001a;\u0015\r\u0005]!\u0011\tB#\u0011\u001d\u0011\u0019Ea\u000fA\u0002Y\taA]3d_J$\u0007\u0002\u0003B$\u0005w\u0001\r!a\u0001\u0002\r=4gm]3u\u0011\u001d\u0011Y\u0005\u0001C\u0005\u0005\u001b\nA\"\u001e9eCR,wJ\u001a4tKR$B!a\u0006\u0003P!A!q\tB%\u0001\u0004\t\u0019\u0001C\u0004\u0003T\u0001!IA!\u0016\u0002)I,W.Z7cKJ\u0014En\\2l\u001f\u001a47/\u001a;t)\u0011\t9Ba\u0016\t\u0011\te#\u0011\u000ba\u0001\u0003\u007f\tqA\u00197pG.LE\rC\u0004\u0003^\u0001!IAa\u0018\u00023M$xN]3CY>\u001c7.\u00118e\u0007>lW.\u001b;PM\u001a\u001cX\r\u001e\u000b\u0007\u0003/\u0011\tGa\u0019\t\u0011\te#1\fa\u0001\u0003\u007fA\u0001B!\u001a\u0003\\\u0001\u0007!qM\u0001\fCJ\u0014\u0018-\u001f\"vM\u001a,'\u000f\r\u0003\u0003j\t5\u0004\u0003\u0002>��\u0005W\u00022a\u0006B7\t-\u0011yGa\u0019\u0002\u0002\u0003\u0005)\u0011A\u000e\u0003\u0007}#\u0013\u0007C\u0004\u0003t\u0001!IA!\u001e\u0002\u0019\r|W.\\5u\u001f\u001a47/\u001a;\u0015\t\u0005]!q\u000f\u0005\bo\nE\u0004\u0019AA#\r\u0019\u0011Y\b\u0001\u0004\u0003~\t)r)\u001a8fe\u0006$X\r\u001a\"m_\u000e\\\u0007*\u00198eY\u0016\u00148C\u0002B=\u0005\u007f\u0012)\tE\u0002\u001e\u0005\u0003K1Aa!\u001f\u0005\u0019\te.\u001f*fMB\u0019\u0011Ca\"\n\u0007\t%%C\u0001\fCY>\u001c7nR3oKJ\fGo\u001c:MSN$XM\\3s\u0011\u001d1'\u0011\u0010C\u0001\u0005\u001b#\"Aa$\u0011\t\tM\"\u0011\u0010\u0005\t\u0005'\u0013I\b\"\u0011\u0003\u0016\u0006IqN\\!eI\u0012\u000bG/\u0019\u000b\u0007\u0003/\u00119Ja'\t\u000f\te%\u0011\u0013a\u0001E\u0005!A-\u0019;b\u0011\u001d\u0011iJ!%A\u0002\t\n\u0001\"\\3uC\u0012\fG/\u0019\u0005\
t\u0005C\u0013I\b\"\u0011\u0003$\u0006yqN\\$f]\u0016\u0014\u0018\r^3CY>\u001c7\u000e\u0006\u0003\u0002\u0018\t\u0015\u0006\u0002\u0003B-\u0005?\u0003\r!a\u0010\t\u0011\t%&\u0011\u0010C!\u0005W\u000b1b\u001c8QkND'\t\\8dWR1\u0011q\u0003BW\u0005_C\u0001B!\u0017\u0003(\u0002\u0007\u0011q\b\u0005\t\u0005K\u00129\u000b1\u0001\u00032B\"!1\u0017B\\!\u0011QxP!.\u0011\u0007]\u00119\fB\u0006\u0003:\n=\u0016\u0011!A\u0001\u0006\u0003Y\"aA0%e!A!Q\u0018B=\t\u0003\u0012y,A\u0004p]\u0016\u0013(o\u001c:\u0015\r\u0005]!\u0011\u0019Bc\u0011\u001d\u0011\u0019Ma/A\u00025\nq!\\3tg\u0006<W\r\u0003\u0005\u0003H\nm\u0006\u0019\u0001Be\u0003%!\bN]8xC\ndW\r\u0005\u0003\u0003L\nmg\u0002\u0002Bg\u0005/tAAa4\u0003V6\u0011!\u0011\u001b\u0006\u0004\u0005'T\u0012A\u0002\u001fs_>$h(C\u0001 \u0013\r\u0011INH\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\u0011iNa8\u0003\u0013QC'o\\<bE2,'b\u0001Bm=\u0001")
/* loaded from: input_file:org/apache/spark/streaming/aliyun/datahub/ReliableDatahubReceiver.class */
public class ReliableDatahubReceiver<T> extends Receiver<T> implements Logging {
    // Coordinates of the DataHub shard this receiver reads; each is assigned once in the constructor.
    // The name-mangled public fields are read directly by the nested MessageHandler via its $outer reference.
    public final String org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$projectName;
    public final String org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicName;
    public final String org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$subId;
    // Credentials and endpoint; onStart() uses them to build the AliyunAccount / DatahubConfiguration.
    private final String accessKeyId;
    private final String accessKeySecret;
    private final String endpoint;
    public final String org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$shardId;
    // Caller-supplied RecordEntry -> T function. It is stored here but not invoked by any code visible in this file
    // (presumably the record-handling closure uses it -- confirm).
    public final Function1<RecordEntry, T> org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$func;
    // Offsets appended by updateOffset() and drained by rememberBlockOffsets(); (re)created in onStart().
    private ArrayBuffer<OffsetContext.Offset> offsetBuffer;
    // Offsets captured for each generated block; the entry is removed after the block has been stored.
    private ConcurrentHashMap<StreamBlockId, OffsetContext.Offset[]> blockOffsetMap;
    // Block generator obtained from the supervisor and started in onStart().
    private BlockGenerator blockGenerator;
    // Single-thread daemon pool that runs the MessageHandler; cleared to null by onStop().
    private ThreadPoolExecutor messageHandlerThreadPool;
    // DataHub client, offset context and topic metadata; all initialised in onStart().
    private DatahubClientOpt org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$client;
    private OffsetContext org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx;
    private GetTopicResult org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicResult;
    // Cursor passed to getRecords(); set in onStart() and reset by MessageHandler after an OffsetResetedException.
    private String org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor;
    // Parsed in the constructor from "spark.datahub.batch.getrecord.limits"; passed to getRecords().
    private final int org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordLimits;
    // Parsed from "spark.datahub.norecord.waittimes"; passed to Thread.sleep() when a poll returns no records.
    private final long org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordSleepTime;
    // Parsed from "spark.datahub.commit.perrows"; used as the modulus in commitOffset().
    private final int org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$commitRows;
    // Backing field for the Logging trait's logger accessor pair; transient, so it is not serialized.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /* compiled from: ReliableDatahubReceiver.scala */
    /* loaded from: input_file:org/apache/spark/streaming/aliyun/datahub/ReliableDatahubReceiver$GeneratedBlockHandler.class */
    /**
     * Block-generator listener that forwards every callback to the enclosing receiver.
     */
    public final class GeneratedBlockHandler implements BlockGeneratorListener {
        private final /* synthetic */ ReliableDatahubReceiver $outer;

        /**
         * Binds the listener to its enclosing receiver.
         *
         * @param reliableDatahubReceiver the enclosing receiver; a null value results in a NullPointerException
         */
        public GeneratedBlockHandler(ReliableDatahubReceiver<T> reliableDatahubReceiver) {
            if (reliableDatahubReceiver == null) {
                throw null;
            }
            this.$outer = reliableDatahubReceiver;
        }

        /**
         * Records the offset attached to a newly added data item.
         *
         * @param obj  the data item (not used here)
         * @param obj2 the callback metadata; ignored when null, otherwise cast to an offset
         */
        public void onAddData(Object obj, Object obj2) {
            if (obj2 == null) {
                return;
            }
            OffsetContext.Offset recordOffset = (OffsetContext.Offset) obj2;
            this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$updateOffset(recordOffset);
        }

        /**
         * Asks the receiver to associate the currently buffered offsets with the generated block.
         *
         * @param streamBlockId id of the block that was just generated
         */
        public void onGenerateBlock(StreamBlockId streamBlockId) {
            ReliableDatahubReceiver receiver = this.$outer;
            receiver.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$rememberBlockOffsets(streamBlockId);
        }

        /**
         * Asks the receiver to store the block and then commit the offsets remembered for it.
         *
         * @param streamBlockId id of the block being pushed
         * @param arrayBuffer   contents of the block
         */
        public void onPushBlock(StreamBlockId streamBlockId, ArrayBuffer<?> arrayBuffer) {
            ReliableDatahubReceiver receiver = this.$outer;
            receiver.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$storeBlockAndCommitOffset(streamBlockId, arrayBuffer);
        }

        /**
         * Reports a block-generator error through the receiver.
         *
         * @param str error message
         * @param th  cause of the error
         */
        public void onError(String str, Throwable th) {
            ReliableDatahubReceiver receiver = this.$outer;
            receiver.reportError(str, th);
        }
    }

    /* compiled from: ReliableDatahubReceiver.scala */
    /* loaded from: input_file:org/apache/spark/streaming/aliyun/datahub/ReliableDatahubReceiver$MessageHandler.class */
    /**
     * Polling task that reads records from DataHub until the enclosing receiver is stopped.
     */
    public final class MessageHandler implements Runnable {
        private final DatahubClientOpt client;
        private final /* synthetic */ ReliableDatahubReceiver $outer;

        /**
         * Repeatedly fetches a batch of records using the receiver's current cursor.
         *
         * <p>An empty batch makes the thread sleep for the configured wait time; a non-empty batch is
         * handed, record by record, to a closure that is not visible in this file. Failures are handled
         * per iteration so that one bad poll does not end the loop:
         * <ul>
         *   <li>interruption: the interrupt flag is restored and the task exits;</li>
         *   <li>{@code SparkException}: reported to the receiver with the exception's own message;</li>
         *   <li>{@code OffsetResetedException}: the offset context is refreshed and the cursor is
         *       recomputed from it;</li>
         *   <li>any other exception: reported as "Error handling message".</li>
         * </ul>
         */
        @Override // java.lang.Runnable
        public void run() {
            while (!this.$outer.isStopped()) {
                try {
                    GetRecordsResult records = this.client.getRecords(this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$projectName, this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicName, this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$shardId, this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor(), this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordLimits(), this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicResult().getRecordSchema());
                    List records2 = records.getRecords();
                    if (records2.isEmpty()) {
                        // Nothing to read yet: back off before polling again.
                        Thread.sleep(this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordSleepTime());
                    } else {
                        JavaConversions$.MODULE$.asScalaBuffer(records2).foreach(new ReliableDatahubReceiver$MessageHandler$$anonfun$run$1(this, records));
                    }
                } catch (InterruptedException e) {
                    // Previously this fell into the generic handler below, which cleared the interrupt
                    // flag and kept polling. Treat interruption as a request to stop instead.
                    Thread.currentThread().interrupt();
                    return;
                } catch (SparkException e) {
                    this.$outer.reportError(e.getMessage(), e);
                } catch (OffsetResetedException unused) {
                    // The subscription offset was reset: reload the offset context and restart from it.
                    this.client.updateOffsetContext(this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx());
                    this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor_$eq(this.client.getNextOffsetCursor(this.$outer.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx()).getCursor());
                    this.$outer.logInfo(new ReliableDatahubReceiver$MessageHandler$$anonfun$run$2(this));
                } catch (Exception e2) {
                    this.$outer.reportError("Error handling message", e2);
                }
            }
        }

        /** Compiler-generated accessor for the enclosing receiver. */
        public /* synthetic */ ReliableDatahubReceiver org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$MessageHandler$$$outer() {
            return this.$outer;
        }

        /**
         * Creates a handler bound to the enclosing receiver.
         *
         * @param reliableDatahubReceiver the enclosing receiver; a null value results in a NullPointerException
         * @param datahubClientOpt        client used for all DataHub calls made by this handler
         */
        public MessageHandler(ReliableDatahubReceiver<T> reliableDatahubReceiver, DatahubClientOpt datahubClientOpt) {
            this.client = datahubClientOpt;
            if (reliableDatahubReceiver == null) {
                throw null;
            }
            this.$outer = reliableDatahubReceiver;
        }
    }

    /** Returns the logger instance currently held in the Logging backing field (may be null). */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Replaces the logger instance held in the Logging backing field. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    // The methods below are the Logging members of this class. Each one only forwards its arguments,
    // together with `this`, to the shared Logging implementation.
    // NOTE(review): `Logging.class.<method>(this, ...)` looks like the decompiler's rendering of the Scala
    // trait implementation class (`Logging$class`); it is not valid Java as written -- confirm before compiling.

    /** Forwards to the Logging implementation to obtain the logger name. */
    public String logName() {
        return Logging.class.logName(this);
    }

    /** Forwards to the Logging implementation to obtain the logger. */
    public Logger log() {
        return Logging.class.log(this);
    }

    /** Forwards an info-level message supplier. */
    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    /** Forwards a debug-level message supplier. */
    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    /** Forwards a trace-level message supplier. */
    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    /** Forwards a warning-level message supplier. */
    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    /** Forwards an error-level message supplier. */
    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    /** Forwards an info-level message supplier together with a throwable. */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    /** Forwards a debug-level message supplier together with a throwable. */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    /** Forwards a trace-level message supplier together with a throwable. */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    /** Forwards a warning-level message supplier together with a throwable. */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    /** Forwards an error-level message supplier together with a throwable. */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    /** Forwards the trace-enabled query to the Logging implementation. */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    /** Forwards the single-flag log initialisation request. */
    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    /** Forwards the two-flag log initialisation request and returns the implementation's result. */
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.class.initializeLogIfNecessary(this, z, z2);
    }

    /** Forwards to the implementation to obtain the default for the second initialisation flag. */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2(this);
    }

    // Plain getters and setters for the receiver's mutable state. The name-mangled public variants
    // exist so that the nested handler classes (and closures) can reach state that is otherwise private.

    /** Returns the buffer of offsets collected since the last generated block. */
    private ArrayBuffer<OffsetContext.Offset> offsetBuffer() {
        return this.offsetBuffer;
    }

    /** Replaces the offset buffer. */
    private void offsetBuffer_$eq(ArrayBuffer<OffsetContext.Offset> arrayBuffer) {
        this.offsetBuffer = arrayBuffer;
    }

    /** Returns the map from block id to the offsets captured for that block. */
    private ConcurrentHashMap<StreamBlockId, OffsetContext.Offset[]> blockOffsetMap() {
        return this.blockOffsetMap;
    }

    /** Replaces the block-to-offsets map. */
    private void blockOffsetMap_$eq(ConcurrentHashMap<StreamBlockId, OffsetContext.Offset[]> concurrentHashMap) {
        this.blockOffsetMap = concurrentHashMap;
    }

    /** Returns the block generator (null until onStart() has created it). */
    private BlockGenerator blockGenerator() {
        return this.blockGenerator;
    }

    /** Replaces the block generator. */
    private void blockGenerator_$eq(BlockGenerator blockGenerator) {
        this.blockGenerator = blockGenerator;
    }

    /** Returns the pool running the message handler (null before onStart() and after onStop()). */
    private ThreadPoolExecutor messageHandlerThreadPool() {
        return this.messageHandlerThreadPool;
    }

    /** Replaces the message-handler pool. */
    private void messageHandlerThreadPool_$eq(ThreadPoolExecutor threadPoolExecutor) {
        this.messageHandlerThreadPool = threadPoolExecutor;
    }

    /** Returns the DataHub client (null until onStart() has created it). */
    public DatahubClientOpt org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$client() {
        return this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$client;
    }

    /** Replaces the DataHub client. */
    private void org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$client_$eq(DatahubClientOpt datahubClientOpt) {
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$client = datahubClientOpt;
    }

    /** Returns the offset context obtained in onStart(). */
    public OffsetContext org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx() {
        return this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx;
    }

    /** Replaces the offset context. */
    private void org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx_$eq(OffsetContext offsetContext) {
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx = offsetContext;
    }

    /** Returns the topic metadata fetched in onStart(); its record schema is passed to getRecords(). */
    public GetTopicResult org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicResult() {
        return this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicResult;
    }

    /** Replaces the topic metadata. */
    private void org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicResult_$eq(GetTopicResult getTopicResult) {
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicResult = getTopicResult;
    }

    /** Returns the cursor used for the next getRecords() call. */
    public String org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor() {
        return this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor;
    }

    /** Replaces the read cursor; public because MessageHandler resets it after an offset reset. */
    public void org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor_$eq(String str) {
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor = str;
    }

    /** Returns the record limit passed to each getRecords() call. */
    public int org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordLimits() {
        return this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordLimits;
    }

    /** Returns the value passed to Thread.sleep() when a poll returns no records. */
    public long org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordSleepTime() {
        return this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordSleepTime;
    }

    /** Returns the row count used as the modulus when deciding on the final commit in commitOffset(). */
    public int org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$commitRows() {
        return this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$commitRows;
    }

    /**
     * Starts the receiver.
     *
     * <p>In order: resets the offset bookkeeping, creates and starts the block generator, creates the
     * single-thread handler pool, connects to DataHub, loads the topic metadata and offset context,
     * chooses the starting cursor and finally submits the polling task.
     *
     * <p>The starting cursor is the one following the saved offset when the offset context has one and
     * the lookup succeeds; otherwise the shard's OLDEST cursor is used.
     */
    public void onStart() {
        final String project = this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$projectName;
        final String topic = this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicName;
        final String shard = this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$shardId;

        logInfo(new ReliableDatahubReceiver$$anonfun$onStart$1(this));
        logInfo(new ReliableDatahubReceiver$$anonfun$onStart$2(this));

        // Fresh bookkeeping and worker infrastructure for this run.
        offsetBuffer_$eq(new ArrayBuffer<>());
        blockOffsetMap_$eq(new ConcurrentHashMap<>());
        BlockGenerator generator = supervisor().createBlockGenerator(new GeneratedBlockHandler(this));
        blockGenerator_$eq(generator);
        messageHandlerThreadPool_$eq(ThreadUtils$.MODULE$.newDaemonFixedThreadPool(1, "DatahubMessageHandler"));
        generator.start();

        // Connect to DataHub and load what the polling loop needs.
        AliyunAccount account = new AliyunAccount(this.accessKeyId, this.accessKeySecret);
        DatahubClientOpt datahub = new DatahubClientOpt(new DatahubConfiguration(account, this.endpoint));
        org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$client_$eq(datahub);
        org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicResult_$eq(datahub.getTopic(project, topic));
        OffsetContext savedOffsets = datahub.initOffsetContext(project, topic, this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$subId, shard);
        org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx_$eq(savedOffsets);

        boolean resumedFromSavedOffset = false;
        if (savedOffsets.hasOffset()) {
            try {
                org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor_$eq(datahub.getNextOffsetCursor(savedOffsets).getCursor());
                resumedFromSavedOffset = true;
            } catch (DatahubClientException ignored) {
                // The saved offset could not be turned into a cursor; fall through to the OLDEST cursor.
            }
        }
        if (!resumedFromSavedOffset) {
            org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor_$eq(datahub.getCursor(project, topic, shard, GetCursorRequest.CursorType.OLDEST).getCursor());
        }

        logInfo(new ReliableDatahubReceiver$$anonfun$onStart$3(this));
        messageHandlerThreadPool().submit(new MessageHandler(this, datahub));
    }

    /**
     * Stops the receiver's background work.
     *
     * <p>The handler pool is shut down first (already submitted work is not interrupted; the polling
     * loop ends when it next observes that the receiver is stopped). The block generator started in
     * {@code onStart()} is then stopped as well. {@code onStart()} creates a new generator on every
     * start, so without this the previous one would be left running across a restart.
     *
     * <p>The generator reference is deliberately kept rather than cleared, so an in-flight add from
     * the polling task calls the stopped generator instead of dereferencing null.
     */
    public void onStop() {
        if (messageHandlerThreadPool() != null) {
            messageHandlerThreadPool().shutdown();
            messageHandlerThreadPool_$eq(null);
            logInfo(new ReliableDatahubReceiver$$anonfun$onStop$1(this));
        }
        // Null when onStop() runs before onStart() got as far as creating the generator.
        if (blockGenerator() != null) {
            blockGenerator().stop();
        }
    }

    /**
     * Hands one converted record to the block generator, attaching its offset as callback metadata.
     *
     * @param t      the record to add
     * @param offset the record's offset, delivered back through the listener's onAddData callback
     */
    public void org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$storeMessageAndOffset(T t, OffsetContext.Offset offset) {
        final BlockGenerator generator = blockGenerator();
        generator.addDataWithCallback(t, offset);
    }

    /**
     * Appends a single offset to the buffer of offsets belonging to the block being built.
     *
     * @param offset the offset to remember
     */
    public void org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$updateOffset(OffsetContext.Offset offset) {
        // ArrayBuffer.append is variadic on the Scala side, so the offset is wrapped in a one-element array.
        final OffsetContext.Offset[] single = new OffsetContext.Offset[]{offset};
        offsetBuffer().append(Predef$.MODULE$.wrapRefArray(single));
    }

    /**
     * Snapshots the buffered offsets under the given block id and then empties the buffer.
     *
     * @param streamBlockId id of the block the buffered offsets belong to
     */
    public void org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$rememberBlockOffsets(StreamBlockId streamBlockId) {
        final ArrayBuffer<OffsetContext.Offset> pending = offsetBuffer();
        final OffsetContext.Offset[] snapshot = (OffsetContext.Offset[]) pending.toArray(ClassTag$.MODULE$.apply(OffsetContext.Offset.class));
        blockOffsetMap().put(streamBlockId, snapshot);
        pending.clear();
    }

    /**
     * Stores a block, retrying on failure, and commits the offsets remembered for it.
     *
     * <p>The store is attempted until it succeeds or has failed four times. On success the offsets
     * recorded for the block (if any) are passed to the commit closure and the map entry is removed.
     * If every attempt fails, the receiver is stopped with the last exception as the cause.
     *
     * @param streamBlockId id of the block being stored
     * @param arrayBuffer   contents of the block
     */
    public void org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$storeBlockAndCommitOffset(StreamBlockId streamBlockId, ArrayBuffer<?> arrayBuffer) {
        Exception lastFailure = null;
        boolean stored = false;
        for (int failures = 0; !stored && failures <= 3; ) {
            try {
                store(arrayBuffer);
                stored = true;
            } catch (Exception e) {
                lastFailure = e;
                failures++;
            }
        }

        if (stored) {
            Option$.MODULE$.apply(blockOffsetMap().get(streamBlockId)).foreach(new ReliableDatahubReceiver$$anonfun$org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$storeBlockAndCommitOffset$1(this));
            blockOffsetMap().remove(streamBlockId);
            return;
        }
        stop("Error while storing block into Spark,", lastFailure);
    }

    /**
     * Processes a block's offsets and issues a final commit for any remainder.
     *
     * <p>Each offset is handed to a closure (not visible in this file) together with a shared counter.
     * NOTE(review): presumably the closure counts the offsets and commits every {@code commitRows}
     * rows; confirm against the closure. Afterwards one more commit is issued only when the counter is
     * positive and not a multiple of {@code commitRows}.
     *
     * @param offsetArr offsets recorded for the block that was just stored
     */
    public void org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$commitOffset(OffsetContext.Offset[] offsetArr) {
        final LongRef processed = LongRef.create(0L);
        Predef$.MODULE$.refArrayOps(offsetArr).foreach(new ReliableDatahubReceiver$$anonfun$org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$commitOffset$1(this, processed));

        final long total = processed.elem;
        // The modulus is evaluated only when the counter is positive, as in the short-circuit original.
        final boolean hasUncommittedTail = total > 0 && total % org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$commitRows() != 0;
        if (hasUncommittedTail) {
            org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$client().commitOffset(org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx());
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates a receiver for one DataHub shard. No connection is made here; all runtime state is left
     * null until {@code onStart()} runs.
     *
     * @param str          project name
     * @param str2         topic name
     * @param str3         subscription id
     * @param str4         access key id
     * @param str5         access key secret
     * @param str6         DataHub endpoint
     * @param str7         shard id
     * @param function1    function from a DataHub record to the stream's element type
     * @param storageLevel storage level passed to the Receiver superclass
     * @param map          configuration; read for "spark.datahub.batch.getrecord.limits",
     *                     "spark.datahub.norecord.waittimes" and "spark.datahub.commit.perrows"
     * @param classTag     class tag for the element type (not referenced in this constructor body)
     */
    public ReliableDatahubReceiver(String str, String str2, String str3, String str4, String str5, String str6, String str7, Function1<RecordEntry, T> function1, StorageLevel storageLevel, Map<String, String> map, ClassTag<T> classTag) {
        super(storageLevel);
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$projectName = str;
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicName = str2;
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$subId = str3;
        this.accessKeyId = str4;
        this.accessKeySecret = str5;
        this.endpoint = str6;
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$shardId = str7;
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$func = function1;
        // NOTE(review): looks like the decompiler's rendering of the Logging trait initialiser -- confirm.
        Logging.class.$init$(this);
        // Runtime state is created in onStart(); until then everything is null.
        this.offsetBuffer = null;
        this.blockOffsetMap = null;
        this.blockGenerator = null;
        this.messageHandlerThreadPool = null;
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$client = null;
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$offsetCtx = null;
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$topicResult = null;
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$cursor = null;
        // Each setting is looked up in the map, converted to a string and parsed as a number. The fallback
        // used when a key is absent comes from a closure class that is not visible in this file.
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordLimits = new StringOps(Predef$.MODULE$.augmentString(map.getOrElse("spark.datahub.batch.getrecord.limits", new ReliableDatahubReceiver$$anonfun$1(this)).toString())).toInt();
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$getRecordSleepTime = new StringOps(Predef$.MODULE$.augmentString(map.getOrElse("spark.datahub.norecord.waittimes", new ReliableDatahubReceiver$$anonfun$2(this)).toString())).toLong();
        this.org$apache$spark$streaming$aliyun$datahub$ReliableDatahubReceiver$$commitRows = new StringOps(Predef$.MODULE$.augmentString(map.getOrElse("spark.datahub.commit.perrows", new ReliableDatahubReceiver$$anonfun$3(this)).toString())).toInt();
    }
}
