package org.apache.flink.runtime.executiongraph;

import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.class */
public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger {
    private static final JobID TEST_JOB_ID = new JobID();
    private static final int DEFAULT_PARALLELISM = 2;
    private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
    private ComponentMainThreadExecutor componentMainThreadExecutor;
    private TestRestartStrategy manuallyTriggeredRestartStrategy;

    @Before
    public void setUp() {
        this.manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
        this.componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(this.manualMainThreadExecutor, Thread.currentThread());
        this.manuallyTriggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
    }

    @Test
    public void testConcurrentRegionFailovers() throws Exception {
        ExecutionGraph createExecutionGraph = createExecutionGraph();
        AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = createExecutionGraph.getFailoverStrategy();
        failoverStrategy.setBlockerFuture(new CompletableFuture<>());
        Iterator it = createExecutionGraph.getAllExecutionVertices().iterator();
        ExecutionVertex executionVertex = (ExecutionVertex) it.next();
        ExecutionVertex executionVertex2 = (ExecutionVertex) it.next();
        ExecutionVertex executionVertex3 = (ExecutionVertex) it.next();
        ExecutionVertex executionVertex4 = (ExecutionVertex) it.next();
        createExecutionGraph.scheduleForExecution();
        this.manualMainThreadExecutor.triggerAll();
        executionVertex.getCurrentExecutionAttempt().fail(new Exception("task failure 1"));
        this.manualMainThreadExecutor.triggerAll();
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getExecutionState());
        Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex2.getExecutionState());
        Assert.assertEquals(ExecutionState.CANCELED, executionVertex3.getExecutionState());
        Assert.assertEquals(ExecutionState.CANCELED, executionVertex4.getExecutionState());
        executionVertex2.getCurrentExecutionAttempt().fail(new Exception("task failure 2"));
        this.manualMainThreadExecutor.triggerAll();
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getExecutionState());
        Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getExecutionState());
        Assert.assertEquals(ExecutionState.CANCELED, executionVertex3.getExecutionState());
        Assert.assertEquals(ExecutionState.CANCELED, executionVertex4.getExecutionState());
        failoverStrategy.getBlockerFuture().complete(null);
        this.manualMainThreadExecutor.triggerAll();
        this.manuallyTriggeredRestartStrategy.triggerAll();
        this.manualMainThreadExecutor.triggerAll();
        Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex.getExecutionState());
        Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex2.getExecutionState());
        Assert.assertEquals(ExecutionState.CREATED, executionVertex3.getExecutionState());
        Assert.assertEquals(ExecutionState.CREATED, executionVertex4.getExecutionState());
        Assert.assertEquals(1L, executionVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex2.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex3.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex4.getCurrentExecutionAttempt().getAttemptNumber());
    }

    @Test
    public void testRegionFailoverInterruptedByGlobalFailover() throws Exception {
        ExecutionGraph createExecutionGraph = createExecutionGraph();
        AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = createExecutionGraph.getFailoverStrategy();
        failoverStrategy.setBlockerFuture(new CompletableFuture<>());
        Iterator it = createExecutionGraph.getAllExecutionVertices().iterator();
        ExecutionVertex executionVertex = (ExecutionVertex) it.next();
        ExecutionVertex executionVertex2 = (ExecutionVertex) it.next();
        ExecutionVertex executionVertex3 = (ExecutionVertex) it.next();
        ExecutionVertex executionVertex4 = (ExecutionVertex) it.next();
        createExecutionGraph.scheduleForExecution();
        this.manualMainThreadExecutor.triggerAll();
        executionVertex.getCurrentExecutionAttempt().fail(new Exception("task failure"));
        this.manualMainThreadExecutor.triggerAll();
        Assert.assertEquals(JobStatus.RUNNING, createExecutionGraph.getState());
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getExecutionState());
        Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex2.getExecutionState());
        Assert.assertEquals(ExecutionState.CANCELED, executionVertex3.getExecutionState());
        Assert.assertEquals(ExecutionState.CANCELED, executionVertex4.getExecutionState());
        createExecutionGraph.failGlobal(new Exception("Test global failure"));
        executionVertex2.getCurrentExecutionAttempt().completeCancelling();
        this.manuallyTriggeredRestartStrategy.triggerNextAction();
        this.manualMainThreadExecutor.triggerAll();
        Assert.assertEquals(2L, createExecutionGraph.getGlobalModVersion());
        Assert.assertEquals(1L, executionVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex2.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex3.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex4.getCurrentExecutionAttempt().getAttemptNumber());
        failoverStrategy.getBlockerFuture().complete(null);
        this.manualMainThreadExecutor.triggerAll();
        Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex.getExecutionState());
        Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex2.getExecutionState());
        Assert.assertEquals(ExecutionState.CREATED, executionVertex3.getExecutionState());
        Assert.assertEquals(ExecutionState.CREATED, executionVertex4.getExecutionState());
        Assert.assertEquals(1L, executionVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex2.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex3.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex4.getCurrentExecutionAttempt().getAttemptNumber());
    }

    @Test
    public void testSkipFailoverIfExecutionStateIsNotRunning() throws Exception {
        ExecutionGraph createExecutionGraph = createExecutionGraph();
        ExecutionVertex executionVertex = (ExecutionVertex) createExecutionGraph.getAllExecutionVertices().iterator().next();
        createExecutionGraph.cancel();
        createExecutionGraph.getFailoverStrategy().onTaskFailure(executionVertex.getCurrentExecutionAttempt(), new Exception("Test Exception"));
        this.manualMainThreadExecutor.triggerAll();
        Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
    }

    private ExecutionGraph createExecutionGraph() throws Exception {
        JobVertex jobVertex = new JobVertex("vertex1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(DEFAULT_PARALLELISM);
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.setParallelism(DEFAULT_PARALLELISM);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testjob", new JobVertex[]{jobVertex, jobVertex2});
        ExecutionGraph build = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph).setRestartStrategy(this.manuallyTriggeredRestartStrategy).setFailoverStrategyFactory(AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG::new).setSlotProvider(new SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM)).setPartitionTracker(new PartitionTrackerImpl(jobGraph.getJobID(), NettyShuffleMaster.INSTANCE, resourceID -> {
            return Optional.empty();
        })).build();
        build.start(this.componentMainThreadExecutor);
        return build;
    }
}
