/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.registry.xds.util;

import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.registry.xds.util.XdsChannel;
import org.apache.dubbo.registry.xds.util.XdsListener;
import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
import org.apache.dubbo.rpc.model.ApplicationModel;

public class AdsObserver {
    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AdsObserver.class);
    private final ApplicationModel applicationModel;
    private final URL url;
    private final Node node;
    private volatile XdsChannel xdsChannel;
    private final Map<String, XdsListener> listeners = new ConcurrentHashMap<String, XdsListener>();
    protected StreamObserver<DiscoveryRequest> requestObserver;
    private final Map<String, DiscoveryRequest> observedResources = new ConcurrentHashMap<String, DiscoveryRequest>();

    public AdsObserver(URL url, Node node) {
        this.url = url;
        this.node = node;
        this.xdsChannel = new XdsChannel(url);
        this.applicationModel = url.getOrDefaultApplicationModel();
    }

    public <T, S extends DeltaResource<T>> void addListener(AbstractProtocol<T, S> protocol) {
        this.listeners.put(protocol.getTypeUrl(), protocol);
    }

    public void request(DiscoveryRequest discoveryRequest) {
        if (this.requestObserver == null) {
            this.requestObserver = this.xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(this));
        }
        this.requestObserver.onNext((Object)discoveryRequest);
        this.observedResources.put(discoveryRequest.getTypeUrl(), discoveryRequest);
    }

    private void triggerReConnectTask() {
        ScheduledExecutorService scheduledFuture = this.applicationModel.getFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class).getSharedScheduledExecutor();
        scheduledFuture.schedule(this::recover, 3L, TimeUnit.SECONDS);
    }

    private void recover() {
        try {
            this.xdsChannel = new XdsChannel(this.url);
            if (this.xdsChannel.getChannel() != null) {
                this.requestObserver = this.xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(this));
                this.observedResources.values().forEach(arg_0 -> this.requestObserver.onNext(arg_0));
                return;
            }
            logger.error("1-30", "", "", "Recover failed for xDS connection. Will retry. Create channel failed.");
        }
        catch (Exception e) {
            logger.error("1-30", "", "", "Recover failed for xDS connection. Will retry.", e);
        }
        this.triggerReConnectTask();
    }

    private static class ResponseObserver
    implements StreamObserver<DiscoveryResponse> {
        private AdsObserver adsObserver;

        public ResponseObserver(AdsObserver adsObserver) {
            this.adsObserver = adsObserver;
        }

        public void onNext(DiscoveryResponse discoveryResponse) {
            XdsListener xdsListener = (XdsListener)this.adsObserver.listeners.get(discoveryResponse.getTypeUrl());
            xdsListener.process(discoveryResponse);
            this.adsObserver.requestObserver.onNext((Object)this.buildAck(discoveryResponse));
        }

        protected DiscoveryRequest buildAck(DiscoveryResponse response) {
            return DiscoveryRequest.newBuilder().setNode(this.adsObserver.node).setTypeUrl(response.getTypeUrl()).setVersionInfo(response.getVersionInfo()).setResponseNonce(response.getNonce()).addAllResourceNames((Iterable)((DiscoveryRequest)this.adsObserver.observedResources.get(response.getTypeUrl())).getResourceNamesList()).build();
        }

        public void onError(Throwable throwable) {
            logger.error("1-30", "", "", "xDS Client received error message! detail:", throwable);
            this.adsObserver.triggerReConnectTask();
        }

        public void onCompleted() {
            logger.info("xDS Client completed");
            this.adsObserver.triggerReConnectTask();
        }
    }
}

