/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.tablestore.hadoop;

import com.alicloud.openservices.tablestore.SyncClientInterface;
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
import com.alicloud.openservices.tablestore.model.DescribeTableRequest;
import com.alicloud.openservices.tablestore.model.DescribeTableResponse;
import com.alicloud.openservices.tablestore.model.Direction;
import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.PrimaryKeySchema;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;
import com.alicloud.openservices.tablestore.model.RowQueryCriteria;
import com.aliyun.openservices.tablestore.hadoop.Credential;
import com.aliyun.openservices.tablestore.hadoop.Endpoint;
import com.aliyun.openservices.tablestore.hadoop.MultiCriteria;
import com.aliyun.openservices.tablestore.hadoop.PrimaryKeyWritable;
import com.aliyun.openservices.tablestore.hadoop.RowWritable;
import com.aliyun.openservices.tablestore.hadoop.TableStore;
import com.aliyun.openservices.tablestore.hadoop.TableStoreInputSplit;
import com.aliyun.openservices.tablestore.hadoop.TableStoreRecordReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hadoop {@link InputFormat} that yields {@link PrimaryKeyWritable} /
 * {@link RowWritable} pairs for range scans over TableStore tables.
 *
 * <p>Scans are registered with {@code addCriteria}, which keeps them
 * serialized as a {@link MultiCriteria} in the job configuration under
 * {@link #CRITERIA}. {@code getSplits} then cuts every registered scan at the
 * shard split points reported by {@code describeTable} for its table and
 * emits one {@link TableStoreInputSplit} per resulting piece.
 */
public class TableStoreInputFormat
extends InputFormat<PrimaryKeyWritable, RowWritable> {
    /** Configuration key under which the serialized {@link MultiCriteria} is stored. */
    static final String CRITERIA = "TABLESTORE_CRITERIA";
    private static final Logger logger = LoggerFactory.getLogger(TableStoreInputFormat.class);

    /**
     * Stores the access key pair for the job; delegates to
     * {@link TableStore#setCredential}.
     */
    public static void setCredential(JobContext job, String accessKeyId, String accessKeySecret) {
        TableStore.setCredential(job, accessKeyId, accessKeySecret);
    }

    /**
     * Stores the access key pair and a security token for the job; delegates
     * to {@link TableStore#setCredential}.
     */
    public static void setCredential(JobContext job, String accessKeyId, String accessKeySecret, String securityToken) {
        TableStore.setCredential(job, accessKeyId, accessKeySecret, securityToken);
    }

    /**
     * Stores the given credential in the configuration; delegates to
     * {@link TableStore#setCredential}.
     */
    public static void setCredential(Configuration conf, Credential cred) {
        TableStore.setCredential(conf, cred);
    }

    /**
     * Stores the service endpoint for the job; delegates to
     * {@link TableStore#setEndpoint}.
     */
    public static void setEndpoint(JobContext job, String endpoint) {
        TableStore.setEndpoint(job, endpoint);
    }

    /**
     * Stores the service endpoint and instance name for the job; delegates to
     * {@link TableStore#setEndpoint}.
     */
    public static void setEndpoint(JobContext job, String endpoint, String instance) {
        TableStore.setEndpoint(job, endpoint, instance);
    }

    /**
     * Stores the given endpoint in the configuration; delegates to
     * {@link TableStore#setEndpoint}.
     */
    public static void setEndpoint(Configuration conf, Endpoint ep) {
        TableStore.setEndpoint(conf, ep);
    }

    /**
     * Registers one more scan on the job's configuration.
     *
     * @param job job whose configuration receives the scan; must not be null
     * @param criteria forward-direction range scan to add; must not be null
     */
    public static void addCriteria(JobContext job, RangeRowQueryCriteria criteria) {
        Preconditions.checkNotNull(job, "job must be nonnull");
        TableStoreInputFormat.addCriteria(job.getConfiguration(), criteria);
    }

    /**
     * Appends a scan to the {@link MultiCriteria} stored under
     * {@link #CRITERIA}, creating it when the key is not set yet.
     *
     * @param conf configuration to update; must not be null
     * @param criteria range scan to add; must not be null and its direction
     *        must be {@link Direction#FORWARD}
     */
    public static void addCriteria(Configuration conf, RangeRowQueryCriteria criteria) {
        Preconditions.checkNotNull(criteria, "criteria must be nonnull");
        Preconditions.checkArgument(criteria.getDirection() == Direction.FORWARD, "criteria must be forward");
        // Checked after the criteria so existing failure precedence is kept;
        // mirrors the validation done by clearCriteria(Configuration).
        Preconditions.checkNotNull(conf, "conf must be nonnull");
        MultiCriteria cri = loadCriteria(conf);
        cri.addCriteria(criteria);
        conf.set(CRITERIA, cri.serialize());
    }

    /**
     * Removes every registered scan from the job's configuration.
     *
     * @param job job whose configuration is cleared; must not be null
     */
    public static void clearCriteria(JobContext job) {
        Preconditions.checkNotNull(job, "job must be nonnull");
        TableStoreInputFormat.clearCriteria(job.getConfiguration());
    }

    /**
     * Unsets the {@link #CRITERIA} key, dropping every registered scan.
     *
     * @param conf configuration to update; must not be null
     */
    public static void clearCriteria(Configuration conf) {
        Preconditions.checkNotNull(conf, "conf must be nonnull");
        conf.unset(CRITERIA);
    }

    /**
     * Creates a {@link TableStoreRecordReader} and initializes it with the
     * given split and task context before returning it.
     */
    @Override
    public RecordReader<PrimaryKeyWritable, RowWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        TableStoreRecordReader rdr = new TableStoreRecordReader();
        rdr.initialize(split, context);
        return rdr;
    }

    /**
     * Computes the input splits for the job using a client created from the
     * job's configuration. The client is shut down before this method
     * returns, whether or not split computation succeeds.
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        SyncClientInterface ots = TableStore.newOtsClient(conf);
        try {
            return TableStoreInputFormat.getSplits(conf, ots);
        } finally {
            ots.shutdown();
        }
    }

    /**
     * Computes the input splits for every scan registered in {@code conf}.
     *
     * <p>The split points of each referenced table are fetched once through
     * {@code ots}; each scan is then cut at the split points that fall inside
     * its key range. The caller keeps ownership of {@code ots}; it is not
     * shut down here.
     *
     * @param conf configuration holding the registered scans
     * @param ots client used to describe the referenced tables
     * @return one {@link TableStoreInputSplit} per piece, in scan order
     */
    public static List<InputSplit> getSplits(Configuration conf, SyncClientInterface ots) {
        List<RangeRowQueryCriteria> scans = getScans(conf);
        logger.info("{} scans", scans.size());
        Set<String> tables = collectTables(scans);
        Map<String, List<PrimaryKey>> splitPoints = fetchSplits(ots, tables);
        int splitCnt = 0;
        for (List<PrimaryKey> points : splitPoints.values()) {
            splitCnt += points.size();
        }
        logger.info("{} split points in {} tables", splitCnt, tables.size());
        List<InputSplit> res = new ArrayList<InputSplit>();
        for (RangeRowQueryCriteria scan : scans) {
            String table = scan.getTableName();
            List<PrimaryKey> points = splitPoints.get(table);
            // Invariant: tables was derived from scans, so every table has an entry.
            Preconditions.checkNotNull(points, "no split points were fetched for table " + table);
            splitScan(scan, points, res);
        }
        logger.info("{} splits", res.size());
        return res;
    }

    /**
     * Cuts one scan at the given split points and appends the resulting
     * input splits to {@code out}.
     *
     * <p>A scan whose start and end fall between the same pair of split
     * points is emitted unchanged. Otherwise one clone is emitted per
     * interval: the first keeps the scan's own start key, the last keeps the
     * scan's own end key, and inner boundaries are the split points.
     */
    private static void splitScan(RangeRowQueryCriteria scan, List<PrimaryKey> points, List<InputSplit> out) {
        int startIdx = locateStart(points, scan.getInclusiveStartPrimaryKey());
        int endIdx = locateEnd(points, scan.getExclusiveEndPrimaryKey());
        Preconditions.checkArgument(startIdx <= endIdx, "inclusive-start primary key must be smaller than exclusive-end primary key for FORWARD direction");
        if (startIdx == endIdx) {
            out.add(new TableStoreInputSplit(scan));
            return;
        }
        for (int i = startIdx; i <= endIdx; ++i) {
            RangeRowQueryCriteria piece = clone(scan);
            if (i > startIdx) {
                piece.setInclusiveStartPrimaryKey(points.get(i - 1));
            }
            if (i < endIdx) {
                piece.setExclusiveEndPrimaryKey(points.get(i));
            }
            out.add(new TableStoreInputSplit(piece));
        }
    }

    /**
     * Reads the {@link MultiCriteria} stored under {@link #CRITERIA}, or
     * returns a new empty one when the key is not set.
     */
    private static MultiCriteria loadCriteria(Configuration conf) {
        String serialized = conf.get(CRITERIA);
        if (serialized == null) {
            return new MultiCriteria();
        }
        return MultiCriteria.deserialize(serialized);
    }

    /** Returns the scans currently registered in {@code conf}. */
    private static List<RangeRowQueryCriteria> getScans(Configuration conf) {
        return loadCriteria(conf).getCriteria();
    }

    /** Returns the distinct table names referenced by the given scans. */
    private static Set<String> collectTables(List<RangeRowQueryCriteria> scans) {
        Set<String> res = new HashSet<String>();
        for (RangeRowQueryCriteria scan : scans) {
            res.add(scan.getTableName());
        }
        return res;
    }

    /**
     * Describes every table and collects its shard split points.
     *
     * <p>A split point with fewer columns than the table's primary key schema
     * is padded: the missing trailing columns are filled with
     * {@link PrimaryKeyValue#INF_MIN}, so that every returned key has one
     * column per schema entry.
     *
     * @return map from table name to its (padded) split points, in the order
     *         returned by the service
     */
    private static Map<String, List<PrimaryKey>> fetchSplits(SyncClientInterface ots, Set<String> tables) {
        Map<String, List<PrimaryKey>> res = new HashMap<String, List<PrimaryKey>>();
        for (String tbl : tables) {
            DescribeTableResponse resp = ots.describeTable(new DescribeTableRequest(tbl));
            List<PrimaryKeySchema> schema = resp.getTableMeta().getPrimaryKeyList();
            int width = schema.size();
            List<PrimaryKey> pkeys = new ArrayList<PrimaryKey>();
            for (PrimaryKey pkey : resp.getShardSplits()) {
                PrimaryKeyColumn[] cols = pkey.getPrimaryKeyColumns();
                Preconditions.checkArgument(cols.length <= width, "# of primary key columns in one shard split is greater than that of table schema");
                if (cols.length == width) {
                    pkeys.add(pkey);
                    continue;
                }
                PrimaryKeyColumn[] padded = new PrimaryKeyColumn[width];
                System.arraycopy(cols, 0, padded, 0, cols.length);
                for (int i = cols.length; i < width; ++i) {
                    padded[i] = new PrimaryKeyColumn(schema.get(i).getName(), PrimaryKeyValue.INF_MIN);
                }
                pkeys.add(new PrimaryKey(padded));
            }
            res.put(tbl, pkeys);
        }
        return res;
    }

    /**
     * Returns the index of the interval that contains the inclusive start key
     * {@code p}, which equals the number of split points less than or equal
     * to {@code p}.
     *
     * <p>NOTE(review): relies on {@code points} being sorted ascending, as
     * {@link Collections#binarySearch} requires; presumably the service
     * returns split points in key order - confirm.
     */
    private static int locateStart(List<PrimaryKey> points, PrimaryKey p) {
        int pos = Collections.binarySearch(points, p);
        if (pos >= 0) {
            // A start key sitting exactly on a split point belongs to the interval after it.
            return pos + 1;
        }
        return -pos - 1;
    }

    /**
     * Returns the index of the last interval touched by the exclusive end key
     * {@code p}, which equals the number of split points strictly less than
     * {@code p}.
     *
     * <p>NOTE(review): relies on {@code points} being sorted ascending, as
     * {@link Collections#binarySearch} requires - confirm.
     */
    private static int locateEnd(List<PrimaryKey> points, PrimaryKey p) {
        int pos = Collections.binarySearch(points, p);
        if (pos >= 0) {
            // An exclusive end key sitting on a split point stops in the interval before it.
            return pos;
        }
        return -pos - 1;
    }

    /**
     * Builds a new criteria for the same table, copying the limit (only when
     * positive), direction and both range keys, then invoking
     * {@code src.copyTo} for the remaining settings.
     *
     * <p>NOTE(review): the limit is copied onto every piece of a split scan,
     * so a scan cut into several pieces may presumably return more rows in
     * total than its limit - confirm this is intended.
     */
    private static RangeRowQueryCriteria clone(RangeRowQueryCriteria src) {
        RangeRowQueryCriteria res = new RangeRowQueryCriteria(src.getTableName());
        int limit = src.getLimit();
        if (limit > 0) {
            res.setLimit(limit);
        }
        res.setDirection(src.getDirection());
        res.setInclusiveStartPrimaryKey(src.getInclusiveStartPrimaryKey());
        res.setExclusiveEndPrimaryKey(src.getExclusiveEndPrimaryKey());
        // Cast kept so the same copyTo overload is selected as before.
        src.copyTo((RowQueryCriteria)res);
        return res;
    }
}

