package org.apache.spark.sql.aliyun.logservice;

import com.aliyun.openservices.log.response.BatchGetLogResponse;
import java.util.concurrent.LinkedBlockingQueue;
import org.I0Itec.zkclient.ZkClient;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.InputMetrics;
import org.apache.spark.sql.aliyun.logservice.LoghubSourceRDD;
import org.apache.spark.streaming.aliyun.logservice.DirectLoghubInputDStream$;
import org.apache.spark.util.NextIterator;
import scala.Predef$;
import scala.StringContext;
import scala.collection.JavaConversions$;
import scala.collection.immutable.Map;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;

/* compiled from: LoghubSourceRDD.scala */
/* loaded from: input_file:org/apache/spark/sql/aliyun/logservice/LoghubSourceRDD$$anon$1.class */
public final class LoghubSourceRDD$$anon$1 extends NextIterator<LoghubData> {
    private final ZkClient zkClient;
    private final long count;
    private final int step;
    private int org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead;
    private String org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor;
    private LinkedBlockingQueue<LoghubData> org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData;
    private final InputMetrics inputMetrics;
    private final /* synthetic */ LoghubSourceRDD $outer;
    public final LoghubSourceRDD.ShardPartition shardPartition$1;
    public final Map schemaFieldPos$1;

    private ZkClient zkClient() {
        return this.zkClient;
    }

    private long count() {
        return this.count;
    }

    private int step() {
        return this.step;
    }

    public int org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead() {
        return this.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead;
    }

    private void org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead_$eq(int i) {
        this.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead = i;
    }

    public String org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor() {
        return this.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor;
    }

    private void org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor_$eq(String str) {
        this.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor = str;
    }

    public LinkedBlockingQueue<LoghubData> org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData() {
        return this.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData;
    }

    private void org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData_$eq(LinkedBlockingQueue<LoghubData> linkedBlockingQueue) {
        this.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData = linkedBlockingQueue;
    }

    private InputMetrics inputMetrics() {
        return this.inputMetrics;
    }

    private boolean checkHasNext() {
        if (count() < 0) {
            return !org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor().equals(this.shardPartition$1.endCursor()) || JavaConversions$.MODULE$.collectionAsScalaIterable(org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData()).nonEmpty();
        }
        boolean z = (((long) org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead()) < count() && !org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor().equals(this.shardPartition$1.endCursor())) || JavaConversions$.MODULE$.collectionAsScalaIterable(org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData()).nonEmpty();
        if (!z) {
            DirectLoghubInputDStream$.MODULE$.writeDataToZK(zkClient(), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", "/available/", "/", "/", ".shard"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.$outer.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$checkpointDir, this.$outer.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$project, this.$outer.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$logStore, BoxesRunTime.boxToInteger(this.shardPartition$1.shardId())})), org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor());
        }
        return z;
    }

    /* renamed from: getNext, reason: merged with bridge method [inline-methods] */
    public LoghubData m52getNext() {
        finished_$eq(!checkHasNext());
        if (finished()) {
            return null;
        }
        if (org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData().isEmpty()) {
            fetchNextBatch();
        }
        org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead_$eq(org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead() + 1);
        return org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData().poll();
    }

    public void close() {
        try {
            inputMetrics().incBytesRead(org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead());
            org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData().clear();
            org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData_$eq(null);
        } catch (Exception e) {
            this.$outer.logWarning(new LoghubSourceRDD$$anon$1$$anonfun$close$1(this), e);
        }
    }

    private void fetchNextBatch() {
        BatchGetLogResponse BatchGetLog = this.$outer.mClient().BatchGetLog(this.$outer.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$project, this.$outer.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$logStore, this.shardPartition$1.shardId(), step(), org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor(), this.shardPartition$1.endCursor());
        IntRef create = IntRef.create(0);
        JavaConversions$.MODULE$.asScalaBuffer(BatchGetLog.GetLogGroups()).foreach(new LoghubSourceRDD$$anon$1$$anonfun$fetchNextBatch$1(this, create));
        String org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor = org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor();
        org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor_$eq(BatchGetLog.GetNextCursor());
        this.$outer.logDebug(new LoghubSourceRDD$$anon$1$$anonfun$fetchNextBatch$2(this, create, org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor));
    }

    public /* synthetic */ LoghubSourceRDD org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$$outer() {
        return this.$outer;
    }

    public LoghubSourceRDD$$anon$1(LoghubSourceRDD loghubSourceRDD, TaskContext taskContext, LoghubSourceRDD.ShardPartition shardPartition, Map map) {
        if (loghubSourceRDD == null) {
            throw null;
        }
        this.$outer = loghubSourceRDD;
        this.shardPartition$1 = shardPartition;
        this.schemaFieldPos$1 = map;
        this.zkClient = LoghubOffsetReader$.MODULE$.getOrCreateZKClient(loghubSourceRDD.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$zkParams);
        this.count = shardPartition.count();
        this.step = 1000;
        this.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$hasRead = 0;
        this.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$nextCursor = shardPartition.startCursor();
        this.org$apache$spark$sql$aliyun$logservice$LoghubSourceRDD$$anon$$logData = new LinkedBlockingQueue<>(4096 * step());
        this.inputMetrics = taskContext.taskMetrics().inputMetrics();
        taskContext.addTaskCompletionListener(new LoghubSourceRDD$$anon$1$$anonfun$5(this));
    }
}
