/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.datahub;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.commons.proto.ProtobufRecordStreamReader;
import com.aliyun.odps.commons.proto.XstreamPack;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.datahub.DatahubException;
import com.aliyun.odps.datahub.PackType;
import com.aliyun.odps.datahub.SeekPackResult;
import com.aliyun.odps.rest.RestClient;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

public class DatahubReader {
    private RestClient datahubServiceClient;
    private TableSchema tableSchema;
    private String path;
    private MessageDigest messageDigest;
    private Map<String, String> params;
    private Map<String, String> headers;
    private byte[] packMeta;
    private String lastPackId;
    private String nextPackId;
    private PackType.ReadMode readMode;
    private ProtobufRecordStreamReader protobufRecordStreamReader;

    public DatahubReader(RestClient datahubServiceClient, TableSchema tableSchema, String path, Map<String, String> params, Map<String, String> headers) {
        this(datahubServiceClient, tableSchema, path, params, headers, PackType.FIRST_PACK_ID);
    }

    public DatahubReader(RestClient datahubServiceClient, TableSchema tableSchema, String path, Map<String, String> params, Map<String, String> headers, String packId) {
        this.datahubServiceClient = datahubServiceClient;
        this.tableSchema = tableSchema;
        this.path = path;
        this.params = params;
        this.headers = headers;
        this.packMeta = null;
        try {
            this.messageDigest = MessageDigest.getInstance("MD5");
        }
        catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e.getMessage());
        }
        this.protobufRecordStreamReader = null;
        this.seek(packId, PackType.ReadMode.SEEK_CUR);
    }

    public Record read() throws OdpsException, IOException {
        Record r = null;
        do {
            if (this.protobufRecordStreamReader == null) continue;
            try {
                r = this.protobufRecordStreamReader.read();
            }
            catch (IOException e) {
                this.protobufRecordStreamReader = null;
                this.nextPackId = this.lastPackId;
                this.readMode = PackType.ReadMode.SEEK_CUR;
                throw e;
            }
        } while (r == null && this.getPack("all"));
        return r;
    }

    public void skipPack() {
        this.seek(this.lastPackId, PackType.ReadMode.SEEK_NEXT);
    }

    public String getLastPackId() {
        return this.lastPackId;
    }

    private void seek(String rpid, PackType.ReadMode mode) {
        if (!(rpid != null && !rpid.equals("") || mode.equals((Object)PackType.ReadMode.SEEK_BEGIN) || mode.equals((Object)PackType.ReadMode.SEEK_END))) {
            throw new IllegalArgumentException("Invalid pack id.");
        }
        switch (mode) {
            case SEEK_BEGIN: {
                this.nextPackId = PackType.FIRST_PACK_ID;
                break;
            }
            case SEEK_END: {
                this.nextPackId = PackType.LAST_PACK_ID;
                break;
            }
            case SEEK_CUR: 
            case SEEK_NEXT: {
                this.nextPackId = rpid.toString();
                break;
            }
            default: {
                throw new IllegalArgumentException("Invalid pack read mode.");
            }
        }
        this.readMode = mode;
        this.protobufRecordStreamReader = null;
    }

    public SeekPackResult seek(long timeStamp) throws OdpsException, IOException {
        HashMap<String, String> params = new HashMap<String, String>(this.params);
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        try {
            params.put("timestamp", Long.toString(timeStamp));
            Connection conn = this.datahubServiceClient.connect(this.path, "GET", params, headers);
            Response resp = conn.getResponse();
            if (!resp.isOK()) {
                DatahubException ex = new DatahubException(conn.getInputStream());
                ex.setRequestId(resp.getHeader("x-odps-request-id"));
                throw ex;
            }
            String json = IOUtils.readStreamAsString(conn.getInputStream());
            JSONObject tree = JSON.parseObject((String)json);
            String node = tree.getString("PackId");
            if (node != null) {
                SeekPackResult startPack = new SeekPackResult(node);
                return startPack;
            }
            throw new DatahubException("get pack id fail");
        }
        catch (DatahubException e) {
            throw e;
        }
        catch (Exception e) {
            throw new DatahubException(e.getMessage(), e);
        }
    }

    private boolean getPack(String fetchMode) throws OdpsException, IOException {
        this.protobufRecordStreamReader = null;
        HashMap<String, String> params = new HashMap<String, String>(this.params);
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        try {
            String strMode = this.readMode.equals((Object)PackType.ReadMode.SEEK_NEXT) ? "AFTER_PACKID" : "AT_PACKID";
            params.put("packid", this.nextPackId);
            params.put("iteratemode", strMode);
            params.put("packnum", "1");
            params.put("packfetchmode", fetchMode);
            Connection conn = this.datahubServiceClient.connect(this.path, "GET", params, headers);
            Response resp = conn.getResponse();
            if (!resp.isOK()) {
                DatahubException ex = new DatahubException(conn.getInputStream());
                ex.setRequestId(resp.getHeader("x-odps-request-id"));
                throw ex;
            }
            String num = resp.getHeader("x-odps-pack-num");
            if (num.equals("0")) {
                return false;
            }
            InputStream in = conn.getInputStream();
            byte[] bytes = IOUtils.readFully(in);
            XstreamPack.XStreamPack pack = XstreamPack.XStreamPack.parseFrom(bytes);
            if (fetchMode.equals("all")) {
                bytes = pack.getPackData().toByteArray();
                this.protobufRecordStreamReader = new ProtobufRecordStreamReader(this.tableSchema, new ByteArrayInputStream(bytes));
            }
            this.packMeta = pack.hasPackMeta() ? pack.getPackMeta().toByteArray() : "".getBytes();
            String npid = resp.getHeader("x-odps-next-packid");
            this.lastPackId = resp.getHeader("x-odps-current-packid");
            if (!npid.equals(PackType.LAST_PACK_ID)) {
                this.nextPackId = npid;
                this.readMode = PackType.ReadMode.SEEK_CUR;
            } else {
                this.nextPackId = this.lastPackId;
                this.readMode = PackType.ReadMode.SEEK_NEXT;
            }
            return true;
        }
        catch (DatahubException e) {
            throw e;
        }
        catch (Exception e) {
            throw new DatahubException(e.getMessage(), e);
        }
    }

    public byte[] readMeta() throws OdpsException, IOException {
        if (this.getPack("meta")) {
            return this.packMeta;
        }
        return null;
    }

    private String generatorMD5(byte[] bytes) {
        byte[] digest = this.messageDigest.digest(bytes);
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }
}

