package org.apache.flink.runtime.operators.resettable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.types.IntValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.class */
public class SpillingResettableIteratorTest {
    private static final int NUM_TESTRECORDS = 50000;
    private static final int MEMORY_CAPACITY = 10485760;
    private IOManager ioman;
    private MemoryManager memman;
    private Iterator<IntValue> reader;
    private final AbstractInvokable memOwner = new DummyInvokable();
    private final TypeSerializer<IntValue> serializer = new IntValueSerializer();

    @Before
    public void startup() {
        this.memman = new MemoryManager(10485760L, 1, 32768, MemoryType.HEAP, true);
        this.ioman = new IOManagerAsync();
        ArrayList arrayList = new ArrayList(NUM_TESTRECORDS);
        for (int i = 0; i < NUM_TESTRECORDS; i++) {
            arrayList.add(new IntValue(i));
        }
        this.reader = arrayList.iterator();
    }

    @After
    public void shutdown() throws Exception {
        this.ioman.close();
        this.ioman = null;
        if (!this.memman.verifyEmpty()) {
            Assert.fail("A memory leak has occurred: Not all memory was properly returned to the memory manager.");
        }
        this.memman.shutdown();
        this.memman = null;
    }

    @Test
    public void testResettableIterator() {
        try {
            SpillingResettableIterator spillingResettableIterator = new SpillingResettableIterator(this.reader, this.serializer, this.memman, this.ioman, 2, this.memOwner);
            spillingResettableIterator.open();
            int i = 0;
            while (spillingResettableIterator.hasNext()) {
                String str = "In initial run, element " + i + " does not match expected value!";
                int i2 = i;
                i++;
                Assert.assertEquals(str, i2, ((IntValue) spillingResettableIterator.next()).getValue());
            }
            Assert.assertEquals("Too few elements were deserialzied in initial run!", 50000L, i);
            for (int i3 = 0; i3 < 10; i3++) {
                int i4 = 0;
                spillingResettableIterator.reset();
                while (spillingResettableIterator.hasNext()) {
                    String str2 = "After reset nr. " + i3 + "1 element " + i4 + " does not match expected value!";
                    int i5 = i4;
                    i4++;
                    Assert.assertEquals(str2, i5, ((IntValue) spillingResettableIterator.next()).getValue());
                }
                Assert.assertEquals("Too few elements were deserialzied after reset nr. " + i3 + "1!", 50000L, i4);
            }
            spillingResettableIterator.close();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Test encountered an exception.");
        }
    }

    @Test
    public void testResettableIteratorInMemory() {
        try {
            SpillingResettableIterator spillingResettableIterator = new SpillingResettableIterator(this.reader, this.serializer, this.memman, this.ioman, 20, this.memOwner);
            spillingResettableIterator.open();
            int i = 0;
            while (spillingResettableIterator.hasNext()) {
                String str = "In initial run, element " + i + " does not match expected value!";
                int i2 = i;
                i++;
                Assert.assertEquals(str, i2, ((IntValue) spillingResettableIterator.next()).getValue());
            }
            Assert.assertEquals("Too few elements were deserialzied in initial run!", 50000L, i);
            for (int i3 = 0; i3 < 10; i3++) {
                int i4 = 0;
                spillingResettableIterator.reset();
                while (spillingResettableIterator.hasNext()) {
                    String str2 = "After reset nr. " + i3 + "1 element " + i4 + " does not match expected value!";
                    int i5 = i4;
                    i4++;
                    Assert.assertEquals(str2, i5, ((IntValue) spillingResettableIterator.next()).getValue());
                }
                Assert.assertEquals("Too few elements were deserialzied after reset nr. " + i3 + "1!", 50000L, i4);
            }
            spillingResettableIterator.close();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Test encountered an exception.");
        }
    }

    @Test
    public void testHasNext() {
        try {
            SpillingResettableIterator spillingResettableIterator = new SpillingResettableIterator(this.reader, this.serializer, this.memman, this.ioman, 2, this.memOwner);
            spillingResettableIterator.open();
            int i = 0;
            while (spillingResettableIterator.hasNext()) {
                spillingResettableIterator.hasNext();
                spillingResettableIterator.next();
                i++;
            }
            Assert.assertTrue(i + " elements read from iterator, but " + NUM_TESTRECORDS + " expected", i == NUM_TESTRECORDS);
            spillingResettableIterator.close();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Test encountered an exception.");
        }
    }

    @Test
    public void testNext() {
        try {
            SpillingResettableIterator spillingResettableIterator = new SpillingResettableIterator(this.reader, this.serializer, this.memman, this.ioman, 2, this.memOwner);
            spillingResettableIterator.open();
            for (int i = 0; i < NUM_TESTRECORDS; i++) {
                Assert.assertTrue("Record was not read from iterator", ((IntValue) spillingResettableIterator.next()) != null);
            }
            try {
                Assert.fail("Too many records were read from iterator.");
            } catch (NoSuchElementException e) {
            }
            spillingResettableIterator.close();
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail("Test encountered an exception.");
        }
    }
}
