package org.apache.spark.sql.aliyun.logservice;

import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Logs;
import com.aliyun.openservices.log.common.Shard;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;
import org.apache.spark.streaming.aliyun.logservice.LoghubClientAgent;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: LoghubOffsetReader.scala */
/* loaded from: input_file:org/apache/spark/sql/aliyun/logservice/LoghubOffsetReader$.class */
/**
 * Singleton holder for the lazily created Loghub and ZooKeeper clients, plus helpers that
 * build the Spark SQL schema for rows read from a log store.
 *
 * <p>This is the Java rendering of a Scala {@code object}: {@link #MODULE$} is the only
 * instance and {@link #readResolve()} maps deserialized copies back onto it. Both cached
 * clients are {@code transient}, so they are not part of the serialized form.
 *
 * <p>NOTE(review): only {@link #resetConsumer(Map)} is synchronized; the
 * {@code getOrCreate*} methods perform an unsynchronized check-then-create on the cached
 * fields. Confirm whether concurrent callers are expected before relying on a single
 * client instance being created.
 */
public final class LoghubOffsetReader$ implements Logging, Serializable {
    /** The singleton instance; assigned by the private constructor during class initialization. */
    public static final LoghubOffsetReader$ MODULE$ = null;
    /** Cached Loghub client, created on first request and replaced by {@link #resetConsumer(Map)}. */
    private transient LoghubClientAgent logServiceClient;
    /** Cached ZooKeeper client, created on first request. */
    private transient ZkClient zkClient;
    /** Backing field required by the {@link Logging} trait. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        new LoghubOffsetReader$();
    }

    // ---------------------------------------------------------------------------------
    // Logging trait plumbing: each method forwards to the trait's implementation class.
    // ---------------------------------------------------------------------------------

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.class.initializeLogIfNecessary(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2(this);
    }

    // ---------------------------------------------------------------------------------
    // Accessors for the cached clients (Scala-style getter / setter pairs).
    // ---------------------------------------------------------------------------------

    private LoghubClientAgent logServiceClient() {
        return this.logServiceClient;
    }

    private void logServiceClient_$eq(LoghubClientAgent loghubClientAgent) {
        this.logServiceClient = loghubClientAgent;
    }

    private ZkClient zkClient() {
        return this.zkClient;
    }

    private void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    /**
     * Returns the cached ZooKeeper client, creating it from {@code map} on first use.
     *
     * <p>Options read when a client has to be created: {@code connect.address},
     * {@code zookeeper.session.timeout.ms} and {@code zookeeper.connection.timeout.ms}
     * (the fallback for the latter is derived from the resolved session timeout). The
     * fallbacks come from closure classes that are not visible here; they may supply a
     * default or throw. Once a client exists, {@code map} is ignored.
     *
     * @param map source options
     * @return the shared ZooKeeper client, configured to store values as UTF-8 strings
     */
    public ZkClient getOrCreateZKClient(Map<String, String> map) {
        if (zkClient() == null) {
            String address = (String) map.getOrElse("connect.address", new LoghubOffsetReader$$anonfun$5());
            int sessionTimeoutMs = new StringOps(Predef$.MODULE$.augmentString(
                    (String) map.getOrElse("zookeeper.session.timeout.ms", new LoghubOffsetReader$$anonfun$6()))).toInt();
            int connectionTimeoutMs = new StringOps(Predef$.MODULE$.augmentString(
                    (String) map.getOrElse("zookeeper.connection.timeout.ms", new LoghubOffsetReader$$anonfun$7(sessionTimeoutMs)))).toInt();
            // Finish configuring the client before publishing it in the cached field.
            ZkClient created = new ZkClient(address, sessionTimeoutMs, connectionTimeoutMs);
            created.setZkSerializer(newUtf8StringSerializer());
            zkClient_$eq(created);
        }
        return zkClient();
    }

    /**
     * Builds a serializer that writes {@link String} values as UTF-8 bytes and reads them back.
     * {@code deserialize} maps {@code null} bytes to {@code null}.
     */
    private static ZkSerializer newUtf8StringSerializer() {
        return new ZkSerializer() {
            @Override
            public byte[] serialize(Object obj) {
                // The Charset overload cannot fail with an unsupported-encoding error,
                // so there is no failure to swallow and no null result to propagate.
                return ((String) obj).getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public Object deserialize(byte[] bArr) {
                if (bArr == null) {
                    return null;
                }
                return new String(bArr, StandardCharsets.UTF_8);
            }
        };
    }

    /**
     * Returns the cached Loghub client, creating it on first use.
     *
     * <p>The arguments are only used when no client is cached yet; a later call with
     * different values still returns the existing client (use {@link #resetConsumer(Map)}
     * to replace it).
     *
     * @param str  value the map-based overload reads from {@code access.key.id}
     * @param str2 value the map-based overload reads from {@code access.key.secret}
     * @param str3 value the map-based overload reads from {@code endpoint}
     * @return the shared Loghub client
     */
    public LoghubClientAgent getOrCreateLoghubClient(String str, String str2, String str3) {
        if (logServiceClient() == null) {
            logServiceClient_$eq(new LoghubClientAgent(str3, str, str2));
        }
        return logServiceClient();
    }

    /**
     * Returns the cached Loghub client, creating it from {@code map} on first use.
     *
     * <p>The options {@code access.key.id}, {@code access.key.secret} and {@code endpoint}
     * are resolved on every call, even when a client is already cached, so whatever the
     * fallback closures do for a missing key (not visible here) happens regardless.
     *
     * @param map source options
     * @return the shared Loghub client
     */
    public LoghubClientAgent getOrCreateLoghubClient(Map<String, String> map) {
        String accessKeyId = (String) map.getOrElse("access.key.id", new LoghubOffsetReader$$anonfun$8());
        String accessKeySecret = (String) map.getOrElse("access.key.secret", new LoghubOffsetReader$$anonfun$9());
        String endpoint = (String) map.getOrElse("endpoint", new LoghubOffsetReader$$anonfun$10());
        // Share the lazy-init logic with the three-argument overload.
        return getOrCreateLoghubClient(accessKeyId, accessKeySecret, endpoint);
    }

    /**
     * Discards the cached Loghub client and creates a new one from {@code map}.
     *
     * @param map source options used to build the replacement client
     */
    public synchronized void resetConsumer(Map<String, String> map) {
        logServiceClient_$eq(null);
        logServiceClient_$eq(getOrCreateLoghubClient(map));
    }

    /**
     * Returns the fixed schema: {@code logProject}, {@code logStore}, {@code shardId},
     * {@code timestamp} and {@code value}, each with {@link StructField}'s default
     * nullability and metadata.
     */
    public StructType loghubSchema() {
        StructField[] fields = new StructField[]{
            new StructField("logProject", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            new StructField("logStore", StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            new StructField("shardId", IntegerType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            new StructField("timestamp", TimestampType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4()),
            new StructField("value", BinaryType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())
        };
        return StructType$.MODULE$.apply(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(fields)));
    }

    /**
     * Derives a schema by sampling one log from one shard of a log store.
     *
     * <p>The result starts with six fixed columns named by {@code LoghubSourceProvider}
     * (project, store, shard and time are non-nullable; topic and source are nullable).
     * The sampled log's contents and the sampled log group's tags are then handed to
     * closures that extend the schema; their exact behaviour is not visible here.
     *
     * @param str  first argument to {@code ListShard} / {@code GetCursor} / {@code BatchGetLog}
     *             (presumably the log project — confirm)
     * @param str2 second argument to the same calls (presumably the log store — confirm)
     * @param str3 first argument of {@link #getOrCreateLoghubClient(String, String, String)}
     * @param str4 second argument of {@link #getOrCreateLoghubClient(String, String, String)}
     * @param str5 third argument of {@link #getOrCreateLoghubClient(String, String, String)}
     * @return the derived schema
     * @throws IllegalStateException if the log store reports no shards at all
     */
    public StructType loghubSchema(String str, String str2, String str3, String str4, String str5) {
        LoghubClientAgent client = getOrCreateLoghubClient(str3, str4, str5);
        logInfo(new LoghubOffsetReader$$anonfun$loghubSchema$2());

        // Split the shards with the (opaque) predicate: _1 holds shards it accepts, _2 the rest.
        Tuple2 partitioned = JavaConversions$.MODULE$.asScalaBuffer(client.ListShard(str, str2).GetShards())
                .partition(new LoghubOffsetReader$$anonfun$11());
        Buffer accepted = (Buffer) partitioned._1();
        Buffer rejected = (Buffer) partitioned._2();

        // Shards rejected by the predicate are preferred; accepted ones are the fallback.
        Shard sampleShard;
        if (rejected.nonEmpty()) {
            sampleShard = (Shard) rejected.head();
        } else if (accepted.nonEmpty()) {
            sampleShard = (Shard) accepted.head();
        } else {
            throw new IllegalStateException("There is no one log store shard in usable state.");
        }
        int shardId = sampleShard.GetShardId();

        // Fetch the end cursor once and use it both to look up the cursor time and as the
        // upper bound of the sampling window. The decompiled source referenced an undefined
        // local at the GetCursorTime call; the end cursor is the value it most plausibly held.
        // NOTE(review): confirm against the original Scala source.
        String endCursor = client.GetCursor(str, str2, shardId, Consts.CursorMode.END).GetCursor();
        // Start the window 600 time units before the end cursor's time (presumably seconds — confirm).
        String startCursor = client.GetCursor(str, str2, shardId,
                client.GetCursorTime(str, str2, shardId, endCursor).GetCursorTime() - 600).GetCursor();

        // head() throws if the window holds no log groups or the first group holds no logs.
        LogGroupData logGroupData = (LogGroupData) JavaConversions$.MODULE$.asScalaBuffer(
                client.BatchGetLog(str, str2, shardId, 1, startCursor, endCursor).GetLogGroups()).head();
        Logs.Log sampleLog = (Logs.Log) JavaConversions$.MODULE$.asScalaBuffer(logGroupData.GetLogGroup().getLogsList()).head();

        StructType fixedColumns = new StructType()
                .add(new StructField(LoghubSourceProvider$.MODULE$.__PROJECT__(), StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4()))
                .add(new StructField(LoghubSourceProvider$.MODULE$.__STORE__(), StringType$.MODULE$, false, StructField$.MODULE$.apply$default$4()))
                .add(new StructField(LoghubSourceProvider$.MODULE$.__SHARD__(), IntegerType$.MODULE$, false, StructField$.MODULE$.apply$default$4()))
                .add(new StructField(LoghubSourceProvider$.MODULE$.__TIME__(), TimestampType$.MODULE$, false, StructField$.MODULE$.apply$default$4()))
                .add(new StructField(LoghubSourceProvider$.MODULE$.__TOPIC__(), StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()))
                .add(new StructField(LoghubSourceProvider$.MODULE$.__SOURCE__(), StringType$.MODULE$, true, StructField$.MODULE$.apply$default$4()));

        // The closures below mutate the schema through this shared reference.
        ObjectRef schemaRef = ObjectRef.create(fixedColumns);
        JavaConversions$.MODULE$.asScalaBuffer(sampleLog.getContentsList()).foreach(new LoghubOffsetReader$$anonfun$loghubSchema$3(schemaRef));
        FastLogGroup fastLogGroup = logGroupData.GetFastLogGroup();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), fastLogGroup.getLogTagsCount())
                .foreach$mVc$sp(new LoghubOffsetReader$$anonfun$loghubSchema$1(schemaRef, fastLogGroup));
        return (StructType) schemaRef.elem;
    }

    /** Keeps the singleton unique across Java serialization by resolving to {@link #MODULE$}. */
    private Object readResolve() {
        return MODULE$;
    }

    private LoghubOffsetReader$() {
        MODULE$ = this;
        Logging.class.$init$(this);
        this.logServiceClient = null;
        this.zkClient = null;
    }
}
