/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.dashscope.protocol.okhttp;

import com.alibaba.dashscope.protocol.FullDuplexRequest;
import com.alibaba.dashscope.protocol.okhttp.OkHttpWebSocketClient;
import com.alibaba.dashscope.utils.JsonUtils;
import com.google.gson.JsonObject;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import okhttp3.OkHttpClient;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link OkHttpWebSocketClient} variant that runs the streaming ("full duplex") send
 * loop on a dedicated, class-wide thread pool.
 *
 * <p>The pool keeps one core thread, grows to at most 100 threads, reclaims idle
 * threads after 60 seconds and hands tasks off through a {@link SynchronousQueue}
 * (tasks are never queued). Worker threads are daemon threads named
 * {@code WS-STREAMING-REQ-Worker-<n>}. A JVM shutdown hook drains the pool on exit.
 */
public class OkHttpWebSocketClientForAudio extends OkHttpWebSocketClient {
    private static final Logger log = LoggerFactory.getLogger(OkHttpWebSocketClientForAudio.class);

    /** Sequence used only to give worker threads distinct names; wraps to 0 at {@code Integer.MAX_VALUE}. */
    private static final AtomicInteger STREAMING_REQUEST_THREAD_NUM = new AtomicInteger(0);

    /** Ensures the shutdown sequence of the shared executor runs at most once. */
    private static final AtomicBoolean SHUTDOWN_INITIATED = new AtomicBoolean(false);

    /** Shared executor on which every {@link #sendStreamRequest} task runs. */
    private static final ExecutorService STREAMING_REQUEST_EXECUTOR = new ThreadPoolExecutor(1, 100, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), r -> {
        Thread t = new Thread(r, "WS-STREAMING-REQ-Worker-" + STREAMING_REQUEST_THREAD_NUM.updateAndGet(n -> n == Integer.MAX_VALUE ? 0 : n + 1));
        t.setDaemon(true);
        return t;
    });

    static {
        // Registered after the static fields above are initialised, so the hook always sees a live executor.
        Runtime.getRuntime().addShutdownHook(new Thread(OkHttpWebSocketClientForAudio::shutdownStreamingExecutor));
    }

    /**
     * Creates the client; both arguments are forwarded unchanged to the superclass.
     *
     * @param client          OkHttp client handed to the superclass
     * @param passTaskStarted flag handed to the superclass
     */
    public OkHttpWebSocketClientForAudio(OkHttpClient client, boolean passTaskStarted) {
        super(client, passTaskStarted);
        log.info("Use OkHttpWebSocketClientForAudio");
    }

    /**
     * Sends the start-task message, then every element of the request's streaming data,
     * then the finished-task message, all from a task submitted to the shared streaming
     * executor.
     *
     * <p>Failures while sending are logged (with stack trace) and forwarded to
     * {@code responseEmitter.onError}; they do not complete the returned future
     * exceptionally.
     *
     * @param req the duplex request supplying the messages, credentials and stream
     * @return a future that completes when the submitted task returns
     */
    @Override
    protected CompletableFuture<Void> sendStreamRequest(final FullDuplexRequest req) {
        return CompletableFuture.runAsync(() -> {
            try {
                this.isFirstMessage.set(false);
                JsonObject startMessage = req.getStartTaskMessage();
                // Serialise once and reuse for both the log line and the wire message.
                String startJson = JsonUtils.toJson(startMessage);
                log.info("send run-task request {}", startJson);
                final String taskId = startMessage.get("header").getAsJsonObject().get("task_id").getAsString();
                this.sendJsonFrame(req, startJson);

                Action onComplete = () -> {
                    log.debug("Stream data send completed!");
                    this.sendJsonFrame(req, JsonUtils.toJson(req.getFinishedTaskMessage(taskId)));
                };
                Flowable<Object> streamingData = req.getStreamingData();
                streamingData.subscribe(data -> {
                    try {
                        this.sendStreamItem(req, taskId, data);
                    }
                    catch (Throwable ex) {
                        // Caught here rather than rethrown, so the subscription itself is not terminated.
                        // NOTE(review): later items will still be sent after a failure - confirm this is intended.
                        log.error("sendStreamData exception: {}", ex.getMessage(), ex);
                        this.responseEmitter.onError(ex);
                    }
                }, err -> {
                    log.error("Get stream data error!", err);
                    this.responseEmitter.onError(err);
                }, onComplete);
            }
            catch (Throwable ex) {
                log.error("sendStreamData exception: {}", ex.getMessage(), ex);
                this.responseEmitter.onError(ex);
            }
        }, STREAMING_REQUEST_EXECUTOR);
    }

    /** Sends one streamed element: binary for {@code byte[]}/{@link ByteBuffer}, otherwise a JSON continue message. */
    private void sendStreamItem(FullDuplexRequest req, String taskId, Object data) throws Exception {
        if (data instanceof String) {
            JsonObject continueData = req.getContinueMessage((String)data, taskId);
            this.sendJsonFrame(req, JsonUtils.toJson(continueData));
        } else if (data instanceof byte[]) {
            this.sendBinaryFrame(req, ByteString.of((byte[])data));
        } else if (data instanceof ByteBuffer) {
            this.sendBinaryFrame(req, ByteString.of((ByteBuffer)data));
        } else {
            JsonObject continueData = req.getContinueMessage(data, taskId);
            this.sendJsonFrame(req, JsonUtils.toJson(continueData));
        }
    }

    /** Sends a text message using the connection parameters carried by {@code req}. */
    private void sendJsonFrame(FullDuplexRequest req, String json) throws Exception {
        this.sendTextWithRetry(req.getApiKey(), req.isSecurityCheck(), json, req.getWorkspace(), req.getHeaders(), req.getBaseWebSocketUrl());
    }

    /** Sends a binary message using the connection parameters carried by {@code req}. */
    private void sendBinaryFrame(FullDuplexRequest req, ByteString payload) throws Exception {
        this.sendBinaryWithRetry(req.getApiKey(), req.isSecurityCheck(), payload, req.getWorkspace(), req.getHeaders(), req.getBaseWebSocketUrl());
    }

    /**
     * Shuts the shared streaming executor down; invoked from the JVM shutdown hook.
     *
     * <p>Runs at most once. Waits up to 60 seconds for running tasks, then calls
     * {@code shutdownNow()} and waits up to another 60 seconds. If the waiting thread
     * is interrupted, the executor is force-stopped and the interrupt flag is restored.
     */
    private static void shutdownStreamingExecutor() {
        if (!SHUTDOWN_INITIATED.compareAndSet(false, true)) {
            log.debug("Shutdown already in progress");
            return;
        }
        if (STREAMING_REQUEST_EXECUTOR.isShutdown()) {
            return;
        }
        log.debug("Shutting down streaming request executor...");
        STREAMING_REQUEST_EXECUTOR.shutdown();
        try {
            if (!STREAMING_REQUEST_EXECUTOR.awaitTermination(60L, TimeUnit.SECONDS)) {
                log.warn("Streaming request executor did not terminate in 60 seconds, forcing shutdown...");
                STREAMING_REQUEST_EXECUTOR.shutdownNow();
                if (!STREAMING_REQUEST_EXECUTOR.awaitTermination(60L, TimeUnit.SECONDS)) {
                    log.error("Streaming request executor did not terminate");
                }
            }
        }
        catch (InterruptedException ie) {
            STREAMING_REQUEST_EXECUTOR.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.info("Streaming request executor shut down completed");
    }
}

