package org.apache.flink.runtime.rest.handler.async;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.class */
public class AbstractAsynchronousOperationHandlersTest extends TestLogger {
    private static final Time TIMEOUT = Time.seconds(10);
    private TestingAsynchronousOperationHandlers testingAsynchronousOperationHandlers;
    private TestingAsynchronousOperationHandlers.TestingTriggerHandler testingTriggerHandler;
    private TestingAsynchronousOperationHandlers.TestingStatusHandler testingStatusHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest$OperationResult.class */
    /**
     * Immutable pair describing how an operation ended: a result string and a failure cause.
     * Both slots are nullable; the status handler in this file fills one and passes
     * {@code null} for the other.
     */
    public static final class OperationResult {

        /** Result string handed to the constructor; may be {@code null}. */
        @Nullable
        private final String value;

        /** Failure cause handed to the constructor; may be {@code null}. */
        @Nullable
        private final Throwable throwable;

        /**
         * Stores both arguments as given, without validation.
         *
         * @param value result string, or {@code null}
         * @param throwable failure cause, or {@code null}
         */
        OperationResult(@Nullable String value, @Nullable Throwable throwable) {
            this.throwable = throwable;
            this.value = value;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest$TestOperationKey.class */
    /** Operation key used by the test handlers; adds nothing on top of {@link OperationKey}. */
    public static final class TestOperationKey extends OperationKey {

        /**
         * Creates a key for the given trigger id.
         *
         * @param id trigger id forwarded unchanged to the {@link OperationKey} constructor
         */
        protected TestOperationKey(TriggerId id) {
            super(id);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest$TestingAsynchronousOperationHandlers.class */
    /**
     * Test implementation of {@link AbstractAsynchronousOperationHandlers} whose operations are
     * keyed by {@link TestOperationKey} and produce {@code String} results.
     *
     * <p>Both handlers are inner (non-static) classes, so every handler instance belongs to the
     * {@code TestingAsynchronousOperationHandlers} instance it was created from.
     */
    private static final class TestingAsynchronousOperationHandlers extends AbstractAsynchronousOperationHandlers<TestOperationKey, String> {

        /** Status handler that wraps the outcome of an operation into an {@link OperationResult}. */
        class TestingStatusHandler extends AbstractAsynchronousOperationHandlers<TestOperationKey, String>.StatusHandler<RestfulGateway, OperationResult, TriggerMessageParameters> {

            /**
             * Forwards all arguments unchanged to the super constructor.
             *
             * @param gatewayRetriever retriever handed to the super constructor
             * @param timeout timeout handed to the super constructor
             * @param headers string map handed to the super constructor
             * @param messageHeaders headers describing the status endpoint
             */
            protected TestingStatusHandler(GatewayRetriever<? extends RestfulGateway> gatewayRetriever, Time timeout, Map<String, String> headers, MessageHeaders<EmptyRequestBody, AsynchronousOperationResult<OperationResult>, TriggerMessageParameters> messageHeaders) {
                // The enclosing instance is supplied implicitly; it must not be passed as an argument.
                super(gatewayRetriever, timeout, headers, messageHeaders);
            }

            /**
             * Builds the operation key from the trigger id path parameter of the request.
             *
             * @param handlerRequest status request carrying a {@link TriggerIdPathParameter}
             * @return key wrapping the requested trigger id
             */
            @Override
            protected TestOperationKey getOperationKey(HandlerRequest<EmptyRequestBody, TriggerMessageParameters> handlerRequest) {
                return new TestOperationKey(handlerRequest.getPathParameter(TriggerIdPathParameter.class));
            }

            /**
             * Wraps a failure into an {@link OperationResult} without a value.
             *
             * <p>This method has to carry the inherited name: the decompiler-generated alias
             * {@code m273exceptionalOperationResultResponse} did not override anything.
             *
             * @param th cause of the failed operation
             * @return result holding only {@code th}
             */
            @Override
            public OperationResult exceptionalOperationResultResponse(Throwable th) {
                return new OperationResult(null, th);
            }

            /**
             * Wraps a successful result into an {@link OperationResult} without a failure cause.
             *
             * @param str value produced by the operation
             * @return result holding only {@code str}
             */
            @Override
            public OperationResult operationResultResponse(String str) {
                return new OperationResult(str, null);
            }
        }

        /** Trigger handler that starts a savepoint on the gateway as its asynchronous operation. */
        class TestingTriggerHandler extends AbstractAsynchronousOperationHandlers<TestOperationKey, String>.TriggerHandler<RestfulGateway, EmptyRequestBody, EmptyMessageParameters> {

            /**
             * Forwards all arguments unchanged to the super constructor.
             *
             * @param gatewayRetriever retriever handed to the super constructor
             * @param timeout timeout handed to the super constructor
             * @param headers string map handed to the super constructor
             * @param messageHeaders headers describing the trigger endpoint
             */
            protected TestingTriggerHandler(GatewayRetriever<? extends RestfulGateway> gatewayRetriever, Time timeout, Map<String, String> headers, MessageHeaders<EmptyRequestBody, TriggerResponse, EmptyMessageParameters> messageHeaders) {
                // The enclosing instance is supplied implicitly; it must not be passed as an argument.
                super(gatewayRetriever, timeout, headers, messageHeaders);
            }

            /**
             * Starts the operation by calling {@code triggerSavepoint} on the gateway with a fresh
             * {@link JobID}, a {@code null} string, {@code false} and this handler's timeout.
             *
             * @param handlerRequest trigger request; not inspected
             * @param restfulGateway gateway the savepoint is triggered on
             * @return the future returned by the gateway
             * @throws RestHandlerException declared by the contract; never thrown here directly
             */
            @Override
            protected CompletableFuture<String> triggerOperation(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway restfulGateway) throws RestHandlerException {
                return restfulGateway.triggerSavepoint(new JobID(), (String) null, false, this.timeout);
            }

            /**
             * Creates a key with a newly generated {@link TriggerId} for every request.
             *
             * @param handlerRequest trigger request; not inspected
             * @return key wrapping a fresh trigger id
             */
            @Override
            protected TestOperationKey createOperationKey(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest) {
                return new TestOperationKey(new TriggerId());
            }
        }

        private TestingAsynchronousOperationHandlers() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest$TestingStatusMessageHeaders.class */
    private static final class TestingStatusMessageHeaders extends AsynchronousOperationStatusMessageHeaders<OperationResult, TriggerMessageParameters> {
        private static final TestingStatusMessageHeaders INSTANCE = new TestingStatusMessageHeaders();

        private TestingStatusMessageHeaders() {
        }

        protected Class<OperationResult> getValueClass() {
            return OperationResult.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        /* renamed from: getUnresolvedMessageParameters, reason: merged with bridge method [inline-methods] */
        public TriggerMessageParameters m277getUnresolvedMessageParameters() {
            return new TriggerMessageParameters();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "foobar";
        }

        public String getDescription() {
            return "";
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest$TestingTriggerMessageHeaders.class */
    private static final class TestingTriggerMessageHeaders extends AsynchronousOperationTriggerMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
        static final TestingTriggerMessageHeaders INSTANCE = new TestingTriggerMessageHeaders();

        private TestingTriggerMessageHeaders() {
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "";
        }

        protected String getAsyncOperationDescription() {
            return "";
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        /* renamed from: getUnresolvedMessageParameters, reason: merged with bridge method [inline-methods] */
        public EmptyMessageParameters m279getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.POST;
        }

        public String getTargetRestEndpointURL() {
            return "barfoo";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest$TriggerMessageParameters.class */
    /**
     * Message parameters of the status endpoint: exactly one path parameter, the trigger id, and
     * no query parameters.
     */
    public static final class TriggerMessageParameters extends MessageParameters {

        /** The single path parameter; one instance per parameter set. */
        private final TriggerIdPathParameter triggerIdPathParameter = new TriggerIdPathParameter();

        private TriggerMessageParameters() {
        }

        /** Returns an immutable single-element collection holding the trigger id path parameter. */
        public Collection<MessagePathParameter<?>> getPathParameters() {
            final Collection<MessagePathParameter<?>> pathParameters = Collections.singleton(this.triggerIdPathParameter);
            return pathParameters;
        }

        /** Returns an immutable empty list, since this endpoint takes no query parameters. */
        public Collection<MessageQueryParameter<?>> getQueryParameters() {
            final Collection<MessageQueryParameter<?>> noQueryParameters = Collections.emptyList();
            return noQueryParameters;
        }
    }

    /**
     * Creates a fresh {@code TestingAsynchronousOperationHandlers} instance together with one
     * trigger handler and one status handler that both belong to it.
     */
    @Before
    public void setup() {
        this.testingAsynchronousOperationHandlers = new TestingAsynchronousOperationHandlers();

        // The handlers are inner classes and therefore have to be created through their enclosing
        // instance. The gateway retriever yields null because every test hands the gateway to
        // handleRequest directly.
        this.testingTriggerHandler = this.testingAsynchronousOperationHandlers.new TestingTriggerHandler(
                () -> null,
                TIMEOUT,
                Collections.emptyMap(),
                TestingTriggerMessageHeaders.INSTANCE);
        this.testingStatusHandler = this.testingAsynchronousOperationHandlers.new TestingStatusHandler(
                () -> null,
                TIMEOUT,
                Collections.emptyMap(),
                TestingStatusMessageHeaders.INSTANCE);
    }

    /**
     * Checks that the status handler reports the operation as in progress while the triggered
     * future is pending, and as completed with the produced value once the future is done.
     */
    @Test
    public void testOperationCompletion() throws Exception {
        final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
        final TestingRestfulGateway gateway = new TestingRestfulGateway.Builder()
                .setTriggerSavepointFunction((ignoredJobId, ignoredString) -> savepointFuture)
                .build();

        final TriggerId triggerId = this.testingTriggerHandler
                .handleRequest(triggerOperationRequest(), gateway)
                .get()
                .getTriggerId();

        // The future has not been completed yet, so the operation must still be queued.
        final AsynchronousOperationResult<OperationResult> pendingResult = this.testingStatusHandler
                .handleRequest(statusOperationRequest(triggerId), gateway)
                .get();
        Assert.assertThat(pendingResult.queueStatus().getId(), Matchers.is(QueueStatus.inProgress().getId()));

        savepointFuture.complete("foobar");

        final AsynchronousOperationResult<OperationResult> completedResult = this.testingStatusHandler
                .handleRequest(statusOperationRequest(triggerId), gateway)
                .get();
        Assert.assertThat(completedResult.queueStatus().getId(), Matchers.is(QueueStatus.completed().getId()));
        Assert.assertThat(completedResult.resource().value, Matchers.is("foobar"));
    }

    /**
     * Checks that an operation whose future failed is reported as completed and that the served
     * {@link OperationResult} carries the original exception.
     */
    @Test
    public void testOperationFailure() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");
        final TestingRestfulGateway gateway = new TestingRestfulGateway.Builder()
                .setTriggerSavepointFunction(
                        (ignoredJobId, ignoredString) -> FutureUtils.completedExceptionally(testException))
                .build();

        final TriggerId triggerId = this.testingTriggerHandler
                .handleRequest(triggerOperationRequest(), gateway)
                .get()
                .getTriggerId();

        final AsynchronousOperationResult<OperationResult> operationResult = this.testingStatusHandler
                .handleRequest(statusOperationRequest(triggerId), gateway)
                .get();

        Assert.assertThat(operationResult.queueStatus().getId(), Matchers.is(QueueStatus.completed().getId()));
        Assert.assertThat(operationResult.resource().throwable, Matchers.is(testException));
    }

    /**
     * Checks that querying the status of a trigger id that was never registered fails with a
     * {@link RestHandlerException} carrying {@link HttpResponseStatus#NOT_FOUND}.
     */
    @Test
    public void testUnknownTriggerId() throws Exception {
        final RestfulGateway gateway = new TestingRestfulGateway.Builder().build();

        try {
            this.testingStatusHandler.handleRequest(statusOperationRequest(new TriggerId()), gateway).get();
            Assert.fail("This should have failed with a RestHandlerException.");
        } catch (ExecutionException e) {
            // Fail with the full cause chain attached instead of an unchecked Optional.get().
            final RestHandlerException restHandlerException = ExceptionUtils
                    .findThrowable(e, RestHandlerException.class)
                    .orElseThrow(() -> new AssertionError("Expected a RestHandlerException in the cause chain.", e));

            Assert.assertThat(restHandlerException.getMessage(), Matchers.containsString("Operation not found"));
            Assert.assertThat(restHandlerException.getHttpResponseStatus(), Matchers.is(HttpResponseStatus.NOT_FOUND));
        }
    }

    /**
     * Checks that the future returned by {@code closeAsync()} stays incomplete while only an
     * in-progress status has been served, and is done after the completed result was served once.
     */
    @Test
    public void testCloseShouldFinishOnFirstServedResult() throws Exception {
        final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
        final TestingRestfulGateway gateway = new TestingRestfulGateway.Builder()
                .setTriggerSavepointFunction((ignoredJobId, ignoredString) -> savepointFuture)
                .build();

        final TriggerId triggerId = this.testingTriggerHandler
                .handleRequest(triggerOperationRequest(), gateway)
                .get()
                .getTriggerId();
        final CompletableFuture<?> closeFuture = this.testingStatusHandler.closeAsync();

        // Serving the status of a still running operation must not finish the close.
        this.testingStatusHandler.handleRequest(statusOperationRequest(triggerId), gateway).get();
        Assert.assertThat(closeFuture.isDone(), Matchers.is(false));

        savepointFuture.complete("foobar");

        // Serving the completed result is what allows the close to finish.
        this.testingStatusHandler.handleRequest(statusOperationRequest(triggerId), gateway).get();
        Assert.assertThat(closeFuture.isDone(), Matchers.is(true));
    }

    /**
     * Builds the request used to trigger an operation: an empty body with empty message parameters.
     *
     * @return a new trigger request
     * @throws HandlerRequestException if the request cannot be constructed
     */
    private static HandlerRequest<EmptyRequestBody, EmptyMessageParameters> triggerOperationRequest() throws HandlerRequestException {
        final EmptyRequestBody body = EmptyRequestBody.getInstance();
        final EmptyMessageParameters parameters = EmptyMessageParameters.getInstance();
        return new HandlerRequest<>(body, parameters);
    }

    /**
     * Builds the request used to query an operation's status: an empty body, fresh
     * {@link TriggerMessageParameters}, the trigger id as the {@code "triggerid"} path parameter
     * and no query parameters.
     *
     * @param triggerId id of the operation to query
     * @return a new status request
     * @throws HandlerRequestException if the request cannot be constructed
     */
    private static HandlerRequest<EmptyRequestBody, TriggerMessageParameters> statusOperationRequest(TriggerId triggerId) throws HandlerRequestException {
        final EmptyRequestBody body = EmptyRequestBody.getInstance();
        final TriggerMessageParameters parameters = new TriggerMessageParameters();
        final Map<String, String> pathParameters = Collections.singletonMap("triggerid", triggerId.toString());
        return new HandlerRequest<>(body, parameters, pathParameters, Collections.emptyMap());
    }
}
