/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.registry.zookeeper;

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.function.ThrowableConsumer;
import org.apache.dubbo.common.function.ThrowableFunction;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.registry.zookeeper.ZookeeperInstance;
import org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscoveryChangeWatcher;
import org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ApplicationModel;

/**
 * Service discovery implementation that stores and watches service instances in ZooKeeper
 * through Curator's {@link ServiceDiscovery} extension.
 *
 * <p>One {@link ZookeeperServiceDiscoveryChangeWatcher} (wrapping a Curator {@link ServiceCache})
 * is kept per service name in {@link #watcherCaches}; it is created lazily when the first listener
 * subscribes to that name and closed when the last listener for that name is removed.
 */
public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
    private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(this.getClass());
    public static final String DEFAULT_GROUP = "/services";
    /**
     * Error code passed to every {@link RpcException} raised by this class.
     * NOTE(review): presumably equal to RpcException.REGISTRY_EXCEPTION — confirm and reference it directly.
     */
    private static final int REGISTRY_ERROR_CODE = 9;
    private final CuratorFramework curatorFramework;
    private final String rootPath;
    private final ServiceDiscovery<ZookeeperInstance> serviceDiscovery;
    /** Active watchers keyed by service name. */
    private final Map<String, ZookeeperServiceDiscoveryChangeWatcher> watcherCaches = new ConcurrentHashMap<>();

    /**
     * Builds the Curator client and the Curator service discovery for the given registry URL and
     * starts the service discovery.
     *
     * @param applicationModel the application model handed to the parent class
     * @param registryURL      the registry URL used to build the client and derive the root path
     * @throws IllegalStateException if any step of the setup fails; the original exception is the cause
     */
    public ZookeeperServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
        super(applicationModel, registryURL);
        try {
            this.curatorFramework = CuratorFrameworkUtils.buildCuratorFramework(registryURL, this);
            this.rootPath = CuratorFrameworkUtils.getRootPath(registryURL);
            this.serviceDiscovery = CuratorFrameworkUtils.buildServiceDiscovery(this.curatorFramework, this.rootPath);
            this.serviceDiscovery.start();
        }
        catch (Exception e) {
            throw new IllegalStateException("Create zookeeper service discovery failed.", e);
        }
    }

    /**
     * Closes the Curator service discovery and the Curator client, then forgets all watchers.
     *
     * <p>Each step runs even when the previous one throws, so a failure while closing the service
     * discovery cannot leave the ZooKeeper client open.
     *
     * @throws Exception whatever the close calls throw
     */
    @Override
    public void doDestroy() throws Exception {
        try {
            this.serviceDiscovery.close();
        }
        finally {
            try {
                this.curatorFramework.close();
            }
            finally {
                this.watcherCaches.clear();
            }
        }
    }

    /**
     * Registers the given instance with the Curator service discovery.
     *
     * @param serviceInstance the instance to register
     * @throws RpcException if the registration fails; the original exception is the cause
     */
    @Override
    public void doRegister(ServiceInstance serviceInstance) {
        try {
            this.serviceDiscovery.registerService(CuratorFrameworkUtils.build(serviceInstance));
        }
        catch (Exception e) {
            throw new RpcException(REGISTRY_ERROR_CODE, "Failed register instance " + serviceInstance.toString(), e);
        }
    }

    /**
     * Unregisters the given instance from the Curator service discovery; does nothing for {@code null}.
     *
     * @param serviceInstance the instance to unregister, may be {@code null}
     */
    @Override
    public void doUnregister(ServiceInstance serviceInstance) throws RuntimeException {
        if (serviceInstance == null) {
            return;
        }
        this.doInServiceRegistry(discovery -> discovery.unregisterService(CuratorFrameworkUtils.build(serviceInstance)));
    }

    /**
     * Updates the registered instance.
     *
     * <p>Delegates to the parent implementation when either revision is {@code "0"} or when the
     * Curator name or id of the instance changed. Otherwise the instance is updated in place via
     * {@link ServiceDiscovery#updateService} after its metadata has been reported.
     *
     * @param oldServiceInstance the currently registered instance
     * @param newServiceInstance the instance that should replace it
     * @throws RpcException if the in-place update fails; the original exception is the cause
     */
    @Override
    protected void doUpdate(ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) throws RuntimeException {
        boolean newRevisionIsZero = "0".equals(ServiceInstanceMetadataUtils.getExportedServicesRevision(newServiceInstance));
        if (newRevisionIsZero || "0".equals(oldServiceInstance.getMetadata().get("dubbo.metadata.revision"))) {
            super.doUpdate(oldServiceInstance, newServiceInstance);
            return;
        }
        org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> oldInstance = CuratorFrameworkUtils.build(oldServiceInstance);
        org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> newInstance = CuratorFrameworkUtils.build(newServiceInstance);
        boolean sameIdentity = Objects.equals(newInstance.getName(), oldInstance.getName())
                && Objects.equals(newInstance.getId(), oldInstance.getId());
        if (!sameIdentity) {
            // A changed name or id cannot be updated in place; let the parent handle it.
            super.doUpdate(oldServiceInstance, newServiceInstance);
            return;
        }
        try {
            this.serviceInstance = newServiceInstance;
            this.reportMetadata(newServiceInstance.getServiceMetadata());
            this.serviceDiscovery.updateService(newInstance);
        }
        catch (Exception e) {
            // The message names the operation that actually failed (an update, not a registration).
            throw new RpcException(REGISTRY_ERROR_CODE, "Failed update instance " + newServiceInstance.toString(), e);
        }
    }

    /**
     * Returns the service names known to the Curator service discovery, in the order reported.
     *
     * @return a new set holding the names returned by {@link ServiceDiscovery#queryForNames()}
     */
    @Override
    public Set<String> getServices() {
        return this.doInServiceDiscovery(discovery -> new LinkedHashSet<>(discovery.queryForNames()));
    }

    /**
     * Queries the instances registered under the given service name and converts them to Dubbo
     * service instances.
     *
     * @param serviceName the service name to query
     * @return the converted instances
     */
    @Override
    public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
        return this.doInServiceDiscovery(discovery -> CuratorFrameworkUtils.build(this.registryURL, discovery.queryForInstances(serviceName)));
    }

    /**
     * Adds the listener and registers a watcher for each service name it is interested in.
     * Does nothing when the listener was already present.
     *
     * @param listener the listener to add
     */
    @Override
    public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
        if (!this.instanceListeners.add(listener)) {
            return;
        }
        for (String serviceName : listener.getServiceNames()) {
            this.registerServiceWatcher(serviceName, listener);
        }
    }

    /**
     * Removes the listener and detaches it from the watcher of each of its service names. A watcher
     * left without listeners is dropped from {@link #watcherCaches} and its cache is closed.
     * Does nothing when the listener was not present.
     *
     * @param listener the listener to remove
     */
    @Override
    public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
        if (!this.instanceListeners.remove(listener)) {
            return;
        }
        for (String serviceName : listener.getServiceNames()) {
            ZookeeperServiceDiscoveryChangeWatcher watcher = this.watcherCaches.get(serviceName);
            if (watcher == null) {
                continue;
            }
            watcher.getListeners().remove(listener);
            if (!watcher.getListeners().isEmpty()) {
                continue;
            }
            this.watcherCaches.remove(serviceName);
            try {
                watcher.getCacheInstance().close();
            }
            catch (IOException e) {
                // Best effort: keep processing the remaining names, but keep the cause in the log.
                this.logger.error("1-35", "curator stop watch failed", "", "Curator Stop service discovery watch failed. Service Name: " + serviceName, e);
            }
        }
    }

    /** Runs a side-effecting action against the Curator service discovery via {@link ThrowableConsumer#execute}. */
    private void doInServiceRegistry(ThrowableConsumer<ServiceDiscovery<ZookeeperInstance>> consumer) {
        ThrowableConsumer.execute(this.serviceDiscovery, consumer);
    }

    /** Runs a query against the Curator service discovery via {@link ThrowableFunction#execute} and returns its result. */
    private <R> R doInServiceDiscovery(ThrowableFunction<ServiceDiscovery<ZookeeperInstance>, R> function) {
        return ThrowableFunction.execute(this.serviceDiscovery, function);
    }

    /**
     * Ensures a watcher exists for the service name, attaches the listener to it and immediately
     * notifies the listener with the current instances.
     *
     * <p>The latch handed to a newly created watcher is released in a {@code finally} block, so it is
     * counted down even when creating the watcher, querying the instances or notifying the listener
     * throws. NOTE(review): presumably the watcher awaits this latch before forwarding cache events —
     * confirm in ZookeeperServiceDiscoveryChangeWatcher.
     *
     * @param serviceName the service name to watch
     * @param listener    the listener to attach and notify
     * @throws RpcException if the service cache for a new watcher cannot be started
     */
    protected void registerServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            ZookeeperServiceDiscoveryChangeWatcher watcher = this.watcherCaches.computeIfAbsent(serviceName, name -> {
                ServiceCache<ZookeeperInstance> serviceCache = this.serviceDiscovery.serviceCacheBuilder().name(name).build();
                ZookeeperServiceDiscoveryChangeWatcher newer = new ZookeeperServiceDiscoveryChangeWatcher(this, serviceCache, name, latch);
                serviceCache.addListener(newer);
                try {
                    serviceCache.start();
                }
                catch (Exception e) {
                    throw new RpcException(REGISTRY_ERROR_CODE, "Failed subscribe service: " + name, e);
                }
                return newer;
            });
            watcher.addListener(listener);
            listener.onEvent(new ServiceInstancesChangedEvent(serviceName, this.getInstances(serviceName)));
        }
        finally {
            latch.countDown();
        }
    }
}

