/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.registry.xds.util;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.registry.xds.util.NodeBuilder;
import org.apache.dubbo.registry.xds.util.XdsChannel;
import org.apache.dubbo.registry.xds.util.protocol.impl.EdsProtocol;
import org.apache.dubbo.registry.xds.util.protocol.impl.LdsProtocol;
import org.apache.dubbo.registry.xds.util.protocol.impl.RdsProtocol;
import org.apache.dubbo.registry.xds.util.protocol.message.Endpoint;
import org.apache.dubbo.registry.xds.util.protocol.message.EndpointResult;
import org.apache.dubbo.registry.xds.util.protocol.message.ListenerResult;
import org.apache.dubbo.registry.xds.util.protocol.message.RouteResult;

/**
 * Facade over the LDS, RDS and EDS protocol clients, all of which share a single
 * {@link XdsChannel}.
 *
 * <p>On construction the exchanger fetches the current listeners and routes, then registers
 * observers so that a listener change re-targets the route observation and a route change
 * re-targets the per-domain endpoint observations. Callers look up services and endpoints
 * through {@link #getServices()} / {@link #getEndpoints(String)} and subscribe to endpoint
 * changes through {@link #observeEndpoints(String, Consumer)}.
 */
public class PilotExchanger {
    /** Channel shared by every protocol client; closed by {@link #destroy()}. */
    private final XdsChannel xdsChannel;
    private final RdsProtocol rdsProtocol;
    private final EdsProtocol edsProtocol;

    // Both snapshots are replaced from observer callbacks and read by the public accessors.
    // NOTE(review): callbacks presumably run on the protocols' polling threads, hence volatile
    // so readers always see the latest snapshot - confirm against the protocol implementations.
    private volatile ListenerResult listenerResult;
    private volatile RouteResult routeResult;

    /** Id of the active route observation, or -1 while none has been created. Also used as the lock guarding its creation. */
    private final AtomicLong observeRouteRequest = new AtomicLong(-1L);
    /** domain -> id of the endpoint observation created for that domain. */
    private final Map<String, Long> domainObserveRequest = new ConcurrentHashMap<>();
    /** domain -> consumers to notify when the domain's endpoints change. */
    private final Map<String, Set<Consumer<Set<Endpoint>>>> domainObserveConsumer = new ConcurrentHashMap<>();

    /**
     * Builds the channel and protocol clients from {@code url}, loads the initial listener and
     * route snapshots, and wires the listener observer.
     *
     * @param url source of the channel configuration and of the {@code pollingPoolSize} /
     *            {@code pollingTimeout} parameters (both default to 10)
     */
    private PilotExchanger(URL url) {
        this.xdsChannel = new XdsChannel(url);
        int pollingPoolSize = url.getParameter("pollingPoolSize", 10);
        int pollingTimeout = url.getParameter("pollingTimeout", 10);
        LdsProtocol ldsProtocol = new LdsProtocol(this.xdsChannel, NodeBuilder.build(), pollingPoolSize, pollingTimeout);
        this.rdsProtocol = new RdsProtocol(this.xdsChannel, NodeBuilder.build(), pollingPoolSize, pollingTimeout);
        this.edsProtocol = new EdsProtocol(this.xdsChannel, NodeBuilder.build(), pollingPoolSize, pollingTimeout);

        this.listenerResult = ldsProtocol.getListeners();
        this.routeResult = (RouteResult) this.rdsProtocol.getResource(this.listenerResult.getRouteConfigNames());
        // Only start observing routes once there is at least one route config to observe.
        if (CollectionUtils.isNotEmpty(this.listenerResult.getRouteConfigNames())) {
            this.observeRouteRequest.set(this.createRouteObserve());
        }

        ldsProtocol.observeListeners(newListener -> {
            if (!newListener.equals(this.listenerResult)) {
                this.listenerResult = newListener;
                // Serialize create-vs-update so two listener updates cannot both create an observation.
                synchronized (this.observeRouteRequest) {
                    long currentRequest = this.observeRouteRequest.get();
                    if (currentRequest == -1L) {
                        // No route observation yet (no route configs were known at start-up).
                        this.observeRouteRequest.set(this.createRouteObserve());
                    } else {
                        this.rdsProtocol.updateObserve(currentRequest, newListener.getRouteConfigNames());
                    }
                }
            }
        });
    }

    /**
     * Starts observing the route configs named by the current listener snapshot.
     *
     * <p>On every route update, each observed domain whose cluster set changed either gets a
     * new endpoint observation (if it had none) or has its existing observation re-targeted.
     * The cached route snapshot is replaced after all domains were processed.
     *
     * @return the id returned by {@code rdsProtocol.observeResource}
     */
    private long createRouteObserve() {
        return this.rdsProtocol.observeResource(this.listenerResult.getRouteConfigNames(), newResult -> {
            RouteResult previousResult = this.routeResult;
            this.domainObserveConsumer.forEach((domain, consumers) -> {
                Set<String> newRoute = newResult.searchDomain(domain);
                // Null-safe comparison: the previous lookup may have no entry for this domain
                // (other call sites guard the same lookup with CollectionUtils.isNotEmpty).
                if (Objects.equals(previousResult.searchDomain(domain), newRoute)) {
                    return;
                }
                Long domainRequest = this.domainObserveRequest.get(domain);
                if (domainRequest == null) {
                    // The domain had no clusters when observeEndpoints() was called. Observe the
                    // NEW clusters directly; the cached snapshot still holds the old routes here.
                    this.doObserveEndpoints(domain, newRoute);
                } else {
                    this.edsProtocol.updateObserve(domainRequest, newRoute);
                }
            });
            // Replace the cached snapshot only after the old/new comparison is done.
            this.routeResult = newResult;
        });
    }

    /**
     * Creates an exchanger for the given URL.
     *
     * @param url configuration passed to the private constructor
     * @return a new, fully wired exchanger
     */
    public static PilotExchanger initialize(URL url) {
        return new PilotExchanger(url);
    }

    /** Destroys the underlying {@link XdsChannel}. */
    public void destroy() {
        this.xdsChannel.destroy();
    }

    /**
     * @return the domains of the cached route snapshot
     */
    public Set<String> getServices() {
        return this.routeResult.getDomains();
    }

    /**
     * Fetches the endpoints of the clusters the cached routes map {@code domain} to.
     *
     * @param domain domain to resolve
     * @return the endpoints reported by EDS, or an empty set when the domain maps to no cluster
     */
    public Set<Endpoint> getEndpoints(String domain) {
        Set<String> cluster = this.routeResult.searchDomain(domain);
        if (CollectionUtils.isNotEmpty(cluster)) {
            EndpointResult endpoint = (EndpointResult) this.edsProtocol.getResource(cluster);
            return endpoint.getEndpoints();
        }
        return Collections.emptySet();
    }

    /**
     * Registers {@code consumer} for endpoint changes of {@code domain} and, if the domain has
     * no endpoint observation yet, tries to create one from the cached routes.
     *
     * @param domain   domain whose endpoints should be observed
     * @param consumer callback receiving the endpoint set on every update
     */
    public void observeEndpoints(String domain, Consumer<Set<Endpoint>> consumer) {
        // Entries are never removed from the map, so adding after computeIfAbsent is safe.
        this.domainObserveConsumer
                .computeIfAbsent(domain, key -> new ConcurrentHashSet<>())
                .add(consumer);
        if (!this.domainObserveRequest.containsKey(domain)) {
            this.doObserveEndpoints(domain);
        }
    }

    /**
     * Starts an endpoint observation for {@code domain} using the clusters found in the cached
     * route snapshot.
     */
    private void doObserveEndpoints(String domain) {
        this.doObserveEndpoints(domain, this.routeResult.searchDomain(domain));
    }

    /**
     * Starts an endpoint observation for {@code domain} on the given clusters and records its
     * id; does nothing when {@code clusters} is null or empty.
     */
    private void doObserveEndpoints(String domain, Set<String> clusters) {
        if (CollectionUtils.isNotEmpty(clusters)) {
            long endpointRequest = this.edsProtocol.observeResource(clusters, endpointResult ->
                    this.domainObserveConsumer.get(domain)
                            .forEach(registered -> registered.accept(endpointResult.getEndpoints())));
            this.domainObserveRequest.put(domain, endpointRequest);
        }
    }
}

