package com.azure.messaging.eventhubs;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

/* loaded from: input_file:com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.class */
final class PartitionBasedLoadBalancer {
    private static final Random RANDOM = new Random();
    private final String eventHubName;
    private final String consumerGroupName;
    private final CheckpointStore checkpointStore;
    private final EventHubAsyncClient eventHubAsyncClient;
    private final String ownerId;
    private final long inactiveTimeLimitInSeconds;
    private final PartitionPumpManager partitionPumpManager;
    private final String fullyQualifiedNamespace;
    private final Consumer<ErrorContext> processError;
    private final PartitionContext partitionAgnosticContext;
    private final ClientLogger logger = new ClientLogger(PartitionBasedLoadBalancer.class);
    private final AtomicBoolean isLoadBalancerRunning = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionBasedLoadBalancer(CheckpointStore checkpointStore, EventHubAsyncClient eventHubAsyncClient, String str, String str2, String str3, String str4, long j, PartitionPumpManager partitionPumpManager, Consumer<ErrorContext> consumer) {
        this.checkpointStore = checkpointStore;
        this.eventHubAsyncClient = eventHubAsyncClient;
        this.fullyQualifiedNamespace = str;
        this.eventHubName = str2;
        this.consumerGroupName = str3;
        this.ownerId = str4;
        this.inactiveTimeLimitInSeconds = j;
        this.partitionPumpManager = partitionPumpManager;
        this.processError = consumer;
        this.partitionAgnosticContext = new PartitionContext(str, str2, str3, "NONE");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void loadBalance() {
        if (!this.isLoadBalancerRunning.compareAndSet(false, true)) {
            this.logger.info("Load balancer already running");
        } else {
            this.logger.info("Starting load balancer for {}", new Object[]{this.ownerId});
            Mono.zip(this.checkpointStore.listOwnership(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroupName).timeout(Duration.ofMinutes(1L)).collectMap((v0) -> {
                return v0.getPartitionId();
            }, Function.identity()), this.eventHubAsyncClient.getPartitionIds().timeout(Duration.ofMinutes(1L)).onErrorResume(TimeoutException.class, timeoutException -> {
                this.logger.warning("Unable to get partitionIds from eventHubAsyncClient.");
                return Flux.empty();
            }).collectList()).flatMap(this::loadBalance).subscribe(r1 -> {
            }, th -> {
                this.logger.warning(Messages.LOAD_BALANCING_FAILED, new Object[]{th.getMessage(), th});
                this.processError.accept(new ErrorContext(this.partitionAgnosticContext, th));
                this.isLoadBalancerRunning.set(false);
            }, () -> {
                this.logger.info("Load balancing completed successfully");
            });
        }
    }

    private Mono<Void> loadBalance(Tuple2<Map<String, PartitionOwnership>, List<String>> tuple2) {
        return Mono.fromRunnable(() -> {
            Map<String, PartitionOwnership> map = (Map) tuple2.getT1();
            List list = (List) tuple2.getT2();
            if (CoreUtils.isNullOrEmpty(list)) {
                throw this.logger.logExceptionAsError(Exceptions.propagate(new IllegalStateException("There are no partitions in Event Hub " + this.eventHubName)));
            }
            int size = list.size();
            this.logger.info("CheckpointStore returned {} ownership records", new Object[]{Integer.valueOf(map.size())});
            this.logger.info("Event Hubs service returned {} partitions", new Object[]{Integer.valueOf(size)});
            if (!isValid(map)) {
                throw this.logger.logExceptionAsError(Exceptions.propagate(new IllegalStateException("Invalid partitionOwnership data from CheckpointStore")));
            }
            Map<String, PartitionOwnership> removeInactivePartitionOwnerships = removeInactivePartitionOwnerships(map);
            this.logger.info("Number of active ownership records {}", new Object[]{Integer.valueOf(removeInactivePartitionOwnerships.size())});
            Map<String, List<PartitionOwnership>> map2 = (Map) removeInactivePartitionOwnerships.values().stream().collect(Collectors.groupingBy((v0) -> {
                return v0.getOwnerId();
            }, Collectors.mapping(Function.identity(), Collectors.toList())));
            map2.putIfAbsent(this.ownerId, new ArrayList());
            this.logger.verbose("Current partition distribution {}", new Object[]{format(map2)});
            if (CoreUtils.isNullOrEmpty(removeInactivePartitionOwnerships)) {
                claimOwnership(map, map2, (String) list.get(RANDOM.nextInt(size)));
                return;
            }
            int size2 = map2.size();
            this.logger.info("Number of active event processors {}", new Object[]{Integer.valueOf(map2.size())});
            int i = size / size2;
            int i2 = size % size2;
            this.logger.info("Expected min partitions per event processor = {}, expected number of event processors with additional partition = {}", new Object[]{Integer.valueOf(i), Integer.valueOf(i2)});
            if (isLoadBalanced(i, i2, map2)) {
                this.logger.info("Load is balanced with this event processor owning {} partitions", new Object[]{Integer.valueOf(map2.get(this.ownerId).size())});
                this.checkpointStore.claimOwnership((List) this.partitionPumpManager.getPartitionPumps().keySet().stream().map(str -> {
                    return createPartitionOwnershipRequest(map, str);
                }).collect(Collectors.toList())).subscribe(partitionOwnership -> {
                }, th -> {
                    this.logger.error("Error renewing partition ownership", new Object[]{th});
                    this.isLoadBalancerRunning.set(false);
                }, () -> {
                    this.isLoadBalancerRunning.set(false);
                });
            } else if (shouldOwnMorePartitions(i, map2)) {
                this.logger.info("Load is unbalanced and this event processor owns {} partitions and should own more partitions", new Object[]{Integer.valueOf(map2.get(this.ownerId).size())});
                claimOwnership(map, map2, (String) list.parallelStream().filter(str2 -> {
                    return !removeInactivePartitionOwnerships.containsKey(str2);
                }).findAny().orElseGet(() -> {
                    this.logger.info("No unclaimed partitions, stealing from another event processor");
                    return findPartitionToSteal(map2);
                }));
            } else {
                this.logger.info("This event processor owns {} partitions and shouldn't own more", new Object[]{Integer.valueOf(map2.get(this.ownerId).size())});
                this.checkpointStore.claimOwnership((List) this.partitionPumpManager.getPartitionPumps().keySet().stream().map(str3 -> {
                    return createPartitionOwnershipRequest(map, str3);
                }).collect(Collectors.toList())).subscribe(partitionOwnership2 -> {
                }, th2 -> {
                    this.logger.error("Error renewing partition ownership", new Object[]{th2});
                    this.isLoadBalancerRunning.set(false);
                }, () -> {
                    this.isLoadBalancerRunning.set(false);
                });
            }
        });
    }

    private String format(Map<String, List<PartitionOwnership>> map) {
        return (String) map.entrySet().stream().map(entry -> {
            StringBuilder sb = new StringBuilder();
            sb.append((String) entry.getKey()).append("=[");
            sb.append((String) ((List) entry.getValue()).stream().map(partitionOwnership -> {
                return partitionOwnership.getPartitionId();
            }).collect(Collectors.joining(",")));
            sb.append("]");
            return sb.toString();
        }).collect(Collectors.joining(";"));
    }

    private boolean isValid(Map<String, PartitionOwnership> map) {
        return map.values().stream().noneMatch(partitionOwnership -> {
            return partitionOwnership.getEventHubName() == null || !partitionOwnership.getEventHubName().equals(this.eventHubName) || partitionOwnership.getConsumerGroup() == null || !partitionOwnership.getConsumerGroup().equals(this.consumerGroupName) || partitionOwnership.getPartitionId() == null || partitionOwnership.getLastModifiedTime() == null || partitionOwnership.getETag() == null;
        });
    }

    private String findPartitionToSteal(Map<String, List<PartitionOwnership>> map) {
        Map.Entry<String, List<PartitionOwnership>> entry = map.entrySet().stream().max(Comparator.comparingInt(entry2 -> {
            return ((List) entry2.getValue()).size();
        })).get();
        int size = entry.getValue().size();
        this.logger.info("Owner id {} owns {} partitions, stealing a partition from it", new Object[]{entry.getKey(), Integer.valueOf(size)});
        return entry.getValue().get(RANDOM.nextInt(size)).getPartitionId();
    }

    private boolean isLoadBalanced(int i, int i2, Map<String, List<PartitionOwnership>> map) {
        int i3 = 0;
        Iterator<List<PartitionOwnership>> it = map.values().iterator();
        while (it.hasNext()) {
            int size = it.next().size();
            if (size < i || size > i + 1) {
                return false;
            }
            if (size == i + 1) {
                i3++;
            }
        }
        return i3 == i2;
    }

    private boolean shouldOwnMorePartitions(int i, Map<String, List<PartitionOwnership>> map) {
        int size = map.get(this.ownerId).size();
        return size < i || size == map.values().stream().min(Comparator.comparingInt((v0) -> {
            return v0.size();
        })).get().size();
    }

    private Map<String, PartitionOwnership> removeInactivePartitionOwnerships(Map<String, PartitionOwnership> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            return System.currentTimeMillis() - ((PartitionOwnership) entry.getValue()).getLastModifiedTime().longValue() < TimeUnit.SECONDS.toMillis(this.inactiveTimeLimitInSeconds) && !CoreUtils.isNullOrEmpty(((PartitionOwnership) entry.getValue()).getOwnerId());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    private void claimOwnership(Map<String, PartitionOwnership> map, Map<String, List<PartitionOwnership>> map2, String str) {
        this.logger.info("Attempting to claim ownership of partition {}", new Object[]{str});
        PartitionOwnership createPartitionOwnershipRequest = createPartitionOwnershipRequest(map, str);
        ArrayList arrayList = new ArrayList();
        arrayList.add(createPartitionOwnershipRequest);
        arrayList.addAll((Collection) this.partitionPumpManager.getPartitionPumps().keySet().stream().map(str2 -> {
            return createPartitionOwnershipRequest(map, str2);
        }).collect(Collectors.toList()));
        this.checkpointStore.claimOwnership(arrayList).timeout(Duration.ofMinutes(1L)).doOnNext(partitionOwnership -> {
            this.logger.info("Successfully claimed ownership of partition {}", new Object[]{partitionOwnership.getPartitionId()});
        }).doOnError(th -> {
            this.logger.warning(Messages.FAILED_TO_CLAIM_OWNERSHIP, new Object[]{createPartitionOwnershipRequest.getPartitionId(), th.getMessage(), th});
        }).collectList().zipWhen(list -> {
            return this.checkpointStore.listCheckpoints(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroupName).collectMap(checkpoint -> {
                return checkpoint.getPartitionId();
            }, Function.identity());
        }).subscribe(tuple2 -> {
            ((List) tuple2.getT1()).stream().forEach(partitionOwnership2 -> {
                this.partitionPumpManager.startPartitionPump(partitionOwnership2, (Checkpoint) ((Map) tuple2.getT2()).get(partitionOwnership2.getPartitionId()));
            });
        }, th2 -> {
            this.logger.warning("Error while listing checkpoints", new Object[]{th2});
            this.processError.accept(new ErrorContext(this.partitionAgnosticContext, th2));
            this.isLoadBalancerRunning.set(false);
            throw this.logger.logExceptionAsError(new IllegalStateException("Error while listing checkpoints", th2));
        }, () -> {
            this.isLoadBalancerRunning.set(false);
        });
    }

    private PartitionOwnership createPartitionOwnershipRequest(Map<String, PartitionOwnership> map, String str) {
        PartitionOwnership partitionOwnership = map.get(str);
        return new PartitionOwnership().setFullyQualifiedNamespace(this.fullyQualifiedNamespace).setOwnerId(this.ownerId).setPartitionId(str).setConsumerGroup(this.consumerGroupName).setEventHubName(this.eventHubName).setETag(partitionOwnership == null ? null : partitionOwnership.getETag());
    }
}
