/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.ai.util;

import com.alibaba.cloud.ai.enums.StreamResponseType;
import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
import com.alibaba.cloud.ai.graph.streaming.StreamingChatGenerator;
import com.alibaba.cloud.ai.util.ChatResponseUtil;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.model.ChatResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

public class StreamingChatGeneratorUtil {
    private static final Logger logger = LoggerFactory.getLogger(StreamingChatGeneratorUtil.class);

    /**
     * Creates a generator for {@code flux} whose result mapper always yields an empty map.
     *
     * @param nodeName name handed to the generator builder as the starting node
     * @param state state handed to the generator builder as the starting state
     * @param flux chat responses to stream
     * @return the generator built by {@link StreamingChatGenerator}
     */
    public static AsyncGenerator<? extends NodeOutput> createEmptyGenerator(String nodeName, OverAllState state, Flux<ChatResponse> flux) {
        // Same builder chain as createGenerator, with a mapper that ignores the response.
        return StreamingChatGeneratorUtil.createGenerator(nodeName, state, ignored -> Map.of(), flux);
    }

    /**
     * Creates a generator that emits a single custom status response built from {@code text}.
     *
     * <p>No starting node or state is configured, and the result mapper always yields an
     * empty map.
     *
     * @param text text passed to {@link ChatResponseUtil#createCustomStatusResponse}
     * @return the generator built by {@link StreamingChatGenerator}
     */
    public static AsyncGenerator<? extends NodeOutput> createStreamPrintGenerator(String text) {
        // Keep the element type as ChatResponse: widening it to Object would not
        // type-check against build(Flux<ChatResponse>).
        Flux<ChatResponse> singleMessage = Flux.just(ChatResponseUtil.createCustomStatusResponse(text));
        return StreamingChatGenerator.builder()
            .mapResult(chatResponse -> Map.of())
            .build(singleMessage);
    }

    /**
     * Creates a generator for {@code flux} with no starting node or state and a result mapper
     * that always yields an empty map.
     *
     * @param flux chat responses to stream
     * @return the generator built by {@link StreamingChatGenerator}
     */
    public static AsyncGenerator<? extends NodeOutput> createEmptyGenerator(Flux<ChatResponse> flux) {
        return StreamingChatGenerator.builder()
            .mapResult(ignored -> Map.of())
            .build(flux);
    }

    /**
     * Creates a generator for {@code flux} using the supplied node name, state and result mapper.
     *
     * @param nodeName name handed to the generator builder as the starting node
     * @param state state handed to the generator builder as the starting state
     * @param mapResultFunction mapper handed to the generator builder via {@code mapResult}
     * @param flux chat responses to stream
     * @return the generator built by {@link StreamingChatGenerator}
     */
    public static AsyncGenerator<? extends NodeOutput> createGenerator(String nodeName, OverAllState state, Function<ChatResponse, Map<String, Object>> mapResultFunction, Flux<ChatResponse> flux) {
        return StreamingChatGenerator.builder()
            .startingNode(nodeName)
            .startingState(state)
            .mapResult(mapResultFunction)
            .build(flux);
    }

    /**
     * Variant of {@link #createEmptyGenerator(String, OverAllState, Flux)} that uses the simple
     * name of {@code nodeClass} as the node name.
     *
     * @param nodeClass class whose simple name becomes the node name
     * @param state state handed to the generator builder as the starting state
     * @param flux chat responses to stream
     * @return the generator built by the name-based overload
     */
    public static AsyncGenerator<? extends NodeOutput> createEmptyGenerator(Class<?> nodeClass, OverAllState state, Flux<ChatResponse> flux) {
        String nodeName = nodeClass.getSimpleName();
        return StreamingChatGeneratorUtil.createEmptyGenerator(nodeName, state, flux);
    }

    /**
     * Variant of {@link #createGenerator(String, OverAllState, Function, Flux)} that uses the
     * simple name of {@code nodeClass} as the node name.
     *
     * @param nodeClass class whose simple name becomes the node name
     * @param state state handed to the generator builder as the starting state
     * @param mapResultFunction mapper handed to the generator builder via {@code mapResult}
     * @param flux chat responses to stream
     * @return the generator built by the name-based overload
     */
    public static AsyncGenerator<? extends NodeOutput> createGenerator(Class<?> nodeClass, OverAllState state, Function<ChatResponse, Map<String, Object>> mapResultFunction, Flux<ChatResponse> flux) {
        String nodeName = nodeClass.getSimpleName();
        return StreamingChatGeneratorUtil.createGenerator(nodeName, state, mapResultFunction, flux);
    }

    /**
     * Variant of
     * {@link #createGeneratorWithCallback(String, OverAllState, Function, Flux, Consumer)} that
     * uses the simple name of {@code nodeClass} as the node name.
     *
     * <p>Delegates to the name-based overload so the callback wiring lives in one place.
     *
     * @param nodeClass class whose simple name becomes the node name
     * @param state state handed to the generator builder as the starting state
     * @param mapResultFunction mapper applied to responses and handed to the generator builder
     * @param flux chat responses to stream
     * @param onCompleteCallback callback invoked by the name-based overload on completion
     * @return the generator built by the name-based overload
     */
    public static AsyncGenerator<? extends NodeOutput> createGeneratorWithCallback(Class<?> nodeClass, OverAllState state, Function<ChatResponse, Map<String, Object>> mapResultFunction, Flux<ChatResponse> flux, Consumer<Map<String, Object>> onCompleteCallback) {
        return StreamingChatGeneratorUtil.createGeneratorWithCallback(nodeClass.getSimpleName(), state, mapResultFunction, flux, onCompleteCallback);
    }

    /**
     * Creates a generator for {@code flux} and additionally invokes {@code onCompleteCallback}
     * once the flux completes normally.
     *
     * <p>{@code mapResultFunction} is applied to every response as it passes through; the value
     * produced for the most recent response that mapped without throwing is what the callback
     * receives. A mapping failure is logged and does not interrupt the stream. The callback is
     * skipped when the flux ends with an error or cancellation, or when no response was mapped
     * successfully. Exceptions thrown by the callback are logged and swallowed.
     *
     * @param nodeName name handed to the generator builder as the starting node
     * @param state state handed to the generator builder as the starting state
     * @param mapResultFunction mapper applied to responses and handed to the generator builder
     * @param flux chat responses to stream
     * @param onCompleteCallback receives the latest successfully mapped result on completion
     * @return the generator built by {@link StreamingChatGenerator} over the wrapped flux
     */
    public static AsyncGenerator<? extends NodeOutput> createGeneratorWithCallback(String nodeName, OverAllState state, Function<ChatResponse, Map<String, Object>> mapResultFunction, Flux<ChatResponse> flux, Consumer<Map<String, Object>> onCompleteCallback) {
        // A write-once future would pin the mapping of the FIRST chunk; keep an overwritable
        // slot instead so the callback sees the mapping of the last chunk. The flag is separate
        // because a mapper may legitimately return null.
        AtomicReference<Map<String, Object>> latestResult = new AtomicReference<>();
        AtomicBoolean resultAvailable = new AtomicBoolean(false);
        Flux<ChatResponse> wrappedFlux = flux.doOnNext(response -> {
            try {
                latestResult.set(mapResultFunction.apply(response));
                resultAvailable.set(true);
            }
            catch (Exception e) {
                logger.error("Error occurred while processing response", e);
            }
        }).doFinally(signalType -> {
            if (signalType != SignalType.ON_COMPLETE || !resultAvailable.get()) {
                return;
            }
            try {
                onCompleteCallback.accept(latestResult.get());
            }
            catch (Exception e) {
                logger.error("Error occurred while executing completion callback", e);
            }
        });
        return StreamingChatGenerator.builder()
            .startingNode(nodeName)
            .startingState(state)
            .mapResult(mapResultFunction)
            .build(wrappedFlux);
    }

    /**
     * Creates a generator that plays three generators back to back: a start message, the main
     * stream, and a completion message.
     *
     * <p>The returned generator reports done only after the completion message generator is
     * exhausted, and its done marker carries the result value of the main generator.
     *
     * @param nodeClass class whose simple name is used as node name and in log messages
     * @param state state handed to the main generator
     * @param mapResultFunction mapper handed to the main generator
     * @param flux chat responses streamed by the main generator
     * @param startMessage text emitted before the main stream
     * @param completionMessage text emitted after the main stream
     * @return a generator sequencing the three phases
     */
    public static AsyncGenerator<? extends NodeOutput> createGeneratorWithOrderedNotifications(final Class<?> nodeClass, OverAllState state, Function<ChatResponse, Map<String, Object>> mapResultFunction, Flux<ChatResponse> flux, String startMessage, String completionMessage) {
        final AsyncGenerator<? extends NodeOutput> startGenerator = StreamingChatGeneratorUtil.createStreamPrintGenerator(startMessage);
        final AsyncGenerator<? extends NodeOutput> mainGenerator = StreamingChatGeneratorUtil.createGenerator(nodeClass, state, mapResultFunction, flux);
        final AsyncGenerator<? extends NodeOutput> completionGenerator = StreamingChatGeneratorUtil.createStreamPrintGenerator(completionMessage);
        return new AsyncGenerator<NodeOutput>() {
            // 0 = start message, 1 = main stream, 2 = completion message, 3 = finished
            private int phase = 0;
            // Result value captured from the main generator's done marker.
            private Object finalResult = null;

            @Override
            @SuppressWarnings("unchecked")
            public AsyncGenerator.Data<NodeOutput> next() {
                switch (this.phase) {
                    case 0: {
                        AsyncGenerator.Data<? extends NodeOutput> startData = startGenerator.next();
                        if (startData.isDone()) {
                            this.phase = 1;
                            logger.info("[{}] Start message completed, entering main processing phase", nodeClass.getSimpleName());
                            // Do not hand the inner done marker to the caller: it would be read
                            // as the end of the whole sequence. Move straight on to the main phase.
                            return this.next();
                        }
                        return (AsyncGenerator.Data<NodeOutput>) startData;
                    }
                    case 1: {
                        AsyncGenerator.Data<? extends NodeOutput> mainData = mainGenerator.next();
                        if (mainData.isDone()) {
                            this.finalResult = mainData.resultValue().orElse(null);
                            this.phase = 2;
                            logger.info("[{}] Main processing completed, entering completion message phase", nodeClass.getSimpleName());
                            return this.next();
                        }
                        return (AsyncGenerator.Data<NodeOutput>) mainData;
                    }
                    case 2: {
                        AsyncGenerator.Data<? extends NodeOutput> completionData = completionGenerator.next();
                        if (completionData.isDone()) {
                            this.phase = 3;
                            logger.info("[{}] Completion message output completed", nodeClass.getSimpleName());
                            return AsyncGenerator.Data.done(this.finalResult);
                        }
                        return (AsyncGenerator.Data<NodeOutput>) completionData;
                    }
                    default:
                        // Already finished: keep answering with the same done marker.
                        return AsyncGenerator.Data.done(this.finalResult);
                }
            }
        };
    }

    /**
     * Creates a generator that streams the main generator and, once it is done, composes a
     * completion notification generator after it.
     *
     * <p>The composition is handed out exactly once. Any later call to {@code next()} returns a
     * done marker carrying the main generator's result value, so the exhausted main generator
     * is not polled again and the notification is not composed a second time.
     *
     * @param nodeClass class whose simple name is used as node name and in log messages
     * @param state state handed to the main generator
     * @param mapResultFunction mapper handed to the main generator
     * @param flux chat responses streamed by the main generator
     * @param completionMessage text emitted by the completion notification generator
     * @return a generator chaining the main stream with the completion notification
     */
    public static AsyncGenerator<? extends NodeOutput> createGeneratorWithComposeCompletion(final Class<?> nodeClass, OverAllState state, Function<ChatResponse, Map<String, Object>> mapResultFunction, Flux<ChatResponse> flux, String completionMessage) {
        final AsyncGenerator<? extends NodeOutput> mainGenerator = StreamingChatGeneratorUtil.createGenerator(nodeClass, state, mapResultFunction, flux);
        final AsyncGenerator<? extends NodeOutput> completionNotificationGenerator = StreamingChatGeneratorUtil.createStreamPrintGenerator(completionMessage);
        return new AsyncGenerator<NodeOutput>() {
            // Set once the main generator reported done and the composition was returned.
            private boolean mainCompleted = false;
            // Result value captured from the main generator's done marker.
            private Object finalResult = null;

            @Override
            @SuppressWarnings("unchecked")
            public AsyncGenerator.Data<NodeOutput> next() {
                if (this.mainCompleted) {
                    return AsyncGenerator.Data.done(this.finalResult);
                }
                AsyncGenerator.Data<? extends NodeOutput> mainData = mainGenerator.next();
                if (mainData.isDone()) {
                    this.mainCompleted = true;
                    this.finalResult = mainData.resultValue().orElse(null);
                    return AsyncGenerator.Data.composeWith((AsyncGenerator<NodeOutput>) completionNotificationGenerator, resultValue -> logger.info("[{}] Completion notification has been output", nodeClass.getSimpleName()));
                }
                return (AsyncGenerator.Data<NodeOutput>) mainData;
            }
        };
    }

    /**
     * Entry point for the fluent {@link StreamingProcessorBuilder} API.
     *
     * @return a fresh builder with default settings
     */
    public static StreamingProcessorBuilder createStreamingProcessor() {
        StreamingProcessorBuilder builder = new StreamingProcessorBuilder();
        return builder;
    }

    /**
     * Builds a streaming generator with start and completion messages, using the builder's
     * default response type.
     *
     * @param nodeClass class whose simple name becomes the node name
     * @param state state handed to the builder
     * @param startMessage message handed to the builder as start message
     * @param completionMessage message handed to the builder as completion message
     * @param resultMapper mapper handed to the builder as result mapper
     * @param sourceFlux chat responses to stream
     * @return the generator produced by {@link StreamingProcessorBuilder#build(Flux)}
     */
    public static AsyncGenerator<? extends NodeOutput> createStreamingGeneratorWithMessages(Class<?> nodeClass, OverAllState state, String startMessage, String completionMessage, Function<String, Map<String, Object>> resultMapper, Flux<ChatResponse> sourceFlux) {
        StreamingProcessorBuilder processor = StreamingChatGeneratorUtil.createStreamingProcessor()
            .nodeClass(nodeClass)
            .state(state)
            .startMessage(startMessage)
            .completionMessage(completionMessage)
            .resultMapper(resultMapper);
        return processor.build(sourceFlux);
    }

    /**
     * Builds a streaming generator with start and completion messages and an explicit
     * response type.
     *
     * @param nodeClass class whose simple name becomes the node name
     * @param state state handed to the builder
     * @param startMessage message handed to the builder as start message
     * @param completionMessage message handed to the builder as completion message
     * @param resultMapper mapper handed to the builder as result mapper
     * @param sourceFlux chat responses to stream
     * @param type response type handed to the builder
     * @return the generator produced by {@link StreamingProcessorBuilder#build(Flux)}
     */
    public static AsyncGenerator<? extends NodeOutput> createStreamingGeneratorWithMessages(Class<?> nodeClass, OverAllState state, String startMessage, String completionMessage, Function<String, Map<String, Object>> resultMapper, Flux<ChatResponse> sourceFlux, StreamResponseType type) {
        StreamingProcessorBuilder processor = StreamingChatGeneratorUtil.createStreamingProcessor()
            .nodeClass(nodeClass)
            .state(state)
            .startMessage(startMessage)
            .type(type)
            .completionMessage(completionMessage)
            .resultMapper(resultMapper);
        return processor.build(sourceFlux);
    }

    /**
     * Builds a streaming generator without start or completion messages, using an explicit
     * response type.
     *
     * @param nodeClass class whose simple name becomes the node name
     * @param state state handed to the builder
     * @param resultMapper mapper handed to the builder as result mapper
     * @param sourceFlux chat responses to stream
     * @param type response type handed to the builder
     * @return the generator produced by {@link StreamingProcessorBuilder#build(Flux)}
     */
    public static AsyncGenerator<? extends NodeOutput> createStreamingGeneratorWithMessages(Class<?> nodeClass, OverAllState state, Function<String, Map<String, Object>> resultMapper, Flux<ChatResponse> sourceFlux, StreamResponseType type) {
        StreamingProcessorBuilder processor = StreamingChatGeneratorUtil.createStreamingProcessor();
        processor.nodeClass(nodeClass);
        processor.state(state);
        processor.type(type);
        processor.resultMapper(resultMapper);
        return processor.build(sourceFlux);
    }

    /**
     * Builds a streaming generator without start or completion messages, using the builder's
     * default response type.
     *
     * @param nodeClass class whose simple name becomes the node name
     * @param state state handed to the builder
     * @param resultMapper mapper handed to the builder as result mapper
     * @param sourceFlux chat responses to stream
     * @return the generator produced by {@link StreamingProcessorBuilder#build(Flux)}
     */
    public static AsyncGenerator<? extends NodeOutput> createStreamingGeneratorWithMessages(Class<?> nodeClass, OverAllState state, Function<String, Map<String, Object>> resultMapper, Flux<ChatResponse> sourceFlux) {
        StreamingProcessorBuilder processor = StreamingChatGeneratorUtil.createStreamingProcessor();
        processor.nodeClass(nodeClass);
        processor.state(state);
        processor.resultMapper(resultMapper);
        return processor.build(sourceFlux);
    }

    /**
     * Creates a generator that runs {@code processingSteps} in order, streaming each step's
     * optional message before it runs and its optional result message afterwards.
     *
     * <p>Each step's executor is invoked exactly once with {@code state}. The per-step result
     * maps are merged in step order (later keys overwrite earlier ones) and, when
     * {@code finalResultMapper} is non-null, the merged map is passed to it to produce the final
     * result; otherwise the final result is an empty map. An exception thrown by a step is
     * signalled as an error on the stream.
     *
     * @param nodeClass class whose simple name becomes the node name
     * @param state state passed to every step executor and to the builder
     * @param processingSteps steps to execute in order
     * @param finalResultMapper maps the merged step results to the final result; may be null
     * @return the generator produced by {@link StreamingProcessorBuilder#build(Flux)}
     */
    public static AsyncGenerator<? extends NodeOutput> createMultiStepGenerator(Class<?> nodeClass, OverAllState state, List<ProcessingStep> processingSteps, Function<Map<String, Object>, Map<String, Object>> finalResultMapper) {
        // Published by the stream right before it completes, read by the final mapper.
        // Sharing the results avoids running every step executor a second time.
        AtomicReference<Map<String, Object>> collectedResults = new AtomicReference<>();
        Flux<ChatResponse> multiStepFlux = Flux.create(emitter -> {
            try {
                Map<String, Object> stepResults = new HashMap<>();
                for (ProcessingStep step : processingSteps) {
                    if (step.getMessage() != null) {
                        emitter.next(ChatResponseUtil.createCustomStatusResponse(step.getMessage()));
                    }
                    Map<String, Object> stepResult = step.getExecutor().apply(state);
                    stepResults.putAll(stepResult);
                    if (step.getResultMessage() != null) {
                        String resultMsg = step.getResultMessage().apply(stepResult);
                        emitter.next(ChatResponseUtil.createCustomStatusResponse(resultMsg));
                    }
                }
                collectedResults.set(stepResults);
                emitter.complete();
            }
            catch (Exception e) {
                emitter.error(e);
            }
        });
        Function<OverAllState, Map<String, Object>> finalLogic;
        if (finalResultMapper != null) {
            finalLogic = currentState -> {
                Map<String, Object> results = collectedResults.get();
                // Defensive: fall back to an empty map if the stream never published results.
                return finalResultMapper.apply(results != null ? results : new HashMap<>());
            };
        }
        else {
            finalLogic = currentState -> Map.of();
        }
        return StreamingChatGeneratorUtil.createStreamingProcessor()
            .nodeClass(nodeClass)
            .state(state)
            .businessLogicExecutor(finalLogic)
            .build(multiStepFlux);
    }

    public static class StreamingProcessorBuilder {
        private String startMessage;
        private String completionMessage;
        private StreamResponseType type = StreamResponseType.EXPLANATION;
        private Function<ChatResponse, String> contentExtractor = response -> response.getResult().getOutput().getText();
        private String nodeName;
        private OverAllState state;
        private Function<String, Map<String, Object>> resultMapper;
        private Function<OverAllState, Map<String, Object>> businessLogicExecutor;
        private boolean trimResult = true;

        private StreamingProcessorBuilder() {
        }

        public StreamingProcessorBuilder startMessage(String startMessage) {
            this.startMessage = startMessage;
            return this;
        }

        public StreamingProcessorBuilder completionMessage(String completionMessage) {
            this.completionMessage = completionMessage;
            return this;
        }

        public StreamingProcessorBuilder type(StreamResponseType type) {
            this.type = type;
            return this;
        }

        public StreamingProcessorBuilder contentExtractor(Function<ChatResponse, String> contentExtractor) {
            this.contentExtractor = contentExtractor;
            return this;
        }

        public StreamingProcessorBuilder nodeName(String nodeName) {
            this.nodeName = nodeName;
            return this;
        }

        public StreamingProcessorBuilder nodeClass(Class<?> nodeClass) {
            this.nodeName = nodeClass.getSimpleName();
            return this;
        }

        public StreamingProcessorBuilder state(OverAllState state) {
            this.state = state;
            return this;
        }

        public StreamingProcessorBuilder resultMapper(Function<String, Map<String, Object>> resultMapper) {
            this.resultMapper = resultMapper;
            return this;
        }

        public StreamingProcessorBuilder trimResult(boolean trimResult) {
            this.trimResult = trimResult;
            return this;
        }

        public StreamingProcessorBuilder businessLogicExecutor(Function<OverAllState, Map<String, Object>> businessLogicExecutor) {
            this.businessLogicExecutor = businessLogicExecutor;
            return this;
        }

        public AsyncGenerator<? extends NodeOutput> build(Flux<ChatResponse> sourceFlux) {
            if (this.nodeName == null || this.state == null) {
                throw new IllegalArgumentException("Node name and state cannot be null");
            }
            if (this.businessLogicExecutor != null) {
                return StreamingChatGeneratorUtil.createGenerator(this.nodeName, this.state, response -> this.businessLogicExecutor.apply(this.state), sourceFlux);
            }
            if (this.resultMapper == null) {
                throw new IllegalArgumentException("Result mapper cannot be null");
            }
            StringBuilder collectedResult = new StringBuilder();
            Flux wrappedFlux = Flux.create(emitter -> {
                try {
                    if (this.startMessage != null && !this.startMessage.isEmpty()) {
                        emitter.next((Object)ChatResponseUtil.createCustomStatusResponse(this.startMessage, this.type));
                    }
                    sourceFlux.doOnNext(response -> {
                        String content = this.contentExtractor.apply((ChatResponse)response);
                        if (content != null) {
                            collectedResult.append(content);
                        }
                        emitter.next((Object)ChatResponseUtil.createStatusResponse(response.getResult().getOutput().getText(), this.type));
                    }).doOnComplete(() -> {
                        if (this.completionMessage != null && !this.completionMessage.isEmpty()) {
                            emitter.next((Object)ChatResponseUtil.createCustomStatusResponse("\n" + this.completionMessage, this.type));
                        }
                        logger.debug("[{}] Streaming processing completed", (Object)this.nodeName);
                        emitter.complete();
                    }).doOnError(error -> {
                        logger.error("[{}] Error in streaming processing", (Object)this.nodeName, error);
                        emitter.error(error);
                    }).subscribe();
                }
                catch (Exception e) {
                    logger.error("[{}] Failed to start streaming processing", (Object)this.nodeName, (Object)e);
                    emitter.error((Throwable)e);
                }
            });
            return StreamingChatGeneratorUtil.createGenerator(this.nodeName, this.state, response -> {
                String finalResult = collectedResult.toString();
                if (this.trimResult) {
                    finalResult = finalResult.trim();
                }
                return this.resultMapper.apply(finalResult);
            }, (Flux<ChatResponse>)wrappedFlux);
        }
    }

    /**
     * Immutable description of one step of a multi-step generator: an optional message, the
     * executor that produces the step's result map, and an optional function that renders a
     * message from that result.
     */
    public static class ProcessingStep {
        private final String message;
        private final Function<OverAllState, Map<String, Object>> executor;
        private final Function<Map<String, Object>, String> resultMessage;

        /**
         * Creates a step without a result message.
         *
         * @param message message associated with the step; may be null
         * @param executor function producing the step's result map from the state
         */
        public ProcessingStep(String message, Function<OverAllState, Map<String, Object>> executor) {
            this(message, executor, null);
        }

        /**
         * Creates a step.
         *
         * @param message message associated with the step; may be null
         * @param executor function producing the step's result map from the state
         * @param resultMessage renders a message from the step's result map; may be null
         */
        public ProcessingStep(String message, Function<OverAllState, Map<String, Object>> executor, Function<Map<String, Object>, String> resultMessage) {
            this.message = message;
            this.executor = executor;
            this.resultMessage = resultMessage;
        }

        /**
         * Static factory for a step without a result message.
         *
         * @param message message associated with the step; may be null
         * @param executor function producing the step's result map from the state
         * @return the new step
         */
        public static ProcessingStep of(String message, Function<OverAllState, Map<String, Object>> executor) {
            return of(message, executor, null);
        }

        /**
         * Static factory for a step with a result message.
         *
         * @param message message associated with the step; may be null
         * @param executor function producing the step's result map from the state
         * @param resultMessage renders a message from the step's result map; may be null
         * @return the new step
         */
        public static ProcessingStep of(String message, Function<OverAllState, Map<String, Object>> executor, Function<Map<String, Object>, String> resultMessage) {
            return new ProcessingStep(message, executor, resultMessage);
        }

        /** @return the message associated with the step, or null if none was given */
        public String getMessage() {
            return message;
        }

        /** @return the function producing the step's result map */
        public Function<OverAllState, Map<String, Object>> getExecutor() {
            return executor;
        }

        /** @return the function rendering the result message, or null if none was given */
        public Function<Map<String, Object>, String> getResultMessage() {
            return resultMessage;
        }
    }
}

