/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.fescar.rm.datasource;

import com.alibaba.fescar.common.exception.NotSupportYetException;
import com.alibaba.fescar.common.thread.NamedThreadFactory;
import com.alibaba.fescar.config.ConfigurationFactory;
import com.alibaba.fescar.core.exception.TransactionException;
import com.alibaba.fescar.core.model.BranchStatus;
import com.alibaba.fescar.core.model.ResourceManagerInbound;
import com.alibaba.fescar.rm.datasource.DataSourceManager;
import com.alibaba.fescar.rm.datasource.DataSourceProxy;
import com.alibaba.fescar.rm.datasource.undo.UndoLogManager;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncWorker
implements ResourceManagerInbound {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);
    private static final List<Phase2Context> ASYNC_COMMIT_BUFFER = Collections.synchronizedList(new ArrayList());
    private static int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt("client.async.commit.buffer.limit", 10000);
    private static ScheduledExecutorService timerExecutor;

    public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) {
            ASYNC_COMMIT_BUFFER.add(new Phase2Context(xid, branchId, resourceId, applicationData));
        } else {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
        }
        return BranchStatus.PhaseTwo_Committed;
    }

    public synchronized void init() {
        LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
        timerExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory("AsyncWorker", 1, true));
        timerExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    AsyncWorker.this.doBranchCommits();
                }
                catch (Throwable e) {
                    LOGGER.info("Failed at async committing ... " + e.getMessage());
                }
            }
        }, 10L, 1000L, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.size() == 0) {
            return;
        }
        HashMap<String, ArrayList<Phase2Context>> mappedContexts = new HashMap<String, ArrayList<Phase2Context>>();
        Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();
        while (iterator.hasNext()) {
            Phase2Context commitContext = iterator.next();
            ArrayList<Phase2Context> contextsGroupedByResourceId = (ArrayList<Phase2Context>)mappedContexts.get(commitContext.resourceId);
            if (contextsGroupedByResourceId == null) {
                contextsGroupedByResourceId = new ArrayList<Phase2Context>();
                mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
            }
            contextsGroupedByResourceId.add(commitContext);
            iterator.remove();
        }
        for (Map.Entry entry : mappedContexts.entrySet()) {
            Connection conn = null;
            try {
                DataSourceProxy dataSourceProxy = DataSourceManager.get().get((String)entry.getKey());
                conn = dataSourceProxy.getPlainConnection();
            }
            catch (SQLException sqle) {
                LOGGER.warn("Failed to get connection for async committing on " + (String)entry.getKey(), (Throwable)sqle);
                if (conn == null) continue;
                try {
                    conn.close();
                }
                catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", (Throwable)closeEx);
                }
                continue;
            }
            try {
                List contextsGroupedByResourceId = (List)entry.getValue();
                for (Phase2Context commitContext : contextsGroupedByResourceId) {
                    try {
                        UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                    }
                    catch (Exception ex) {
                        LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", (Throwable)ex);
                    }
                }
            }
            catch (Throwable throwable) {
                throw throwable;
            }
            finally {
                if (conn == null) continue;
                try {
                    conn.close();
                }
                catch (SQLException closeEx) {
                    LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", (Throwable)closeEx);
                }
            }
        }
    }

    public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        throw new NotSupportYetException();
    }

    private static class Phase2Context {
        String xid;
        long branchId;
        String resourceId;
        String applicationData;

        public Phase2Context(String xid, long branchId, String resourceId, String applicationData) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
            this.applicationData = applicationData;
        }
    }
}

