package com.google.adk.models;

import com.google.adk.models.LlmResponse;
import com.google.common.collect.ImmutableList;
import com.google.genai.AsyncSession;
import com.google.genai.Client;
import com.google.genai.types.Blob;
import com.google.genai.types.Content;
import com.google.genai.types.FinishReason;
import com.google.genai.types.FunctionCall;
import com.google.genai.types.FunctionResponse;
import com.google.genai.types.LiveConnectConfig;
import com.google.genai.types.LiveSendClientContentParameters;
import com.google.genai.types.LiveSendRealtimeInputParameters;
import com.google.genai.types.LiveSendToolResponseParameters;
import com.google.genai.types.LiveServerContent;
import com.google.genai.types.LiveServerMessage;
import com.google.genai.types.LiveServerToolCall;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.PublishProcessor;
import java.net.SocketException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/adk/models/GeminiLlmConnection.class */
public final class GeminiLlmConnection implements BaseLlmConnection {
    private static final Logger logger = LoggerFactory.getLogger(GeminiLlmConnection.class);
    private final Client apiClient;
    private final String modelName;
    private final LiveConnectConfig connectConfig;
    private final CompletableFuture<AsyncSession> sessionFuture;
    private final PublishProcessor<LlmResponse> responseProcessor = PublishProcessor.create();
    private final Flowable<LlmResponse> responseFlowable = this.responseProcessor.serialize();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    public GeminiLlmConnection(Client client, String str, LiveConnectConfig liveConnectConfig) {
        this.apiClient = (Client) Objects.requireNonNull(client);
        this.modelName = (String) Objects.requireNonNull(str);
        this.connectConfig = (LiveConnectConfig) Objects.requireNonNull(liveConnectConfig);
        this.sessionFuture = this.apiClient.async.live.connect(this.modelName, this.connectConfig).whenCompleteAsync((asyncSession, th) -> {
            if (th != null) {
                handleConnectionError(th);
            } else if (asyncSession != null) {
                setupReceiver(asyncSession);
            } else {
                if (this.closed.get()) {
                    return;
                }
                handleConnectionError(new SocketException("WebSocket connection failed without explicit error."));
            }
        });
    }

    private void setupReceiver(AsyncSession asyncSession) {
        if (this.closed.get()) {
            closeSessionIgnoringErrors(asyncSession);
        } else {
            asyncSession.receive(this::handleServerMessage).exceptionally(th -> {
                handleReceiveError(th);
                return null;
            });
        }
    }

    private void handleServerMessage(LiveServerMessage liveServerMessage) {
        if (this.closed.get()) {
            return;
        }
        logger.debug("Received server message: {}", liveServerMessage.toJson());
        Optional<LlmResponse> convertToServerResponse = convertToServerResponse(liveServerMessage);
        PublishProcessor<LlmResponse> publishProcessor = this.responseProcessor;
        Objects.requireNonNull(publishProcessor);
        convertToServerResponse.ifPresent((v1) -> {
            r1.onNext(v1);
        });
    }

    private Optional<LlmResponse> convertToServerResponse(LiveServerMessage liveServerMessage) {
        LlmResponse.Builder builder = LlmResponse.builder();
        if (liveServerMessage.serverContent().isPresent()) {
            LiveServerContent liveServerContent = (LiveServerContent) liveServerMessage.serverContent().get();
            Optional modelTurn = liveServerContent.modelTurn();
            Objects.requireNonNull(builder);
            modelTurn.ifPresent(builder::content);
            builder.partial((Boolean) liveServerContent.turnComplete().map(bool -> {
                return Boolean.valueOf(!bool.booleanValue());
            }).orElse(false)).turnComplete((Boolean) liveServerContent.turnComplete().orElse(false));
        } else if (liveServerMessage.toolCall().isPresent()) {
            ((LiveServerToolCall) liveServerMessage.toolCall().get()).functionCalls().ifPresent(list -> {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    builder.content(Content.builder().parts(ImmutableList.of(Part.builder().functionCall((FunctionCall) it.next()).build())).build());
                }
            });
            builder.partial((Boolean) false).turnComplete((Boolean) false);
        } else {
            if (liveServerMessage.usageMetadata().isPresent()) {
                logger.debug("Received usage metadata: {}", liveServerMessage.usageMetadata().get());
                return Optional.empty();
            }
            if (liveServerMessage.toolCallCancellation().isPresent()) {
                logger.debug("Received tool call cancellation: {}", liveServerMessage.toolCallCancellation().get());
                return Optional.empty();
            }
            if (liveServerMessage.setupComplete().isPresent()) {
                logger.debug("Received setup complete.");
                return Optional.empty();
            }
            logger.warn("Received unknown or empty server message: {}", liveServerMessage.toJson());
            builder.errorCode(new FinishReason("Unknown server message.")).errorMessage("Received unknown server message.");
        }
        return Optional.of(builder.build());
    }

    private void handleConnectionError(Throwable th) {
        if (this.closed.compareAndSet(false, true)) {
            logger.error("WebSocket connection failed", th);
            this.responseProcessor.onError(th instanceof CompletionException ? th.getCause() : th);
        }
    }

    private void handleReceiveError(Throwable th) {
        if (this.closed.compareAndSet(false, true)) {
            logger.error("Error during WebSocket receive operation", th);
            this.responseProcessor.onError(th);
            this.sessionFuture.thenAccept(this::closeSessionIgnoringErrors).exceptionally(th2 -> {
                return null;
            });
        }
    }

    @Override // com.google.adk.models.BaseLlmConnection
    public Completable sendHistory(List<Content> list) {
        return sendClientContentInternal(LiveSendClientContentParameters.builder().turns(list).build());
    }

    @Override // com.google.adk.models.BaseLlmConnection
    public Completable sendContent(Content content) {
        Objects.requireNonNull(content, "content cannot be null");
        Optional<List<FunctionResponse>> extractFunctionResponses = extractFunctionResponses(content);
        return extractFunctionResponses.isPresent() ? sendToolResponseInternal(LiveSendToolResponseParameters.builder().functionResponses(extractFunctionResponses.get()).build()) : sendClientContentInternal(LiveSendClientContentParameters.builder().turns(ImmutableList.of(content)).build());
    }

    private Optional<List<FunctionResponse>> extractFunctionResponses(Content content) {
        if (content.parts().isEmpty() || ((List) content.parts().get()).isEmpty()) {
            return Optional.empty();
        }
        ImmutableList immutableList = (ImmutableList) ((List) content.parts().get()).stream().map((v0) -> {
            return v0.functionResponse();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(ImmutableList.toImmutableList());
        return immutableList.size() == ((List) content.parts().get()).size() ? Optional.of(immutableList) : Optional.empty();
    }

    @Override // com.google.adk.models.BaseLlmConnection
    public Completable sendRealtime(Blob blob) {
        return Completable.fromFuture(this.sessionFuture.thenCompose(asyncSession -> {
            return asyncSession.sendRealtimeInput(LiveSendRealtimeInputParameters.builder().media(blob).build());
        }));
    }

    private Completable sendClientContentInternal(LiveSendClientContentParameters liveSendClientContentParameters) {
        return Completable.fromFuture(this.sessionFuture.thenCompose(asyncSession -> {
            return asyncSession.sendClientContent(liveSendClientContentParameters);
        }));
    }

    private Completable sendToolResponseInternal(LiveSendToolResponseParameters liveSendToolResponseParameters) {
        return Completable.fromFuture(this.sessionFuture.thenCompose(asyncSession -> {
            return asyncSession.sendToolResponse(liveSendToolResponseParameters);
        }));
    }

    @Override // com.google.adk.models.BaseLlmConnection
    public Flowable<LlmResponse> receive() {
        return this.responseFlowable;
    }

    @Override // com.google.adk.models.BaseLlmConnection
    public void close() {
        closeInternal(null);
    }

    @Override // com.google.adk.models.BaseLlmConnection
    public void close(Throwable th) {
        Objects.requireNonNull(th, "throwable cannot be null for close");
        closeInternal(th);
    }

    private void closeInternal(Throwable th) {
        if (this.closed.compareAndSet(false, true)) {
            logger.debug("Closing GeminiConnection.", th);
            if (th == null) {
                this.responseProcessor.onComplete();
            } else {
                this.responseProcessor.onError(th);
            }
            if (this.sessionFuture.isDone()) {
                this.sessionFuture.thenAccept(this::closeSessionIgnoringErrors).exceptionally(th2 -> {
                    return null;
                });
            } else {
                this.sessionFuture.cancel(false);
            }
        }
    }

    private void closeSessionIgnoringErrors(AsyncSession asyncSession) {
        if (asyncSession != null) {
            asyncSession.close().exceptionally(th -> {
                logger.warn("Error occurred while closing AsyncSession", th);
                return null;
            });
        }
    }
}
