package org.apache.flink.runtime.heartbeat;

import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.class */
public class HeartbeatManagerTest extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class);

    /**
     * Heartbeat target that records the most recent payload passed to
     * {@code receiveHeartbeat} and to {@code requestHeartbeat}, and triggers its latch on
     * either call. Both recorded payloads start at {@code -1} until the first call arrives.
     */
    private static class TargetDependentHeartbeatReceiver implements HeartbeatTarget<Integer> {
        private final OneShotLatch latch;
        private volatile int lastReceivedHeartbeatPayload = -1;
        private volatile int lastRequestedHeartbeatPayload = -1;

        /** Creates a receiver backed by a fresh, private latch. */
        public TargetDependentHeartbeatReceiver() {
            this(new OneShotLatch());
        }

        /**
         * Creates a receiver that triggers the given latch on every heartbeat call.
         *
         * @param triggerOnHeartbeat latch triggered by receive and request calls
         */
        public TargetDependentHeartbeatReceiver(OneShotLatch triggerOnHeartbeat) {
            latch = triggerOnHeartbeat;
        }

        /** Stores the received payload, then triggers the latch. */
        public void receiveHeartbeat(ResourceID heartbeatOrigin, Integer payload) {
            lastReceivedHeartbeatPayload = payload;
            latch.trigger();
        }

        /** Stores the requested payload, then triggers the latch. */
        public void requestHeartbeat(ResourceID requestOrigin, Integer payload) {
            lastRequestedHeartbeatPayload = payload;
            latch.trigger();
        }

        /** @return payload of the latest {@code receiveHeartbeat} call, or -1 if none yet */
        public int getLastReceivedHeartbeatPayload() {
            return lastReceivedHeartbeatPayload;
        }

        /** @return payload of the latest {@code requestHeartbeat} call, or -1 if none yet */
        public int getLastRequestedHeartbeatPayload() {
            return lastRequestedHeartbeatPayload;
        }
    }

    /**
     * Heartbeat listener whose retrieved payload depends on the target: one designated
     * {@link ResourceID} gets a special response, every other id gets the default response.
     * Timeout notifications and reported payloads are ignored.
     */
    private static class TargetDependentHeartbeatSender implements HeartbeatListener<Object, Integer> {
        private final ResourceID specialId;
        private final int specialResponse;
        private final int defaultResponse;

        /**
         * @param specialId id that receives {@code specialResponse}
         * @param specialResponse payload returned for {@code specialId}
         * @param defaultResponse payload returned for any other id
         */
        TargetDependentHeartbeatSender(ResourceID specialId, int specialResponse, int defaultResponse) {
            this.specialId = specialId;
            this.specialResponse = specialResponse;
            this.defaultResponse = defaultResponse;
        }

        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            // Timeouts are irrelevant for this helper.
        }

        public void reportPayload(ResourceID resourceID, Object payload) {
            // Incoming payloads are irrelevant for this helper.
        }

        /**
         * Returns the payload to send to the given target.
         *
         * <p>The method must carry the listener interface's name {@code retrievePayload};
         * under any other name the interface method would be left unimplemented.
         *
         * @param resourceID id of the target the payload is retrieved for
         * @return the special response for the special id, otherwise the default response
         */
        public Integer retrievePayload(ResourceID resourceID) {
            return resourceID.equals(specialId) ? specialResponse : defaultResponse;
        }
    }

    /**
     * Checks the regular heartbeat round trip: a payload passed to {@code requestHeartbeat}
     * is reported to the listener and the monitored target receives the listener's retrieved
     * payload; a payload passed to {@code receiveHeartbeat} is reported to the listener too.
     */
    @Test
    public void testRegularHeartbeat() throws InterruptedException {
        final long heartbeatTimeout = 1000L;
        final int outputPayload = 42;
        final String firstInputPayload = "foobar";
        final String secondInputPayload = "barfoo";
        final ResourceID ownResourceId = new ResourceID("foobar");
        final ResourceID targetResourceId = new ResourceID("barfoo");

        // Payloads the listener was asked to report, in arrival order.
        final ArrayBlockingQueue<String> reportedPayloads = new ArrayBlockingQueue<>(2);
        final TestingHeartbeatListener<String, Integer> heartbeatListener =
                new TestingHeartbeatListenerBuilder<String, Integer>()
                        .setReportPayloadConsumer((ignored, payload) -> reportedPayloads.offer(payload))
                        .setRetrievePayloadFunction(ignored -> outputPayload)
                        .createNewTestingHeartbeatListener();

        final HeartbeatManagerImpl<String, Integer> heartbeatManager = new HeartbeatManagerImpl<>(
                heartbeatTimeout,
                ownResourceId,
                heartbeatListener,
                TestingUtils.defaultScheduledExecutor(),
                LOG);

        try {
            // Payloads delivered to the monitored target.
            final ArrayBlockingQueue<Integer> payloadsSeenByTarget = new ArrayBlockingQueue<>(2);
            final TestingHeartbeatTarget<Integer> heartbeatTarget =
                    new TestingHeartbeatTargetBuilder<Integer>()
                            .setReceiveHeartbeatConsumer((ignored, payload) -> payloadsSeenByTarget.offer(payload))
                            .createTestingHeartbeatTarget();
            heartbeatManager.monitorTarget(targetResourceId, heartbeatTarget);

            heartbeatManager.requestHeartbeat(targetResourceId, firstInputPayload);
            Assert.assertThat(reportedPayloads.take(), Matchers.is(firstInputPayload));
            Assert.assertThat(payloadsSeenByTarget.take(), Matchers.is(outputPayload));

            heartbeatManager.receiveHeartbeat(targetResourceId, secondInputPayload);
            Assert.assertThat(reportedPayloads.take(), Matchers.is(secondInputPayload));
        } finally {
            // Stop the manager, as the other tests in this class do, so nothing stays scheduled.
            heartbeatManager.stop();
        }
    }

    /**
     * Checks, against a mocked executor, that receiving a heartbeat from a monitored target
     * cancels the previously scheduled future once and that {@code schedule} has been invoked
     * twice in total with the configured timeout.
     */
    @Test
    public void testHeartbeatMonitorUpdate() {
        final long heartbeatTimeout = 1000L;
        final ResourceID ownId = new ResourceID("foobar");
        final ResourceID targetId = new ResourceID("barfoo");

        // Mockito mocks of generic interfaces are necessarily raw here.
        final HeartbeatListener listener = Mockito.mock(HeartbeatListener.class);
        final ScheduledExecutor executor = Mockito.mock(ScheduledExecutor.class);
        final ScheduledFuture timeoutTask = Mockito.mock(ScheduledFuture.class);

        // Every scheduling request hands back the same mocked future so its cancellation can be verified.
        Mockito.doReturn(timeoutTask)
                .when(executor)
                .schedule(
                        org.mockito.Matchers.any(Runnable.class),
                        org.mockito.Matchers.anyLong(),
                        org.mockito.Matchers.any(TimeUnit.class));

        final Object payload = new Object();
        Mockito.when(listener.retrievePayload(org.mockito.Matchers.any(ResourceID.class)))
                .thenReturn(CompletableFuture.completedFuture(payload));

        final HeartbeatManagerImpl manager =
                new HeartbeatManagerImpl(heartbeatTimeout, ownId, listener, executor, LOG);
        manager.monitorTarget(targetId, Mockito.mock(HeartbeatTarget.class));
        manager.receiveHeartbeat(targetId, payload);

        Mockito.verify(timeoutTask, Mockito.times(1)).cancel(true);
        Mockito.verify(executor, Mockito.times(2))
                .schedule(
                        org.mockito.Matchers.any(Runnable.class),
                        org.mockito.Matchers.eq(heartbeatTimeout),
                        org.mockito.Matchers.eq(TimeUnit.MILLISECONDS));
    }

    /**
     * Checks that no timeout is signalled while heartbeats keep arriving, and that the
     * listener is notified with the target's id once they stop.
     */
    @Test
    public void testHeartbeatTimeout() throws Exception {
        final long heartbeatTimeout = 100L;
        final long heartbeatInterval = 20L;
        final int numHeartbeats = 6;
        final int payload = 42;
        final ResourceID ownResourceId = new ResourceID("foobar");
        final ResourceID targetResourceId = new ResourceID("barfoo");

        // Completed with the id of the timed-out target by the listener's timeout callback.
        final CompletableFuture<ResourceID> timeoutFuture = new CompletableFuture<>();
        final TestingHeartbeatListener<Integer, Integer> heartbeatListener =
                new TestingHeartbeatListenerBuilder<Integer, Integer>()
                        .setRetrievePayloadFunction(ignored -> payload)
                        .setNotifyHeartbeatTimeoutConsumer(timeoutFuture::complete)
                        .createNewTestingHeartbeatListener();

        final HeartbeatManagerImpl<Integer, Integer> heartbeatManager = new HeartbeatManagerImpl<>(
                heartbeatTimeout,
                ownResourceId,
                heartbeatListener,
                TestingUtils.defaultScheduledExecutor(),
                LOG);

        try {
            heartbeatManager.monitorTarget(
                    targetResourceId,
                    new TestingHeartbeatTargetBuilder<Integer>().createTestingHeartbeatTarget());

            // Heartbeats arrive more often than the timeout, so it must not fire during the loop.
            for (int i = 0; i < numHeartbeats; i++) {
                heartbeatManager.receiveHeartbeat(targetResourceId, payload);
                Thread.sleep(heartbeatInterval);
            }
            Assert.assertFalse(timeoutFuture.isDone());

            // With no further heartbeats the timeout is expected within twice the timeout interval.
            final ResourceID timedOutId = timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
            Assert.assertEquals(targetResourceId, timedOutId);
        } finally {
            // Stop the manager, as the other tests in this class do, so nothing stays scheduled.
            heartbeatManager.stop();
        }
    }

    /**
     * Lets a {@link HeartbeatManagerImpl} and a {@link HeartbeatManagerSenderImpl} monitor each
     * other. Checks that no timeout is signalled while both run, that the sender's listener is
     * notified of a timeout for the other manager after that manager is stopped, and that
     * both listeners had payloads reported a minimum number of times.
     */
    @Test
    public void testHeartbeatCluster() throws Exception {
        // NOTE(review): the first sender argument is presumably the heartbeat period - confirm.
        final long heartbeatPeriod = 20L;
        final long heartbeatTimeout = 100L;
        final int intPayload = 42;
        final String stringPayload = "1337";
        final ResourceID targetId = new ResourceID("foobar");
        final ResourceID senderId = new ResourceID("barfoo");

        final AtomicInteger reportCallsOnTarget = new AtomicInteger(0);
        final TestingHeartbeatListener<String, Integer> targetListener =
                new TestingHeartbeatListenerBuilder<String, Integer>()
                        .setRetrievePayloadFunction(ignored -> intPayload)
                        .setReportPayloadConsumer((ignoredId, ignoredPayload) -> reportCallsOnTarget.incrementAndGet())
                        .createNewTestingHeartbeatListener();

        // Completed by the sender's listener with the id of the manager that timed out.
        final CompletableFuture<ResourceID> timeoutFuture = new CompletableFuture<>();
        final AtomicInteger reportCallsOnSender = new AtomicInteger(0);
        final TestingHeartbeatListener<Integer, String> senderListener =
                new TestingHeartbeatListenerBuilder<Integer, String>()
                        .setRetrievePayloadFunction(ignored -> stringPayload)
                        .setNotifyHeartbeatTimeoutConsumer(timeoutFuture::complete)
                        .setReportPayloadConsumer((ignoredId, ignoredPayload) -> reportCallsOnSender.incrementAndGet())
                        .createNewTestingHeartbeatListener();

        final HeartbeatManagerImpl<String, Integer> targetManager = new HeartbeatManagerImpl<>(
                heartbeatTimeout,
                targetId,
                targetListener,
                TestingUtils.defaultScheduledExecutor(),
                LOG);
        final HeartbeatManagerSenderImpl<Integer, String> senderManager = new HeartbeatManagerSenderImpl<>(
                heartbeatPeriod,
                heartbeatTimeout,
                senderId,
                senderListener,
                TestingUtils.defaultScheduledExecutor(),
                LOG);

        try {
            targetManager.monitorTarget(senderId, senderManager);
            senderManager.monitorTarget(targetId, targetManager);

            Thread.sleep(2 * heartbeatTimeout);
            Assert.assertFalse(timeoutFuture.isDone());

            // After the target manager stops, the sender is expected to observe a timeout for it.
            targetManager.stop();
            final ResourceID timedOutId = timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
            Assert.assertThat(timedOutId, Matchers.is(targetId));

            // Require only half of the nominal number of heartbeats to tolerate scheduling jitter.
            final int nominalHeartbeats = (int) (2 * heartbeatTimeout / heartbeatPeriod);
            final Matcher<Integer> atLeastHalf = Matchers.greaterThanOrEqualTo(nominalHeartbeats / 2);
            Assert.assertThat(reportCallsOnTarget.get(), Matchers.is(atLeastHalf));
            Assert.assertThat(reportCallsOnSender.get(), Matchers.is(atLeastHalf));
        } finally {
            // The sender would otherwise keep scheduling heartbeat requests after the test.
            senderManager.stop();
        }
    }

    /**
     * Checks that no heartbeat timeout is reported for a target that was unmonitored right
     * after being registered: waiting on the timeout future must itself time out.
     */
    @Test
    public void testTargetUnmonitoring() throws Exception {
        final long heartbeatTimeout = 50L;
        final int payload = 42;
        final ResourceID ownResourceId = new ResourceID("foobar");
        final ResourceID targetId = new ResourceID("target");

        // Would be completed by the listener if a heartbeat timeout were reported.
        final CompletableFuture<ResourceID> timeoutFuture = new CompletableFuture<>();
        final TestingHeartbeatListener<Object, Integer> heartbeatListener =
                new TestingHeartbeatListenerBuilder<Object, Integer>()
                        .setRetrievePayloadFunction(ignored -> payload)
                        .setNotifyHeartbeatTimeoutConsumer(timeoutFuture::complete)
                        .createNewTestingHeartbeatListener();

        final HeartbeatManagerImpl<Object, Integer> heartbeatManager = new HeartbeatManagerImpl<>(
                heartbeatTimeout,
                ownResourceId,
                heartbeatListener,
                TestingUtils.defaultScheduledExecutor(),
                LOG);

        try {
            heartbeatManager.monitorTarget(
                    targetId,
                    new TestingHeartbeatTargetBuilder<Integer>().createTestingHeartbeatTarget());
            heartbeatManager.unmonitorTarget(targetId);

            try {
                timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
                Assert.fail("Timeout should time out.");
            } catch (TimeoutException expected) {
                // Expected: the target is no longer monitored, so the future never completes.
            }
        } finally {
            // Stop the manager, as the other tests in this class do, so nothing stays scheduled.
            heartbeatManager.stop();
        }
    }

    /**
     * Checks that {@code getLastHeartbeatFrom} returns {@code -1} for a resource id that was
     * never registered as a monitored target.
     */
    @Test
    public void testLastHeartbeatFromUnregisteredTarget() {
        final long heartbeatTimeout = 100L;
        // Mockito mocks of generic interfaces are necessarily raw here.
        final HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
                heartbeatTimeout,
                ResourceID.generate(),
                Mockito.mock(HeartbeatListener.class),
                Mockito.mock(ScheduledExecutor.class),
                LOG);

        try {
            Assert.assertEquals(-1L, heartbeatManager.getLastHeartbeatFrom(ResourceID.generate()));
        } finally {
            // Single cleanup path for both success and failure.
            heartbeatManager.stop();
        }
    }

    /**
     * Checks that {@code getLastHeartbeatFrom} returns {@code 0} for a freshly monitored
     * target and, after a heartbeat is received, a timestamp no earlier than the wall-clock
     * time sampled just before that heartbeat.
     */
    @Test
    public void testLastHeartbeatFrom() {
        final long heartbeatTimeout = 100L;
        final ResourceID ownId = ResourceID.generate();
        // Mockito mocks of generic interfaces are necessarily raw here.
        final HeartbeatListener listener = Mockito.mock(HeartbeatListener.class);
        final HeartbeatTarget target = Mockito.mock(HeartbeatTarget.class);
        final ResourceID targetId = ResourceID.generate();

        final HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
                heartbeatTimeout,
                ownId,
                listener,
                Mockito.mock(ScheduledExecutor.class),
                LOG);

        try {
            heartbeatManager.monitorTarget(targetId, target);
            Assert.assertEquals(0L, heartbeatManager.getLastHeartbeatFrom(targetId));

            final long beforeHeartbeat = System.currentTimeMillis();
            heartbeatManager.receiveHeartbeat(targetId, (Object) null);
            Assert.assertTrue(heartbeatManager.getLastHeartbeatFrom(targetId) >= beforeHeartbeat);
        } finally {
            // Single cleanup path for both success and failure.
            heartbeatManager.stop();
        }
    }

    /**
     * Checks that a {@link HeartbeatManagerImpl} answers a heartbeat request with the payload
     * the listener retrieves for that specific target: two monitored targets are mapped to
     * different payloads and each must receive its own.
     */
    @Test
    public void testHeartbeatManagerTargetPayload() throws Exception {
        final long heartbeatTimeout = 100L;
        final ResourceID someTargetId = ResourceID.generate();
        final ResourceID specialTargetId = ResourceID.generate();

        // Per-target payloads; the listener looks the payload up by target id.
        final HashMap<ResourceID, Integer> targetPayloads = new HashMap<>(2);
        targetPayloads.put(someTargetId, 0);
        targetPayloads.put(specialTargetId, 1);

        final CompletableFuture<Integer> someHeartbeatPayload = new CompletableFuture<>();
        final TestingHeartbeatTarget<Integer> someHeartbeatTarget =
                new TestingHeartbeatTargetBuilder<Integer>()
                        .setReceiveHeartbeatConsumer((ignored, payload) -> someHeartbeatPayload.complete(payload))
                        .createTestingHeartbeatTarget();

        final CompletableFuture<Integer> specialHeartbeatPayload = new CompletableFuture<>();
        final TestingHeartbeatTarget<Integer> specialHeartbeatTarget =
                new TestingHeartbeatTargetBuilder<Integer>()
                        .setReceiveHeartbeatConsumer((ignored, payload) -> specialHeartbeatPayload.complete(payload))
                        .createTestingHeartbeatTarget();

        final TestingHeartbeatListener<Object, Integer> heartbeatListener =
                new TestingHeartbeatListenerBuilder<Object, Integer>()
                        .setRetrievePayloadFunction(targetPayloads::get)
                        .createNewTestingHeartbeatListener();

        final HeartbeatManagerImpl<Object, Integer> heartbeatManager = new HeartbeatManagerImpl<>(
                heartbeatTimeout,
                ResourceID.generate(),
                heartbeatListener,
                TestingUtils.defaultScheduledExecutor(),
                LOG);

        try {
            heartbeatManager.monitorTarget(someTargetId, someHeartbeatTarget);
            heartbeatManager.monitorTarget(specialTargetId, specialHeartbeatTarget);

            heartbeatManager.requestHeartbeat(someTargetId, null);
            Assert.assertThat(someHeartbeatPayload.get(), Matchers.is(targetPayloads.get(someTargetId)));

            heartbeatManager.requestHeartbeat(specialTargetId, null);
            Assert.assertThat(specialHeartbeatPayload.get(), Matchers.is(targetPayloads.get(specialTargetId)));
        } finally {
            // Single cleanup path for both success and failure.
            heartbeatManager.stop();
        }
    }

    /**
     * Checks that a {@link HeartbeatManagerSenderImpl} requests heartbeats with the payload
     * the listener retrieves for each specific target: the special target must be asked with
     * payload {@code 1}, the other target with the default payload {@code 0}.
     */
    @Test
    public void testHeartbeatManagerSenderTargetPayload() throws Exception {
        final long heartbeatTimeout = 100L;
        // NOTE(review): the first sender argument is presumably the heartbeat period - confirm.
        final long heartbeatPeriod = 2000L;
        final int defaultResponse = 0;
        final int specialResponse = 1;

        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
        final ResourceID someTargetId = ResourceID.generate();
        final ResourceID specialTargetId = ResourceID.generate();

        // Each latch is triggered as soon as the corresponding receiver sees a heartbeat call.
        final OneShotLatch someTargetLatch = new OneShotLatch();
        final OneShotLatch specialTargetLatch = new OneShotLatch();
        final TargetDependentHeartbeatReceiver someTargetReceiver =
                new TargetDependentHeartbeatReceiver(someTargetLatch);
        final TargetDependentHeartbeatReceiver specialTargetReceiver =
                new TargetDependentHeartbeatReceiver(specialTargetLatch);

        final HeartbeatManagerSenderImpl<Object, Integer> heartbeatManager = new HeartbeatManagerSenderImpl<>(
                heartbeatPeriod,
                heartbeatTimeout,
                ResourceID.generate(),
                new TargetDependentHeartbeatSender(specialTargetId, specialResponse, defaultResponse),
                new ScheduledExecutorServiceAdapter(scheduledThreadPoolExecutor),
                LOG);

        try {
            heartbeatManager.monitorTarget(someTargetId, someTargetReceiver);
            heartbeatManager.monitorTarget(specialTargetId, specialTargetReceiver);

            someTargetLatch.await(5L, TimeUnit.SECONDS);
            specialTargetLatch.await(5L, TimeUnit.SECONDS);

            Assert.assertEquals(defaultResponse, someTargetReceiver.getLastRequestedHeartbeatPayload());
            Assert.assertEquals(specialResponse, specialTargetReceiver.getLastRequestedHeartbeatPayload());
        } finally {
            // Single cleanup path for both success and failure.
            heartbeatManager.stop();
            scheduledThreadPoolExecutor.shutdown();
        }
    }
}
