package org.apache.flink.runtime.io.network.partition;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link PartitionTrackerImpl}.
 *
 * <p>Decompiler note: loaded from
 * input_file:org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.class
 */
public class PartitionTrackerImplTest extends TestLogger {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest$TestingShuffleMaster.class */
    /**
     * Minimal {@link ShuffleMaster} stub for these tests. It records the ID of every partition
     * passed to {@link #releasePartitionExternally(ShuffleDescriptor)} so that a test can verify
     * which external release calls were issued.
     */
    public static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {
        /**
         * IDs of externally released partitions, in call order. The queue is bounded to 4
         * entries; {@code add} throws {@link IllegalStateException} once it is full.
         */
        final Queue<ResultPartitionID> externallyReleasedPartitions;

        private TestingShuffleMaster() {
            // Diamond instead of the raw type, so the assignment is checked by the compiler.
            this.externallyReleasedPartitions = new ArrayBlockingQueue<>(4);
        }

        /**
         * Registration is not implemented by this stub.
         *
         * @return always {@code null}; no test in this class calls this method directly
         */
        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return null;
        }

        /**
         * Records the released partition instead of performing any real release.
         *
         * @param shuffleDescriptor descriptor whose result partition ID is recorded
         */
        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            this.externallyReleasedPartitions.add(shuffleDescriptor.getResultPartitionID());
        }
    }

    /**
     * Runs the shared tracking check for {@link ResultPartitionType#PIPELINED}. The helper
     * expects the tracking state to equal {@code isBlocking()} of the type, which (per this
     * test's name) is expected to be {@code false} for pipelined partitions.
     */
    @Test
    public void testPipelinedPartitionIsNotTracked() {
        final ResultPartitionType pipelined = ResultPartitionType.PIPELINED;
        testReleaseOnConsumptionHandling(pipelined);
    }

    /**
     * Runs the shared tracking check for {@link ResultPartitionType#BLOCKING}. The helper
     * expects the tracking state to equal {@code isBlocking()} of the type, which (per this
     * test's name) is expected to be {@code true} for blocking partitions.
     */
    @Test
    public void testBlockingPartitionIsTracked() {
        final ResultPartitionType blocking = ResultPartitionType.BLOCKING;
        testReleaseOnConsumptionHandling(blocking);
    }

    /**
     * Starts tracking a single partition of the given type on a freshly generated task executor
     * ID and asserts that the tracker reports tracking for that executor exactly when
     * {@code resultPartitionType.isBlocking()} is {@code true}.
     *
     * @param resultPartitionType type of the partition handed to the tracker
     */
    private void testReleaseOnConsumptionHandling(ResultPartitionType resultPartitionType) {
        // The gateway lookup never resolves a task executor in this scenario.
        final PartitionTrackerImpl tracker =
                new PartitionTrackerImpl(new JobID(), new TestingShuffleMaster(), ignored -> Optional.empty());

        final ResourceID taskExecutorId = ResourceID.generate();
        final ResultPartitionDeploymentDescriptor descriptor =
                createResultPartitionDeploymentDescriptor(new ResultPartitionID(), resultPartitionType, false);
        tracker.startTrackingPartition(taskExecutorId, descriptor);

        final boolean tracked = tracker.isTrackingPartitionsFor(taskExecutorId);
        MatcherAssert.assertThat(tracked, CoreMatchers.is(resultPartitionType.isBlocking()));
    }

    /**
     * Verifies that the tracking state is kept per task executor: nothing is tracked initially,
     * only the executor a partition was registered for is reported as tracked, and
     * {@code stopTrackingPartitionsFor} clears that state again.
     */
    @Test
    public void testStartStopTracking() {
        // Parameterized (not raw) so the hand-off to createTaskExecutorGateway is type-checked.
        final Queue<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> taskExecutorReleaseCalls =
                new ArrayBlockingQueue<>(4);
        final PartitionTrackerImpl partitionTracker = new PartitionTrackerImpl(
                new JobID(),
                new TestingShuffleMaster(),
                resourceId -> Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));

        final ResourceID trackedExecutor = new ResourceID("tracked");
        final ResourceID untrackedExecutor = new ResourceID("untracked");

        // Before any registration neither executor is tracked.
        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(trackedExecutor), CoreMatchers.is(false));
        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(untrackedExecutor), CoreMatchers.is(false));

        partitionTracker.startTrackingPartition(
                trackedExecutor,
                createResultPartitionDeploymentDescriptor(new ResultPartitionID(), true));

        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(trackedExecutor), CoreMatchers.is(true));
        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(untrackedExecutor), CoreMatchers.is(false));

        partitionTracker.stopTrackingPartitionsFor(trackedExecutor);

        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(trackedExecutor), CoreMatchers.is(false));
        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(untrackedExecutor), CoreMatchers.is(false));
    }

    /**
     * Verifies the release calls issued for partitions whose shuffle descriptor reports local
     * resources. Both {@code stopTrackingAndReleasePartitionsFor} and
     * {@code stopTrackingAndReleasePartitions} must produce exactly one release call on the task
     * executor gateway (carrying the executor ID, the job ID and the partition ID) and exactly
     * one external release on the shuffle master, and must stop tracking the executor.
     */
    @Test
    public void testReleaseCallsWithLocalResources() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
        final JobID jobId = new JobID();

        // Fully parameterized so that the recorded tuples can be inspected without casts and
        // the Hamcrest collection matcher below type-checks against f2.
        final Queue<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> taskExecutorReleaseCalls =
                new ArrayBlockingQueue<>(4);
        final PartitionTrackerImpl partitionTracker = new PartitionTrackerImpl(
                jobId,
                shuffleMaster,
                resourceId -> Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));

        final ResourceID taskExecutorId1 = ResourceID.generate();
        final ResourceID taskExecutorId2 = ResourceID.generate();
        final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
        final ResultPartitionID resultPartitionId2 = new ResultPartitionID();

        partitionTracker.startTrackingPartition(
                taskExecutorId1, createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
        partitionTracker.startTrackingPartition(
                taskExecutorId2, createResultPartitionDeploymentDescriptor(resultPartitionId2, true));

        // Release everything hosted on the first task executor.
        partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);

        Assert.assertEquals(1, taskExecutorReleaseCalls.size());
        final Tuple3<ResourceID, JobID, Collection<ResultPartitionID>> firstReleaseCall =
                taskExecutorReleaseCalls.remove();
        Assert.assertEquals(taskExecutorId1, firstReleaseCall.f0);
        Assert.assertEquals(jobId, firstReleaseCall.f1);
        MatcherAssert.assertThat(firstReleaseCall.f2, Matchers.contains(resultPartitionId1));

        Assert.assertEquals(1, shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals(resultPartitionId1, shuffleMaster.externallyReleasedPartitions.remove());

        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId1), CoreMatchers.is(false));

        // Release the second partition by its ID.
        partitionTracker.stopTrackingAndReleasePartitions(Collections.singletonList(resultPartitionId2));

        Assert.assertEquals(1, taskExecutorReleaseCalls.size());
        final Tuple3<ResourceID, JobID, Collection<ResultPartitionID>> secondReleaseCall =
                taskExecutorReleaseCalls.remove();
        Assert.assertEquals(taskExecutorId2, secondReleaseCall.f0);
        Assert.assertEquals(jobId, secondReleaseCall.f1);
        MatcherAssert.assertThat(secondReleaseCall.f2, Matchers.contains(resultPartitionId2));

        Assert.assertEquals(1, shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals(resultPartitionId2, shuffleMaster.externallyReleasedPartitions.remove());

        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId2), CoreMatchers.is(false));
    }

    /**
     * Verifies the release calls issued for partitions whose shuffle descriptor reports no
     * local resources. Neither {@code stopTrackingAndReleasePartitionsFor} nor
     * {@code stopTrackingAndReleasePartitions} may reach the task executor gateway, while each
     * must still trigger exactly one external release on the shuffle master and stop tracking
     * the executor.
     */
    @Test
    public void testReleaseCallsWithoutLocalResources() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();

        // Parameterized (not raw) so the hand-off to createTaskExecutorGateway is type-checked.
        final Queue<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> taskExecutorReleaseCalls =
                new ArrayBlockingQueue<>(4);
        final PartitionTrackerImpl partitionTracker = new PartitionTrackerImpl(
                new JobID(),
                shuffleMaster,
                resourceId -> Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));

        final ResourceID taskExecutorId1 = ResourceID.generate();
        final ResourceID taskExecutorId2 = ResourceID.generate();
        final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
        final ResultPartitionID resultPartitionId2 = new ResultPartitionID();

        partitionTracker.startTrackingPartition(
                taskExecutorId1, createResultPartitionDeploymentDescriptor(resultPartitionId1, false));
        partitionTracker.startTrackingPartition(
                taskExecutorId2, createResultPartitionDeploymentDescriptor(resultPartitionId2, false));

        // Release everything registered for the first task executor.
        partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);

        Assert.assertEquals(0, taskExecutorReleaseCalls.size());
        Assert.assertEquals(1, shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals(resultPartitionId1, shuffleMaster.externallyReleasedPartitions.remove());
        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId1), CoreMatchers.is(false));

        // Release the second partition by its ID.
        partitionTracker.stopTrackingAndReleasePartitions(Collections.singletonList(resultPartitionId2));

        Assert.assertEquals(0, taskExecutorReleaseCalls.size());
        Assert.assertEquals(1, shuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals(resultPartitionId2, shuffleMaster.externallyReleasedPartitions.remove());
        MatcherAssert.assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId2), CoreMatchers.is(false));
    }

    /**
     * Verifies that merely stopping the tracking of a task executor's partitions issues no
     * release call at all: neither to the task executor gateway nor to the shuffle master.
     */
    @Test
    public void testStopTrackingIssuesNoReleaseCalls() {
        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();

        final Queue<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> taskExecutorReleaseCalls =
                new ArrayBlockingQueue<>(4);
        // The tracker must be wired to the very shuffle master instance inspected below;
        // handing it a separate instance would make the final assertion pass trivially.
        final PartitionTrackerImpl partitionTracker = new PartitionTrackerImpl(
                new JobID(),
                shuffleMaster,
                resourceId -> Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)));

        final ResourceID taskExecutorId = ResourceID.generate();
        partitionTracker.startTrackingPartition(
                taskExecutorId, createResultPartitionDeploymentDescriptor(new ResultPartitionID(), true));

        partitionTracker.stopTrackingPartitionsFor(taskExecutorId);

        Assert.assertEquals(0, taskExecutorReleaseCalls.size());
        Assert.assertEquals(0, shuffleMaster.externallyReleasedPartitions.size());
    }

    /**
     * Convenience overload that builds a deployment descriptor for a
     * {@link ResultPartitionType#BLOCKING} partition.
     *
     * @param resultPartitionID ID the created shuffle descriptor reports
     * @param z whether the created shuffle descriptor reports that it stores local resources
     * @return the descriptor produced by the three-argument overload
     */
    private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(ResultPartitionID resultPartitionID, boolean z) {
        final ResultPartitionType partitionType = ResultPartitionType.BLOCKING;
        return createResultPartitionDeploymentDescriptor(resultPartitionID, partitionType, z);
    }

    /**
     * Builds a deployment descriptor around an ad-hoc {@link ShuffleDescriptor} for use in
     * these tests.
     *
     * @param resultPartitionID ID reported by the shuffle descriptor; its partition ID is also
     *     used for the {@link PartitionDescriptor}
     * @param resultPartitionType partition type placed into the {@link PartitionDescriptor}
     * @param z if {@code true}, {@code storesLocalResourcesOn()} returns a newly generated
     *     {@link ResourceID} on every call; otherwise it returns an empty {@link Optional}
     * @return the assembled deployment descriptor
     */
    private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(final ResultPartitionID resultPartitionID, ResultPartitionType resultPartitionType, final boolean z) {
        final PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
                new IntermediateDataSetID(),
                resultPartitionID.getPartitionId(),
                resultPartitionType,
                1,
                0);

        final ShuffleDescriptor shuffleDescriptor = new ShuffleDescriptor() {
            public ResultPartitionID getResultPartitionID() {
                return resultPartitionID;
            }

            public Optional<ResourceID> storesLocalResourcesOn() {
                if (!z) {
                    return Optional.empty();
                }
                // A fresh ID per call; it is unrelated to any tracked task executor.
                return Optional.of(ResourceID.generate());
            }
        };

        return new ResultPartitionDeploymentDescriptor(partitionDescriptor, shuffleDescriptor, 1, true);
    }

    /**
     * Creates a testing {@link TaskExecutorGateway} whose release-partitions callback records
     * each invocation into the given collection.
     *
     * @param resourceID ID stored as the first field of every recorded tuple
     * @param collection sink receiving one {@code (resourceID, jobID, partitionIDs)} tuple per
     *     release call
     * @return the gateway built by {@link TestingTaskExecutorGatewayBuilder}
     */
    private static TaskExecutorGateway createTaskExecutorGateway(ResourceID resourceID, Collection<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> collection) {
        return new TestingTaskExecutorGatewayBuilder()
                .setReleasePartitionsConsumer(
                        (releasedJobId, releasedPartitionIds) ->
                                collection.add(Tuple3.of(resourceID, releasedJobId, releasedPartitionIds)))
                .createTestingTaskExecutorGateway();
    }
}
