package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.class */
/**
 * Tests for {@link CreditBasedPartitionRequestClientHandler}.
 *
 * <p>Each test registers one or more {@link RemoteInputChannel}s with a fresh handler, feeds
 * {@link NettyMessage}s into it via {@code channelRead} and checks the resulting interactions
 * with the input channel or the messages written to an {@link EmbeddedChannel}.
 */
public class CreditBasedPartitionRequestClientHandlerTest {

    /**
     * Feeds a buffer response to a handler whose input channel exposes a buffer provider that
     * hands out no buffer, reports itself as destroyed and refuses buffer listeners.
     *
     * <p>The test has no explicit assertions: it passes when {@code channelRead} returns without
     * throwing before the 60 second timeout expires.
     */
    @Test(timeout = 60000)
    public void testReleaseInputChannelDuringDecode() throws Exception {
        // Buffer provider in the state stubbed below: no buffer, destroyed, no listener accepted.
        BufferProvider bufferProvider = Mockito.mock(BufferProvider.class);
        Mockito.when(bufferProvider.requestBuffer()).thenReturn(null);
        Mockito.when(bufferProvider.isDestroyed()).thenReturn(true);
        Mockito.when(bufferProvider.addBufferListener(Matchers.any(BufferListener.class))).thenReturn(false);

        RemoteInputChannel inputChannel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
        Mockito.when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);

        NettyMessage.BufferResponse bufferResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                TestBufferFactory.createBuffer(32768), 0, inputChannel.getInputChannelId(), 2);

        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(inputChannel);

        handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);
    }

    /**
     * Verifies that a buffer response carrying a zero-sized buffer does not trigger
     * {@code onError} on the input channel and is reported exactly once through
     * {@code onEmptyBuffer(0, 2)}, i.e. with the same two integers that were passed to
     * {@code createBufferResponse}.
     */
    @Test
    public void testReceiveEmptyBuffer() throws Exception {
        BufferProvider bufferProvider = Mockito.mock(BufferProvider.class);
        Mockito.when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));

        RemoteInputChannel inputChannel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
        Mockito.when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);

        NettyMessage.BufferResponse emptyResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                TestBufferFactory.createBuffer(0), 0, inputChannel.getInputChannelId(), 2);

        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(inputChannel);

        handler.channelRead(Mockito.mock(ChannelHandlerContext.class), emptyResponse);

        Mockito.verify(inputChannel, Mockito.never()).onError(Matchers.any(Throwable.class));
        Mockito.verify(inputChannel, Mockito.times(1)).onEmptyBuffer(0, 2);
    }

    /**
     * Verifies that a buffer response read by the handler ends up in a real
     * {@link RemoteInputChannel}: afterwards the channel reports one queued buffer and a sender
     * backlog of 2.
     */
    @Test
    public void testReceiveBuffer() throws Exception {
        // Declared as NetworkBufferPool (not MemorySegmentProvider) because the pool-management
        // methods used below are called on it.
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        RemoteInputChannel inputChannel = InputChannelBuilder.newBuilder()
                .setMemorySegmentProvider(networkBufferPool)
                .buildRemoteAndSetToGate(inputGate);
        try {
            inputGate.setBufferPool(networkBufferPool.createBufferPool(8, 8));
            inputGate.assignExclusiveSegments();

            CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
            handler.addInputChannel(inputChannel);

            NettyMessage.BufferResponse bufferResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), 2);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);

            Assert.assertEquals(1L, inputChannel.getNumberOfQueuedBuffers());
            Assert.assertEquals(2L, inputChannel.getSenderBacklog());
        } finally {
            // Release all resources whether or not the assertions passed.
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /**
     * Verifies that {@code onError} is invoked exactly once with an
     * {@link IllegalStateException} when a buffer response arrives for an input channel that
     * reports zero available buffers.
     */
    @Test
    public void testThrowExceptionForNoAvailableBuffer() throws Exception {
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        // Spy on a real channel so that the onError invocation can be verified.
        RemoteInputChannel inputChannel = Mockito.spy(InputChannelBuilder.newBuilder().buildRemoteAndSetToGate(inputGate));

        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(inputChannel);

        Assert.assertEquals(
                "There should be no buffers available in the channel.",
                0L,
                inputChannel.getNumberOfAvailableBuffers());

        NettyMessage.BufferResponse bufferResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                TestBufferFactory.createBuffer(32768), 0, inputChannel.getInputChannelId(), 2);
        handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);

        Mockito.verify(inputChannel, Mockito.times(1)).onError(Matchers.any(IllegalStateException.class));
    }

    /**
     * Verifies that an {@link NettyMessage.ErrorResponse} wrapping a
     * {@link PartitionNotFoundException} results in exactly one call to
     * {@code onFailedPartitionRequest} on the addressed input channel.
     */
    @Test
    public void testReceivePartitionNotFoundException() throws Exception {
        BufferProvider bufferProvider = Mockito.mock(BufferProvider.class);
        Mockito.when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));

        RemoteInputChannel inputChannel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
        Mockito.when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);

        NettyMessage.ErrorResponse partitionNotFound = new NettyMessage.ErrorResponse(
                new PartitionNotFoundException(new ResultPartitionID()),
                inputChannel.getInputChannelId());

        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(inputChannel);

        // The handler is activated with a context that returns a mocked Channel.
        ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class);
        Mockito.when(ctx.channel()).thenReturn(Mockito.mock(Channel.class));
        handler.channelActive(ctx);

        handler.channelRead(ctx, partitionNotFound);

        Mockito.verify(inputChannel, Mockito.times(1)).onFailedPartitionRequest();
    }

    /**
     * Calls {@code cancelRequestFor} on a handler for which {@code channelActive} was never
     * invoked, once with {@code null} and once with a registered channel id.
     *
     * <p>The test has no explicit assertions: it passes when neither call throws.
     */
    @Test
    public void testCancelBeforeActive() throws Exception {
        RemoteInputChannel inputChannel = Mockito.mock(RemoteInputChannel.class);
        Mockito.when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());

        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(inputChannel);

        handler.cancelRequestFor((InputChannelID) null);
        handler.cancelRequestFor(inputChannel.getInputChannelId());
    }

    /**
     * Verifies the credit announcements written to the channel for two remote input channels
     * sharing one handler.
     *
     * <p>The test checks, in order: the initial {@code PartitionRequest}s (credit 2 each), the
     * {@code AddCredit} messages (credit 2 each) emitted after one buffer response per channel,
     * that no {@code AddCredit} is readable while the embedded channel is not writable, and that
     * the pending {@code AddCredit} (credit 1) becomes readable after the channel is flushed.
     */
    @Test
    public void testNotifyCreditAvailable() throws Exception {
        // The handler must be held by its concrete type: channelRead(...) is invoked on it directly.
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(handler);
        PartitionRequestClient client = new NettyPartitionRequestClient(
                channel,
                handler,
                Mockito.mock(ConnectionID.class),
                Mockito.mock(PartitionRequestClientFactory.class));

        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        RemoteInputChannel firstChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, client, networkBufferPool);
        RemoteInputChannel secondChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, client, networkBufferPool);
        try {
            inputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            inputGate.assignExclusiveSegments();

            firstChannel.requestSubpartition(0);
            secondChannel.requestSubpartition(0);

            // Both partition requests are written out with a credit of 2.
            Assert.assertTrue(channel.isWritable());
            Object firstRequest = channel.readOutbound();
            Assert.assertThat(firstRequest, org.hamcrest.Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(firstChannel.getInputChannelId(), ((NettyMessage.PartitionRequest) firstRequest).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) firstRequest).credit);

            Object secondRequest = channel.readOutbound();
            Assert.assertThat(secondRequest, org.hamcrest.Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(secondChannel.getInputChannelId(), ((NettyMessage.PartitionRequest) secondRequest).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) secondRequest).credit);

            // One buffer response per channel leaves each with 2 unannounced credits.
            NettyMessage.BufferResponse firstResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 0, firstChannel.getInputChannelId(), 1);
            NettyMessage.BufferResponse secondResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 0, secondChannel.getInputChannelId(), 1);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), firstResponse);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), secondResponse);

            Assert.assertEquals(2L, firstChannel.getUnannouncedCredit());
            Assert.assertEquals(2L, secondChannel.getUnannouncedCredit());

            channel.runPendingTasks();

            // After running pending tasks, one AddCredit per channel is readable, in channel order.
            Object firstAddCredit = channel.readOutbound();
            Assert.assertThat(firstAddCredit, org.hamcrest.Matchers.instanceOf(NettyMessage.AddCredit.class));
            Assert.assertEquals(firstChannel.getInputChannelId(), ((NettyMessage.AddCredit) firstAddCredit).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.AddCredit) firstAddCredit).credit);

            Object secondAddCredit = channel.readOutbound();
            Assert.assertThat(secondAddCredit, org.hamcrest.Matchers.instanceOf(NettyMessage.AddCredit.class));
            Assert.assertEquals(secondChannel.getInputChannelId(), ((NettyMessage.AddCredit) secondAddCredit).receiverId);
            Assert.assertEquals(2L, ((NettyMessage.AddCredit) secondAddCredit).credit);
            Assert.assertNull(channel.readOutbound());

            // Block the channel; the isWritable() assertion below confirms it is no longer writable.
            ByteBuf channelBlockingBuffer = PartitionRequestQueueTest.blockChannel(channel);

            NettyMessage.BufferResponse thirdResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 1, firstChannel.getInputChannelId(), 1);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), thirdResponse);

            Assert.assertEquals(1L, firstChannel.getUnannouncedCredit());
            Assert.assertEquals(0L, secondChannel.getUnannouncedCredit());

            channel.runPendingTasks();

            // While the channel is not writable, no credit announcement is readable.
            Assert.assertFalse(channel.isWritable());
            Assert.assertNull(channel.readOutbound());

            // Flushing makes the blocking buffer readable and the channel writable again.
            channel.flush();
            Assert.assertSame(channelBlockingBuffer, channel.readOutbound());
            Assert.assertTrue(channel.isWritable());

            // The pending credit of the first channel is now announced.
            Object pendingAddCredit = channel.readOutbound();
            Assert.assertThat(pendingAddCredit, org.hamcrest.Matchers.instanceOf(NettyMessage.AddCredit.class));
            Assert.assertEquals(1L, ((NettyMessage.AddCredit) pendingAddCredit).credit);
            Assert.assertEquals(0L, firstChannel.getUnannouncedCredit());
            Assert.assertEquals(0L, secondChannel.getUnannouncedCredit());
            Assert.assertNull(channel.readOutbound());
        } finally {
            // Release all resources whether or not the assertions passed.
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }

    /**
     * Verifies that closing the input gate while a channel still has unannounced credit produces
     * a {@code CloseRequest} and no further outbound message (in particular no {@code AddCredit})
     * once the pending tasks have run.
     */
    @Test
    public void testNotifyCreditAvailableAfterReleased() throws Exception {
        // The handler must be held by its concrete type: channelRead(...) is invoked on it directly.
        CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler();
        EmbeddedChannel channel = new EmbeddedChannel(handler);
        PartitionRequestClient client = new NettyPartitionRequestClient(
                channel,
                handler,
                Mockito.mock(ConnectionID.class),
                Mockito.mock(PartitionRequestClientFactory.class));

        NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, 32, 2);
        SingleInputGate inputGate = InputChannelTestUtils.createSingleInputGate(1);
        RemoteInputChannel inputChannel = InputChannelTestUtils.createRemoteInputChannel(inputGate, client, networkBufferPool);
        try {
            inputGate.setBufferPool(networkBufferPool.createBufferPool(6, 6));
            inputGate.assignExclusiveSegments();

            inputChannel.requestSubpartition(0);

            // The partition request is written out with a credit of 2.
            Object partitionRequest = channel.readOutbound();
            Assert.assertThat(partitionRequest, org.hamcrest.Matchers.instanceOf(NettyMessage.PartitionRequest.class));
            Assert.assertEquals(2L, ((NettyMessage.PartitionRequest) partitionRequest).credit);

            NettyMessage.BufferResponse bufferResponse = PartitionRequestClientHandlerTest.createBufferResponse(
                    TestBufferFactory.createBuffer(32), 0, inputChannel.getInputChannelId(), 1);
            handler.channelRead(Mockito.mock(ChannelHandlerContext.class), bufferResponse);

            Assert.assertEquals(2L, inputChannel.getUnannouncedCredit());

            // Close the gate while the credit is still unannounced.
            inputGate.close();

            Object closeRequest = channel.readOutbound();
            Assert.assertThat(closeRequest, org.hamcrest.Matchers.instanceOf(NettyMessage.CloseRequest.class));

            channel.runPendingTasks();

            // Nothing further must be written after the close request.
            Assert.assertNull(channel.readOutbound());
        } finally {
            // The gate is closed a second time here on the success path; cleanup must still run
            // when an assertion above fails before the explicit close.
            inputGate.close();
            networkBufferPool.destroyAllBufferPools();
            networkBufferPool.destroy();
        }
    }
}
