/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.cloud.ai.util;

import com.alibaba.cloud.ai.enums.StreamResponseType;
import com.alibaba.cloud.ai.graph.GraphResponse;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.streaming.FluxConverter;
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
import com.alibaba.cloud.ai.util.ChatResponseUtil;
import java.util.Map;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.model.ChatResponse;
import reactor.core.publisher.Flux;

/**
 * Static helpers that adapt a {@link Flux} of {@link ChatResponse} chunks into the
 * {@code Flux<GraphResponse<StreamingOutput>>} produced by {@link FluxConverter}.
 *
 * <p>The simplest entry point is {@link #createGenerator}, which delegates straight to
 * {@link FluxConverter}. {@link StreamingProcessorBuilder} adds optional start/completion
 * status messages and accumulates the streamed text so it can be mapped to a result once
 * the stream ends.
 */
public class StreamingChatGeneratorUtil {
    private static final Logger logger = LoggerFactory.getLogger(StreamingChatGeneratorUtil.class);

    /**
     * Builds a graph-response stream from a chat-response stream via {@link FluxConverter}.
     *
     * @param nodeName          passed to {@code FluxConverter.startingNode}
     * @param state             passed to {@code FluxConverter.startingState}
     * @param mapResultFunction passed to {@code FluxConverter.mapResult}
     * @param flux              the chat-response stream to convert
     * @return the stream produced by {@code FluxConverter.build(flux)}
     */
    public static Flux<GraphResponse<StreamingOutput>> createGenerator(String nodeName, OverAllState state, Function<ChatResponse, Map<String, Object>> mapResultFunction, Flux<ChatResponse> flux) {
        return FluxConverter.builder()
                .startingNode(nodeName)
                .startingState(state)
                .mapResult(mapResultFunction)
                .build(flux);
    }

    /**
     * Creates a fresh {@link StreamingProcessorBuilder} with default settings.
     *
     * @return a new builder
     */
    public static StreamingProcessorBuilder createStreamingProcessor() {
        return new StreamingProcessorBuilder();
    }

    /**
     * Convenience overload: streams {@code sourceFlux} with start and completion messages,
     * using the builder's default response type.
     *
     * @param nodeClass         class whose simple name becomes the node name
     * @param state             graph state handed to the converter
     * @param startMessage      status message emitted before the source chunks (skipped when null/empty)
     * @param completionMessage status message emitted after the source completes (skipped when null/empty)
     * @param resultMapper      maps the accumulated text to the result map
     * @param sourceFlux        the chat-response stream
     * @return the converted stream
     */
    public static Flux<GraphResponse<StreamingOutput>> createStreamingGeneratorWithMessages(Class<?> nodeClass, OverAllState state, String startMessage, String completionMessage, Function<String, Map<String, Object>> resultMapper, Flux<ChatResponse> sourceFlux) {
        return createStreamingProcessor()
                .nodeClass(nodeClass)
                .state(state)
                .startMessage(startMessage)
                .completionMessage(completionMessage)
                .resultMapper(resultMapper)
                .build(sourceFlux);
    }

    /**
     * Convenience overload: same as the start/completion-message variant, but with an
     * explicit {@link StreamResponseType} for the emitted status responses.
     *
     * @param nodeClass         class whose simple name becomes the node name
     * @param state             graph state handed to the converter
     * @param startMessage      status message emitted before the source chunks (skipped when null/empty)
     * @param completionMessage status message emitted after the source completes (skipped when null/empty)
     * @param resultMapper      maps the accumulated text to the result map
     * @param sourceFlux        the chat-response stream
     * @param type              response type passed to {@link ChatResponseUtil} for every emitted response
     * @return the converted stream
     */
    public static Flux<GraphResponse<StreamingOutput>> createStreamingGeneratorWithMessages(Class<?> nodeClass, OverAllState state, String startMessage, String completionMessage, Function<String, Map<String, Object>> resultMapper, Flux<ChatResponse> sourceFlux, StreamResponseType type) {
        return createStreamingProcessor()
                .nodeClass(nodeClass)
                .state(state)
                .startMessage(startMessage)
                .type(type)
                .completionMessage(completionMessage)
                .resultMapper(resultMapper)
                .build(sourceFlux);
    }

    /**
     * Convenience overload: no start/completion messages, explicit {@link StreamResponseType}.
     *
     * @param nodeClass    class whose simple name becomes the node name
     * @param state        graph state handed to the converter
     * @param resultMapper maps the accumulated text to the result map
     * @param sourceFlux   the chat-response stream
     * @param type         response type passed to {@link ChatResponseUtil} for every emitted response
     * @return the converted stream
     */
    public static Flux<GraphResponse<StreamingOutput>> createStreamingGeneratorWithMessages(Class<?> nodeClass, OverAllState state, Function<String, Map<String, Object>> resultMapper, Flux<ChatResponse> sourceFlux, StreamResponseType type) {
        return createStreamingProcessor()
                .nodeClass(nodeClass)
                .state(state)
                .type(type)
                .resultMapper(resultMapper)
                .build(sourceFlux);
    }

    /**
     * Convenience overload: no start/completion messages, default response type.
     *
     * @param nodeClass    class whose simple name becomes the node name
     * @param state        graph state handed to the converter
     * @param resultMapper maps the accumulated text to the result map
     * @param sourceFlux   the chat-response stream
     * @return the converted stream
     */
    public static Flux<GraphResponse<StreamingOutput>> createStreamingGeneratorWithMessages(Class<?> nodeClass, OverAllState state, Function<String, Map<String, Object>> resultMapper, Flux<ChatResponse> sourceFlux) {
        return createStreamingProcessor()
                .nodeClass(nodeClass)
                .state(state)
                .resultMapper(resultMapper)
                .build(sourceFlux);
    }

    /**
     * Fluent builder for a streaming pipeline.
     *
     * <p>Defaults: response type {@link StreamResponseType#EXPLANATION}, content extracted via
     * {@code response.getResult().getOutput().getText()}, and the accumulated text is trimmed
     * before being handed to the result mapper.
     *
     * <p>Obtain instances through {@link StreamingChatGeneratorUtil#createStreamingProcessor()}.
     */
    public static class StreamingProcessorBuilder {
        private String startMessage;
        private String completionMessage;
        private StreamResponseType type = StreamResponseType.EXPLANATION;
        private Function<ChatResponse, String> contentExtractor = response -> response.getResult().getOutput().getText();
        private String nodeName;
        private OverAllState state;
        private Function<String, Map<String, Object>> resultMapper;
        private Function<OverAllState, Map<String, Object>> businessLogicExecutor;
        private boolean trimResult = true;

        private StreamingProcessorBuilder() {
        }

        /** Sets the status message emitted before any source chunk; null or empty disables it. */
        public StreamingProcessorBuilder startMessage(String startMessage) {
            this.startMessage = startMessage;
            return this;
        }

        /** Sets the status message emitted (prefixed with a newline) after the source completes; null or empty disables it. */
        public StreamingProcessorBuilder completionMessage(String completionMessage) {
            this.completionMessage = completionMessage;
            return this;
        }

        /** Sets the response type passed to {@link ChatResponseUtil} for every emitted response. */
        public StreamingProcessorBuilder type(StreamResponseType type) {
            this.type = type;
            return this;
        }

        /** Sets the function used to pull the text to accumulate out of each chunk; a null return is skipped. */
        public StreamingProcessorBuilder contentExtractor(Function<ChatResponse, String> contentExtractor) {
            this.contentExtractor = contentExtractor;
            return this;
        }

        /** Sets the node name directly. */
        public StreamingProcessorBuilder nodeName(String nodeName) {
            this.nodeName = nodeName;
            return this;
        }

        /**
         * Sets the node name to the simple name of {@code nodeClass}.
         *
         * @throws NullPointerException if {@code nodeClass} is null
         */
        public StreamingProcessorBuilder nodeClass(Class<?> nodeClass) {
            this.nodeName = nodeClass.getSimpleName();
            return this;
        }

        /** Sets the graph state handed to the converter (and to the business-logic executor, if any). */
        public StreamingProcessorBuilder state(OverAllState state) {
            this.state = state;
            return this;
        }

        /** Sets the function that maps the accumulated text to the result map. */
        public StreamingProcessorBuilder resultMapper(Function<String, Map<String, Object>> resultMapper) {
            this.resultMapper = resultMapper;
            return this;
        }

        /** Controls whether the accumulated text is {@link String#trim() trimmed} before mapping (default {@code true}). */
        public StreamingProcessorBuilder trimResult(boolean trimResult) {
            this.trimResult = trimResult;
            return this;
        }

        /**
         * Sets a function that computes the result map from the state alone. When set,
         * {@link #build} bypasses message wrapping, content accumulation and the result mapper.
         */
        public StreamingProcessorBuilder businessLogicExecutor(Function<OverAllState, Map<String, Object>> businessLogicExecutor) {
            this.businessLogicExecutor = businessLogicExecutor;
            return this;
        }

        /**
         * Builds the converted stream for {@code sourceFlux}.
         *
         * <p>If a business-logic executor is configured, {@code sourceFlux} is passed to
         * {@link StreamingChatGeneratorUtil#createGenerator} unchanged and the result map comes
         * from the executor. Otherwise the source is wrapped so that an optional start message
         * is emitted first, every chunk is re-emitted as a status response while its extracted
         * content is accumulated, and an optional completion message is emitted last; the
         * accumulated text is then passed to the result mapper.
         *
         * @param sourceFlux the chat-response stream to process
         * @return the converted stream
         * @throws IllegalArgumentException if the node name or state is missing, or if neither a
         *                                  business-logic executor nor a result mapper is set
         */
        public Flux<GraphResponse<StreamingOutput>> build(Flux<ChatResponse> sourceFlux) {
            if (this.nodeName == null || this.state == null) {
                throw new IllegalArgumentException("Node name and state cannot be null");
            }
            if (this.businessLogicExecutor != null) {
                return StreamingChatGeneratorUtil.createGenerator(this.nodeName, this.state,
                        response -> this.businessLogicExecutor.apply(this.state), sourceFlux);
            }
            if (this.resultMapper == null) {
                throw new IllegalArgumentException("Result mapper cannot be null");
            }
            // NOTE(review): this buffer is created once per build() call, so subscribing to the
            // returned Flux more than once would accumulate text across subscriptions.
            StringBuilder collectedResult = new StringBuilder();
            Flux<ChatResponse> wrappedFlux = Flux.create(emitter -> {
                try {
                    if (this.startMessage != null && !this.startMessage.isEmpty()) {
                        emitter.next(ChatResponseUtil.createCustomStatusResponse(this.startMessage, this.type));
                    }
                    // Tie the inner subscription to the sink: when downstream cancels (or the
                    // sink terminates) the source subscription is disposed instead of leaking.
                    // Using the three-argument subscribe gives the subscriber a real error
                    // handler, so failures are forwarded once rather than also being reported
                    // through Reactor's dropped-error hook.
                    emitter.onDispose(sourceFlux.subscribe(response -> {
                        String content = this.contentExtractor.apply(response);
                        if (content != null) {
                            collectedResult.append(content);
                        }
                        // NOTE(review): the streamed chunk uses the raw output text, not the
                        // value returned by contentExtractor — confirm this is intentional.
                        emitter.next(ChatResponseUtil.createStatusResponse(response.getResult().getOutput().getText(), this.type));
                    }, error -> {
                        logger.error("[{}] Error in streaming processing", this.nodeName, error);
                        emitter.error(error);
                    }, () -> {
                        if (this.completionMessage != null && !this.completionMessage.isEmpty()) {
                            emitter.next(ChatResponseUtil.createCustomStatusResponse("\n" + this.completionMessage, this.type));
                        }
                        logger.debug("[{}] Streaming processing completed", this.nodeName);
                        emitter.complete();
                    }));
                }
                catch (Exception e) {
                    logger.error("[{}] Failed to start streaming processing", this.nodeName, e);
                    emitter.error(e);
                }
            });
            return StreamingChatGeneratorUtil.createGenerator(this.nodeName, this.state, response -> {
                String finalResult = collectedResult.toString();
                if (this.trimResult) {
                    finalResult = finalResult.trim();
                }
                return this.resultMapper.apply(finalResult);
            }, wrappedFlux);
        }
    }
}

