/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.graph.local;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.conf.Configuration;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.JobConf;
import com.aliyun.odps.graph.job.JobRunner;
import com.aliyun.odps.graph.local.InputSplit;
import com.aliyun.odps.graph.local.LocalRunningJob;
import com.aliyun.odps.graph.local.RuntimeContext;
import com.aliyun.odps.graph.local.master.Master;
import com.aliyun.odps.graph.local.utils.LocalGraphRunUtils;
import com.aliyun.odps.local.common.DownloadMode;
import com.aliyun.odps.local.common.FileSplit;
import com.aliyun.odps.local.common.JobDirecotry;
import com.aliyun.odps.local.common.TableMeta;
import com.aliyun.odps.local.common.WareHouse;
import com.aliyun.odps.local.common.utils.DownloadUtils;
import com.aliyun.odps.local.common.utils.LocalRunUtils;
import com.aliyun.odps.local.common.utils.PartitionUtils;
import com.aliyun.odps.local.common.utils.SchemaUtils;
import com.aliyun.odps.mapred.JobStatus;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.SessionState;
import com.aliyun.odps.utils.StringUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * {@link JobRunner} implementation that runs a graph job in the local JVM.
 *
 * <p>{@link #submit()} stages the job's input tables and cache resources from the
 * local {@link WareHouse} (downloading them first when they are missing or when the
 * download mode is {@link DownloadMode#ALWAYS}) into a per-job directory, runs a
 * {@link Master} over the staged inputs, and finally copies the produced output back
 * into the warehouse.
 *
 * <p>{@link #setConf(Configuration)} must be called before {@link #submit()}.
 */
public class LocalGraphJobRunner implements JobRunner {
    private static final Log LOG = LogFactory.getLog(LocalGraphJobRunner.class);

    /** Maximum number of input tables accepted for one job. */
    private static final int MAX_INPUT_TABLES = 64;
    /** Maximum number of distinct cache resources accepted for one job. */
    private static final int MAX_CACHE_RESOURCES = 256;
    /** Maximum total size of the staged cache resources (512M). */
    private static final long MAX_CACHE_RESOURCE_BYTES = 0x20000000L;

    private String jobId;
    private JobConf conf;
    private Odps odps;
    private RuntimeContext ctx;
    /** Input splits handed to the master; filled by {@link #processInputs()}. */
    private List<FileSplit> inputs;
    /** Output tables keyed by label (the default label is stored as the empty string). */
    private Map<String, TableInfo> outputs;
    private WareHouse wareHouse;
    private JobDirecotry jobDirecotry;

    /**
     * Sets up the per-job state: session ODPS handle, warehouse, generated job id,
     * runtime context, job directory and an empty input list.
     *
     * @throws IOException declared for the collaborators that create the runtime state
     */
    private void initialize() throws IOException {
        this.odps = SessionState.get().getOdps();
        this.wareHouse = WareHouse.getInstance();
        WareHouse.init(this.odps, this.conf);
        this.jobId = LocalGraphRunUtils.generateLocalGraphTaskName();
        this.conf.set("odps.mapred.job.name", this.jobId);
        this.ctx = RuntimeContext.create(this.jobId, this.conf);
        this.jobDirecotry = new JobDirecotry();
        this.inputs = new ArrayList<FileSplit>();
    }

    /**
     * Runs the job synchronously and returns a handle describing the finished job.
     *
     * @return a {@link LocalRunningJob} in state {@link JobStatus#SUCCEEDED} carrying the
     *         counters of the runtime context
     * @throws OdpsException wrapping any exception raised while initializing or running the job
     */
    public RunningJob submit() throws OdpsException {
        try {
            this.initialize();
            LOG.info("run mapreduce job in local mode");
            LOG.info("job id: " + this.jobId);
            this.runJob();
        }
        catch (Exception e) {
            throw new OdpsException(e);
        }
        return new LocalRunningJob(this.jobId, JobStatus.SUCCEEDED, this.ctx.getCounters());
    }

    /**
     * Writes the job configuration to the job file, stages inputs, resources and
     * outputs, runs the master and moves the results into the warehouse.
     *
     * @throws Exception if any stage fails
     */
    private void runJob() throws Exception {
        OutputStream jobFile = new FileOutputStream(this.jobDirecotry.getJobFile());
        try {
            this.conf.writeXml(jobFile);
        }
        finally {
            // Close even when writeXml fails so the file handle is not leaked.
            jobFile.close();
        }
        this.processInputs();
        this.processResources();
        this.processOutputs();
        Master master = new Master(LocalGraphRunUtils.getGraphJobConf(this.conf), this.ctx, this.inputs, this.outputs);
        master.run();
        this.moveOutputs();
        System.out.println("graph task finish");
    }

    /**
     * Stages every configured input table and validates the resulting number of splits.
     * When no split was produced, {@link FileSplit#NullSplit} is added as a placeholder.
     *
     * @throws OdpsException if more than {@value #MAX_INPUT_TABLES} input tables are configured
     *                       or a table cannot be obtained
     * @throws IOException   if staging fails or there are too many splits
     */
    private void processInputs() throws OdpsException, IOException {
        int maxGraphWorkers = LocalGraphRunUtils.getMaxGraphTasks();
        TableInfo[] inputTableInfos = LocalGraphRunUtils.getInputTables(this.conf);
        if (inputTableInfos.length > MAX_INPUT_TABLES) {
            throw new OdpsException("ODPS-0720301: Too many job input");
        }
        for (TableInfo tbl : inputTableInfos) {
            this.processInput(tbl);
        }
        if (this.inputs.isEmpty()) {
            this.inputs.add(FileSplit.NullSplit);
        }
        this.checkInputsSize(maxGraphWorkers);
    }

    /**
     * Stages one input table into the job directory and registers one
     * {@link InputSplit} per staged data file.
     *
     * <p>For a table with partitions in the warehouse, only partitions matching the
     * requested partition spec (see {@link #match}) and containing data files are
     * staged. For a table without partitions, requesting a partition spec is an error.
     *
     * @param tableInfo the input table description (project, table, columns, partition spec)
     * @throws IOException   if a partition spec is given for a table without partitions,
     *                       or staging fails
     * @throws OdpsException if the table is still missing from the warehouse after download
     */
    private void processInput(TableInfo tableInfo) throws IOException, OdpsException {
        LOG.info("Processing input: " + tableInfo);
        String projName = tableInfo.getProjectName();
        if (projName == null) {
            projName = SessionState.get().getOdps().getDefaultProject();
        }
        String tblName = tableInfo.getTableName();
        String[] readCols = tableInfo.getCols();
        Map<String, String> requestedPartSpec = tableInfo.getPartSpec();
        PartitionSpec expectParts = LocalGraphJobRunner.toPartitionSpec(requestedPartSpec);

        if (!this.wareHouse.existsTable(projName, tblName) || this.wareHouse.getDownloadMode() == DownloadMode.ALWAYS) {
            DownloadUtils.downloadTableSchemeAndData(this.odps, tableInfo, (int)this.wareHouse.getLimitDownloadRecordCount(), (char)this.wareHouse.getInputColumnSeperator());
            if (!this.wareHouse.existsTable(projName, tblName)) {
                throw new OdpsException("download table from remote host failure");
            }
        }

        TableMeta whTblMeta = this.wareHouse.getTableMeta(projName, tblName);
        Column[] whReadFields = LocalRunUtils.getInputTableFields(whTblMeta, readCols);
        List<PartitionSpec> whParts = this.wareHouse.getPartitions(projName, tblName);

        if (whParts.size() > 0) {
            for (PartitionSpec partSpec : whParts) {
                if (!this.match(expectParts, partSpec)) {
                    continue;
                }
                File whSrcDir = this.wareHouse.getPartitionDir(whTblMeta.getProjName(), whTblMeta.getTableName(), partSpec);
                if (LocalRunUtils.listDataFiles(whSrcDir).size() <= 0) {
                    continue;
                }
                File tempDataDir = this.jobDirecotry.getInputDir(this.wareHouse.getRelativePath(whTblMeta.getProjName(), whTblMeta.getTableName(), partSpec, new Object[0]));
                this.stageTable(whTblMeta, partSpec, readCols);
                this.addSplits(tempDataDir, whReadFields, tableInfo);
            }
            return;
        }

        if (requestedPartSpec != null && requestedPartSpec.size() > 0) {
            throw new IOException("ODPS-0720121: Invalid table partSpectable " + projName + "." + tblName + " is not partitioned table");
        }
        File whSrcDir = this.wareHouse.getTableDir(whTblMeta.getProjName(), whTblMeta.getTableName());
        if (LocalRunUtils.listDataFiles(whSrcDir).size() <= 0) {
            return;
        }
        // For a table without partitions the data lands directly in the table-level input directory.
        File tempDataDir = this.stageTable(whTblMeta, null, readCols);
        this.addSplits(tempDataDir, whReadFields, tableInfo);
    }

    /**
     * Converts a partition map into a {@link PartitionSpec} built from the
     * comma-separated {@code key=value} pairs in map iteration order.
     *
     * @param partSpec partition columns and values; may be {@code null}
     * @return the partition spec, or {@code null} when the map is {@code null} or empty
     */
    private static PartitionSpec toPartitionSpec(Map<String, String> partSpec) {
        if (partSpec == null || partSpec.isEmpty()) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : partSpec.entrySet()) {
            if (sb.length() > 0) {
                sb.append(",");
            }
            sb.append(entry.getKey()).append("=").append(entry.getValue());
        }
        return new PartitionSpec(sb.toString());
    }

    /**
     * Copies a warehouse table (or one of its partitions) into the job's input area.
     *
     * @param tblMeta  warehouse metadata of the table
     * @param partSpec partition to copy, or {@code null} to copy the table itself
     * @param readCols columns requested by the job; passed through to the warehouse copy
     * @return the table-level input directory that was passed to the warehouse as copy target
     * @throws IOException if the copy fails
     */
    private File stageTable(TableMeta tblMeta, PartitionSpec partSpec, String[] readCols) throws IOException {
        File tableInputDir = this.jobDirecotry.getInputDir(this.wareHouse.getRelativePath(tblMeta.getProjName(), tblMeta.getTableName(), null, new Object[0]));
        this.wareHouse.copyTable(tblMeta.getProjName(), tblMeta.getTableName(), partSpec, readCols, tableInputDir, this.wareHouse.getLimitDownloadRecordCount(), this.wareHouse.getInputColumnSeperator());
        return tableInputDir;
    }

    /**
     * Registers one whole-file {@link InputSplit} for every data file found in {@code dataDir}.
     *
     * @param dataDir    directory holding the staged data files
     * @param readFields columns read from each file
     * @param tableInfo  the input table the files belong to
     * @throws IOException if the directory cannot be listed
     */
    private void addSplits(File dataDir, Column[] readFields, TableInfo tableInfo) throws IOException {
        for (File file : LocalRunUtils.listDataFiles(dataDir)) {
            this.inputs.add(new InputSplit(file, readFields, 0L, file.length(), tableInfo));
        }
    }

    /**
     * Validates the number of collected input splits.
     *
     * <p>{@link #processInputs()} adds a placeholder split before calling this method,
     * so the empty check is purely defensive on that path.
     *
     * @param maxGraphWorkers upper bound for the number of splits
     * @throws IOException if there are more splits than {@code maxGraphWorkers} or none at all
     */
    private void checkInputsSize(int maxGraphWorkers) throws IOException {
        if (this.inputs.size() > maxGraphWorkers) {
            String msg = "ODPS-0740002: Too many local-run workers : %s, must be <= %s (specified by local_run parameter 'odps.graph.local.max.workers')";
            msg = String.format(msg, this.inputs.size(), maxGraphWorkers);
            throw new IOException(msg);
        }
        if (this.inputs.size() == 0) {
            throw new IOException("ODPS-0720231: Job input is not set");
        }
    }

    /**
     * Stages the cache resources listed under {@code odps.graph.cache.resources} into
     * the job's resource directory and installs a class loader that can see them as the
     * thread context class loader and as the configuration's class loader.
     *
     * @throws IOException   if too many resources are configured, their total size exceeds
     *                       512M, or staging fails
     * @throws OdpsException if a resource cannot be downloaded
     */
    private void processResources() throws IOException, OdpsException {
        File resDir = this.jobDirecotry.getResourceDir();
        String curProjName = SessionState.get().getOdps().getDefaultProject();
        String[] resources = this.conf.getStrings("odps.graph.cache.resources");
        if (resources == null || resources.length == 0) {
            return;
        }
        HashSet<String> names = new HashSet<String>(Arrays.asList(resources));
        LOG.info("Start to process resources: " + StringUtils.join(resources, ','));
        if (names.size() > MAX_CACHE_RESOURCES) {
            throw new IOException("ODPS-0720331: Too many cache resources - define too many cache resources, must be <= " + MAX_CACHE_RESOURCES);
        }

        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = LocalGraphJobRunner.class.getClassLoader();
        }
        // The context loader is only a URLClassLoader on some runtimes (not the default
        // application loader on JDK 9+). Its URLs are an optional seed; everything it
        // can load stays reachable through parent delegation anyway.
        ArrayList<URL> cp = new ArrayList<URL>();
        if (parent instanceof URLClassLoader) {
            cp.addAll(Arrays.asList(((URLClassLoader)parent).getURLs()));
        }

        long resourceSize = 0L;
        for (String name : names) {
            List res = LocalRunUtils.parseResourceName(name, curProjName);
            String projName = (String)res.get(0);
            String resName = (String)res.get(1);
            if (!this.wareHouse.existsResource(projName, resName) || this.wareHouse.getDownloadMode() == DownloadMode.ALWAYS) {
                DownloadUtils.downloadResource(this.odps, projName, resName, (int)this.wareHouse.getLimitDownloadRecordCount(), (char)this.wareHouse.getInputColumnSeperator());
            }
            this.wareHouse.copyResource(projName, resName, resDir, this.wareHouse.getLimitDownloadRecordCount(), this.wareHouse.getInputColumnSeperator());
            File staged = new File(resDir, resName);
            cp.add(staged.toURI().toURL());
            // Measure after staging: before the copy the file is not in resDir yet, so
            // the limit could never trigger. Only regular files are counted because
            // File.length() is unspecified for directories.
            // NOTE(review): resources staged as directories are therefore not counted - confirm this is acceptable.
            if (staged.isFile()) {
                resourceSize += staged.length();
            }
            if (resourceSize > MAX_CACHE_RESOURCE_BYTES) {
                throw new IOException("ODPS-0720071: Total size of cache resources is too big - must be <= 512M");
            }
        }
        URLClassLoader newLoader = new URLClassLoader(cp.toArray(new URL[0]), parent);
        Thread.currentThread().setContextClassLoader(newLoader);
        this.conf.setClassLoader(newLoader);
    }

    /**
     * Prepares one output directory per configured output table, writes the table's
     * schema file into it and publishes the schema in the configuration under
     * {@code odps.mapred.output.schema.<label>}.
     *
     * <p>The label {@code __default__} is normalized to the empty string for the
     * configuration key and the {@link #outputs} map, while the output directory is
     * looked up with the table's original label.
     *
     * @throws IOException   if an output directory cannot be created or the schema cannot be written
     * @throws OdpsException if the table metadata cannot be downloaded
     */
    private void processOutputs() throws IOException, OdpsException {
        this.outputs = new HashMap<String, TableInfo>();
        for (TableInfo table : LocalGraphRunUtils.getOutputTables(this.conf)) {
            String label = table.getLabel();
            if (label.equals("__default__")) {
                label = "";
            }
            String projName = table.getProjectName();
            if (projName == null) {
                projName = SessionState.get().getOdps().getDefaultProject();
            }
            String tblName = table.getTableName();
            File tblDir = this.jobDirecotry.getOutputDir(table.getLabel());
            // mkdirs() returns false both on failure and when the directory already exists.
            if (!tblDir.mkdirs() && !tblDir.isDirectory()) {
                throw new IOException("Failed to create output directory: " + tblDir);
            }
            TableMeta tblMeta;
            if (this.wareHouse.existsTable(projName, tblName) && this.wareHouse.getDownloadMode() != DownloadMode.ALWAYS) {
                tblMeta = this.wareHouse.getTableMeta(projName, tblName);
            } else {
                tblMeta = DownloadUtils.downloadTableInfo(this.odps, table);
            }
            SchemaUtils.generateSchemaFile(tblMeta, null, tblDir);
            this.conf.set("odps.mapred.output.schema." + label, SchemaUtils.toString((Column[])tblMeta.getCols()));
            this.outputs.put(label, table);
        }
    }

    /**
     * Copies the job's output directories into the warehouse.
     *
     * <p>When the table exists in the warehouse, the existing data files in the target
     * directory are removed first (unless the warehouse retains temp data) and the new
     * data files are copied in. Otherwise the whole output directory is copied.
     *
     * @throws IOException if copying fails
     */
    private void moveOutputs() throws IOException {
        for (TableInfo table : LocalGraphRunUtils.getOutputTables(this.conf)) {
            String label = table.getLabel();
            String projName = table.getProjectName();
            if (projName == null) {
                projName = SessionState.get().getOdps().getDefaultProject();
            }
            String tblName = table.getTableName();
            Map<String, String> partSpec = table.getPartSpec();
            File tempTblDir = this.jobDirecotry.getOutputDir(table.getLabel());
            File whOutputDir = this.wareHouse.createPartitionDir(projName, tblName, PartitionUtils.convert(partSpec));
            if (this.wareHouse.existsTable(projName, tblName)) {
                LOG.info("Reload warehouse table:" + tblName);
                if (!this.wareHouse.isRetainTempData()) {
                    LocalRunUtils.removeDataFiles(whOutputDir);
                }
                this.wareHouse.copyDataFiles(tempTblDir, null, whOutputDir, this.wareHouse.getInputColumnSeperator());
            } else {
                LOG.info("Copy output to warehouse: label=" + label + " -> " + whOutputDir.getAbsolutePath());
                FileUtils.copyDirectory(tempTblDir, whOutputDir);
            }
        }
    }

    /**
     * Tells whether a warehouse partition satisfies the requested partition spec.
     *
     * @param expectedParts requested partition spec; {@code null} matches every partition
     * @param parts         the partition to test
     * @return {@code true} if every non-null value in {@code expectedParts} equals the value
     *         of the same key in {@code parts}; {@code false} if {@code parts} is {@code null}
     *         or has fewer keys than {@code expectedParts}
     */
    private boolean match(PartitionSpec expectedParts, PartitionSpec parts) {
        if (expectedParts == null) {
            return true;
        }
        if (parts == null || expectedParts.keys().size() > parts.keys().size()) {
            return false;
        }
        for (String key : expectedParts.keys()) {
            String expectedValue = expectedParts.get(key);
            // A null expected value acts as a wildcard for that key.
            if (expectedValue != null && !expectedValue.equals(parts.get(key))) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return the job configuration set through {@link #setConf(Configuration)}
     */
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Stores the given configuration wrapped in a new {@link JobConf}.
     *
     * @param conf the configuration to run the job with
     */
    public void setConf(Configuration conf) {
        this.conf = new JobConf(conf);
    }
}

