/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

public class SchedulingUtils {
    public static CompletableFuture<Void> schedule(ScheduleMode scheduleMode, Iterable<ExecutionVertex> vertices, ExecutionGraph executionGraph) {
        switch (scheduleMode) {
            case LAZY_FROM_SOURCES: 
            case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST: {
                return SchedulingUtils.scheduleLazy(vertices, executionGraph);
            }
            case EAGER: {
                return SchedulingUtils.scheduleEager(vertices, executionGraph);
            }
        }
        throw new IllegalStateException(String.format("Schedule mode %s is invalid.", new Object[]{scheduleMode}));
    }

    public static CompletableFuture<Void> scheduleLazy(Iterable<ExecutionVertex> vertices, ExecutionGraph executionGraph) {
        executionGraph.assertRunningInJobMasterMainThread();
        SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
        Set<AllocationID> previousAllocations = SchedulingUtils.computePriorAllocationIdsIfRequiredByScheduling(vertices, slotProviderStrategy.asSlotProvider());
        ArrayList<CompletableFuture<Void>> schedulingFutures = new ArrayList<CompletableFuture<Void>>();
        for (ExecutionVertex executionVertex : vertices) {
            if (!executionVertex.getJobVertex().getJobVertex().isInputVertex() && !executionVertex.checkInputDependencyConstraints()) continue;
            CompletableFuture<Void> schedulingVertexFuture = executionVertex.scheduleForExecution(slotProviderStrategy, LocationPreferenceConstraint.ANY, previousAllocations);
            schedulingFutures.add(schedulingVertexFuture);
        }
        return FutureUtils.waitForAll(schedulingFutures);
    }

    public static CompletableFuture<Void> scheduleEager(Iterable<ExecutionVertex> vertices, ExecutionGraph executionGraph) {
        executionGraph.assertRunningInJobMasterMainThread();
        Preconditions.checkState((executionGraph.getState() == JobStatus.RUNNING ? 1 : 0) != 0, (Object)"job is not running currently");
        ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<CompletableFuture<Execution>>();
        SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
        Set<AllocationID> allPreviousAllocationIds = Collections.unmodifiableSet(SchedulingUtils.computePriorAllocationIdsIfRequiredByScheduling(vertices, slotProviderStrategy.asSlotProvider()));
        for (ExecutionVertex ev : vertices) {
            CompletableFuture<Execution> allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution(slotProviderStrategy, LocationPreferenceConstraint.ALL, allPreviousAllocationIds);
            allAllocationFutures.add(allocationFuture);
        }
        FutureUtils.ConjunctFuture allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);
        return ((CompletableFuture)allAllocationsFuture.thenAccept(executionsToDeploy -> {
            for (Execution execution : executionsToDeploy) {
                try {
                    execution.deploy();
                }
                catch (Throwable t) {
                    throw new CompletionException((Throwable)new FlinkException(String.format("Could not deploy execution %s.", execution), t));
                }
            }
        })).exceptionally(throwable -> {
            Object resultThrowable;
            Throwable strippedThrowable = ExceptionUtils.stripCompletionException((Throwable)throwable);
            if (strippedThrowable instanceof TimeoutException) {
                int numTotal = allAllocationsFuture.getNumFuturesTotal();
                int numComplete = allAllocationsFuture.getNumFuturesCompleted();
                String message = "Could not allocate all requires slots within timeout of " + executionGraph.getAllocationTimeout() + ". Slots required: " + numTotal + ", slots allocated: " + numComplete + ", previous allocation IDs: " + allPreviousAllocationIds;
                StringBuilder executionMessageBuilder = new StringBuilder();
                for (int i = 0; i < allAllocationFutures.size(); ++i) {
                    CompletableFuture executionFuture = (CompletableFuture)allAllocationFutures.get(i);
                    try {
                        Execution execution = executionFuture.getNow(null);
                        if (execution != null) {
                            executionMessageBuilder.append("completed: " + execution);
                        } else {
                            executionMessageBuilder.append("incomplete: " + executionFuture);
                        }
                    }
                    catch (CompletionException completionException) {
                        executionMessageBuilder.append("completed exceptionally: " + completionException + "/" + executionFuture);
                    }
                    if (i >= allAllocationFutures.size() - 1) continue;
                    executionMessageBuilder.append(", ");
                }
                message = message + ", execution status: " + executionMessageBuilder.toString();
                resultThrowable = new NoResourceAvailableException(message);
            } else {
                resultThrowable = strippedThrowable;
            }
            throw new CompletionException((Throwable)resultThrowable);
        });
    }

    private static Set<AllocationID> computePriorAllocationIdsIfRequiredByScheduling(Iterable<ExecutionVertex> vertices, SlotProvider slotProvider) {
        if (slotProvider instanceof Scheduler && ((Scheduler)slotProvider).requiresPreviousExecutionGraphAllocations()) {
            return SchedulingUtils.computePriorAllocationIds(vertices);
        }
        return Collections.emptySet();
    }

    private static Set<AllocationID> computePriorAllocationIds(Iterable<ExecutionVertex> vertices) {
        HashSet<AllocationID> allPreviousAllocationIds = new HashSet<AllocationID>();
        for (ExecutionVertex executionVertex : vertices) {
            AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation();
            if (latestPriorAllocation == null) continue;
            allPreviousAllocationIds.add(latestPriorAllocation);
        }
        return allPreviousAllocationIds;
    }
}

