/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dubbo.registry.kubernetes;

import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointSubset;
import io.fabric8.kubernetes.api.model.Endpoints;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodFluent;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceList;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.kubernetes.KubernetesMeshEnvListener;
import org.apache.dubbo.registry.kubernetes.util.KubernetesConfigUtils;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ScopeModelUtil;

public class KubernetesServiceDiscovery
extends AbstractServiceDiscovery {
    private final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(this.getClass());
    private KubernetesClient kubernetesClient;
    private String currentHostname;
    private final URL registryURL;
    private final String namespace;
    private final boolean enableRegister;
    public static final String KUBERNETES_PROPERTIES_KEY = "io.dubbo/metadata";
    private static final ConcurrentHashMap<String, AtomicLong> SERVICE_UPDATE_TIME = new ConcurrentHashMap(64);
    private static final ConcurrentHashMap<String, SharedIndexInformer<Service>> SERVICE_INFORMER = new ConcurrentHashMap(64);
    private static final ConcurrentHashMap<String, SharedIndexInformer<Pod>> PODS_INFORMER = new ConcurrentHashMap(64);
    private static final ConcurrentHashMap<String, SharedIndexInformer<Endpoints>> ENDPOINTS_INFORMER = new ConcurrentHashMap(64);

    public KubernetesServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
        super(applicationModel, registryURL);
        boolean availableAccess;
        Config config = KubernetesConfigUtils.createKubernetesConfig(registryURL);
        this.kubernetesClient = new KubernetesClientBuilder().withConfig(config).build();
        this.currentHostname = System.getenv("HOSTNAME");
        this.registryURL = registryURL;
        this.namespace = config.getNamespace();
        this.enableRegister = registryURL.getParameter("enableRegister", true);
        try {
            availableAccess = ((PodResource)this.kubernetesClient.pods().withName(this.currentHostname)).get() != null;
        }
        catch (Throwable e) {
            availableAccess = false;
        }
        if (!availableAccess) {
            String message = "Unable to access api server. Please check your url config. Master URL: " + config.getMasterUrl() + " Hostname: " + this.currentHostname;
            this.logger.error("1-22", "", "", message);
        } else {
            KubernetesMeshEnvListener.injectKubernetesEnv(this.kubernetesClient, this.namespace);
        }
    }

    @Override
    public void doDestroy() {
        SERVICE_INFORMER.forEach((k, v) -> v.close());
        SERVICE_INFORMER.clear();
        PODS_INFORMER.forEach((k, v) -> v.close());
        PODS_INFORMER.clear();
        ENDPOINTS_INFORMER.forEach((k, v) -> v.close());
        ENDPOINTS_INFORMER.clear();
        this.kubernetesClient.close();
    }

    @Override
    public void doRegister(ServiceInstance serviceInstance) throws RuntimeException {
        if (this.enableRegister) {
            ((PodResource)((NonNamespaceOperation)this.kubernetesClient.pods().inNamespace(this.namespace)).withName(this.currentHostname)).edit(pod -> ((PodBuilder)((PodFluent.MetadataNested)new PodBuilder(pod).editOrNewMetadata().addToAnnotations(KUBERNETES_PROPERTIES_KEY, JsonUtils.getJson().toJson(serviceInstance.getMetadata()))).endMetadata()).build());
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Write Current Service Instance Metadata to Kubernetes pod. Current pod name: " + this.currentHostname);
            }
        }
    }

    @Override
    public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
        this.reportMetadata(serviceInstance.getServiceMetadata());
        this.doRegister(serviceInstance);
    }

    @Override
    public void doUnregister(ServiceInstance serviceInstance) throws RuntimeException {
        if (this.enableRegister) {
            ((PodResource)((NonNamespaceOperation)this.kubernetesClient.pods().inNamespace(this.namespace)).withName(this.currentHostname)).edit(pod -> ((PodBuilder)((PodFluent.MetadataNested)new PodBuilder(pod).editOrNewMetadata().removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)).endMetadata()).build());
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Remove Current Service Instance from Kubernetes pod. Current pod name: " + this.currentHostname);
            }
        }
    }

    @Override
    public Set<String> getServices() {
        return ((ServiceList)((NonNamespaceOperation)this.kubernetesClient.services().inNamespace(this.namespace)).list()).getItems().stream().map(service -> service.getMetadata().getName()).collect(Collectors.toSet());
    }

    @Override
    public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
        List endpointsList;
        Endpoints endpoints = null;
        SharedIndexInformer<Endpoints> endInformer = ENDPOINTS_INFORMER.get(serviceName);
        if (endInformer != null && (endpointsList = endInformer.getStore().list()).size() > 0) {
            endpoints = (Endpoints)endpointsList.get(0);
        }
        if (endpoints == null) {
            endpoints = (Endpoints)((Resource)((NonNamespaceOperation)this.kubernetesClient.endpoints().inNamespace(this.namespace)).withName(serviceName)).get();
        }
        return this.toServiceInstance(endpoints, serviceName);
    }

    @Override
    public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
        listener.getServiceNames().forEach(serviceName -> {
            SERVICE_UPDATE_TIME.put((String)serviceName, new AtomicLong(0L));
            this.watchEndpoints(listener, (String)serviceName);
            this.watchPods(listener, (String)serviceName);
            this.watchService(listener, (String)serviceName);
        });
    }

    private void watchEndpoints(final ServiceInstancesChangedListener listener, final String serviceName) {
        SharedIndexInformer endInformer = ((Resource)((NonNamespaceOperation)this.kubernetesClient.endpoints().inNamespace(this.namespace)).withName(serviceName)).inform((ResourceEventHandler)new ResourceEventHandler<Endpoints>(){

            public void onAdd(Endpoints endpoints) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Endpoint Event. Event type: added. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname + ". Endpoints is: " + endpoints);
                }
                KubernetesServiceDiscovery.this.notifyServiceChanged(serviceName, listener, KubernetesServiceDiscovery.this.toServiceInstance(endpoints, serviceName));
            }

            public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Endpoint Event. Event type: updated. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname + ". The new Endpoints is: " + newEndpoints);
                }
                KubernetesServiceDiscovery.this.notifyServiceChanged(serviceName, listener, KubernetesServiceDiscovery.this.toServiceInstance(newEndpoints, serviceName));
            }

            public void onDelete(Endpoints endpoints, boolean deletedFinalStateUnknown) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Endpoint Event. Event type: deleted. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname + ". Endpoints is: " + endpoints);
                }
                KubernetesServiceDiscovery.this.notifyServiceChanged(serviceName, listener, KubernetesServiceDiscovery.this.toServiceInstance(endpoints, serviceName));
            }
        });
        ENDPOINTS_INFORMER.put(serviceName, (SharedIndexInformer<Endpoints>)endInformer);
    }

    private void watchPods(final ServiceInstancesChangedListener listener, final String serviceName) {
        Map<String, String> serviceSelector = this.getServiceSelector(serviceName);
        if (serviceSelector == null) {
            return;
        }
        SharedIndexInformer podInformer = ((FilterWatchListDeletable)((NonNamespaceOperation)this.kubernetesClient.pods().inNamespace(this.namespace)).withLabels(serviceSelector)).inform((ResourceEventHandler)new ResourceEventHandler<Pod>(){

            public void onAdd(Pod pod) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Pods Event. Event type: added. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname + ". Pod is: " + pod);
                }
            }

            public void onUpdate(Pod oldPod, Pod newPod) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Pods Event. Event type: updated. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname + ". new Pod is: " + newPod);
                }
                KubernetesServiceDiscovery.this.notifyServiceChanged(serviceName, listener, KubernetesServiceDiscovery.this.getInstances(serviceName));
            }

            public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Pods Event. Event type: deleted. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname + ". Pod is: " + pod);
                }
            }
        });
        PODS_INFORMER.put(serviceName, (SharedIndexInformer<Pod>)podInformer);
    }

    private void watchService(final ServiceInstancesChangedListener listener, final String serviceName) {
        SharedIndexInformer serviceInformer = ((ServiceResource)((NonNamespaceOperation)this.kubernetesClient.services().inNamespace(this.namespace)).withName(serviceName)).inform((ResourceEventHandler)new ResourceEventHandler<Service>(){

            public void onAdd(Service service) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Service Added Event. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname);
                }
            }

            public void onUpdate(Service oldService, Service newService) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Service Update Event. Update Pods Watcher. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname + ". The new Service is: " + newService);
                }
                if (PODS_INFORMER.containsKey(serviceName)) {
                    ((SharedIndexInformer)PODS_INFORMER.get(serviceName)).close();
                    PODS_INFORMER.remove(serviceName);
                }
                KubernetesServiceDiscovery.this.watchPods(listener, serviceName);
            }

            public void onDelete(Service service, boolean deletedFinalStateUnknown) {
                if (KubernetesServiceDiscovery.this.logger.isDebugEnabled()) {
                    KubernetesServiceDiscovery.this.logger.debug("Received Service Delete Event. Current pod name: " + KubernetesServiceDiscovery.this.currentHostname);
                }
            }
        });
        SERVICE_INFORMER.put(serviceName, (SharedIndexInformer<Service>)serviceInformer);
    }

    private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener, List<ServiceInstance> serviceInstanceList) {
        long receivedTime = System.nanoTime();
        ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent(serviceName, serviceInstanceList);
        AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
        long lastUpdateTime = updateTime.get();
        if (lastUpdateTime <= receivedTime && updateTime.compareAndSet(lastUpdateTime, receivedTime)) {
            listener.onEvent(event);
            return;
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Discard Service Instance Data. Possible Cause: Newer message has been processed or Failed to update time record by CAS. Current Data received time: " + receivedTime + ". Newer Data received time: " + lastUpdateTime + ".");
        }
    }

    @Override
    public URL getUrl() {
        return this.registryURL;
    }

    private Map<String, String> getServiceSelector(String serviceName) {
        Service service = (Service)((ServiceResource)((NonNamespaceOperation)this.kubernetesClient.services().inNamespace(this.namespace)).withName(serviceName)).get();
        if (service == null) {
            return null;
        }
        return service.getSpec().getSelector();
    }

    private List<ServiceInstance> toServiceInstance(Endpoints endpoints, String serviceName) {
        Map<String, String> serviceSelector = this.getServiceSelector(serviceName);
        if (serviceSelector == null) {
            return new LinkedList<ServiceInstance>();
        }
        Map<String, Pod> pods = ((PodList)((FilterWatchListDeletable)((NonNamespaceOperation)this.kubernetesClient.pods().inNamespace(this.namespace)).withLabels(serviceSelector)).list()).getItems().stream().collect(Collectors.toMap(pod -> pod.getMetadata().getName(), pod -> pod));
        LinkedList<ServiceInstance> instances = new LinkedList<ServiceInstance>();
        HashSet instancePorts = new HashSet();
        for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
            instancePorts.addAll(endpointSubset.getPorts().stream().map(EndpointPort::getPort).collect(Collectors.toSet()));
        }
        for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
            for (EndpointAddress address : endpointSubset.getAddresses()) {
                Pod pod2 = pods.get(address.getTargetRef().getName());
                String ip = address.getIp();
                if (pod2 == null) {
                    this.logger.warn("1-20", "", "", "Unable to match Kubernetes Endpoint address with Pod. EndpointAddress Hostname: " + address.getTargetRef().getName());
                    continue;
                }
                instancePorts.forEach(port -> {
                    DefaultServiceInstance serviceInstance = new DefaultServiceInstance(serviceName, ip, (Integer)port, ScopeModelUtil.getApplicationModel(this.getUrl().getScopeModel()));
                    String properties = (String)pod2.getMetadata().getAnnotations().get(KUBERNETES_PROPERTIES_KEY);
                    if (StringUtils.isNotEmpty(properties)) {
                        serviceInstance.getMetadata().putAll((Map)JsonUtils.getJson().toJavaObject(properties, (Type)((Object)Map.class)));
                        instances.add(serviceInstance);
                    } else {
                        this.logger.warn("1-21", "", "", "Unable to find Service Instance metadata in Pod Annotations. Possibly cause: provider has not been initialized successfully. EndpointAddress Hostname: " + address.getTargetRef().getName());
                    }
                });
            }
        }
        return instances;
    }

    @Deprecated
    public void setCurrentHostname(String currentHostname) {
        this.currentHostname = currentHostname;
    }

    @Deprecated
    public void setKubernetesClient(KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
    }
}

