package com.aol.cyclops.data.async;

import com.aol.cyclops.control.ReactiveSeq;
import com.aol.cyclops.react.async.subscription.Continueable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.lambda.Seq;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import org.pcollections.PVector;
import org.pcollections.TreePVector;

/* loaded from: input_file:com/aol/cyclops/data/async/Topic.class */
public class Topic<T> implements Adapter<T> {
    private final DistributingCollection<T> distributor = new DistributingCollection<>();
    private volatile PMap<Seq, Queue<T>> streamToQueue = HashTreePMap.empty();
    private final Object lock = new Object();
    private volatile int index = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/aol/cyclops/data/async/Topic$DistributingCollection.class */
    public static class DistributingCollection<T> extends ArrayList<T> {
        private static final long serialVersionUID = 1;
        private volatile PVector<Queue<T>> subscribers = TreePVector.empty();
        private final Object lock = new Object();

        DistributingCollection() {
        }

        public void addQueue(Queue<T> queue) {
            synchronized (this.lock) {
                this.subscribers = this.subscribers.plus(queue);
            }
        }

        public void removeQueue(Queue<T> queue) {
            synchronized (this.lock) {
                this.subscribers = this.subscribers.minus(queue);
            }
        }

        @Override // java.util.ArrayList, java.util.AbstractList, java.util.AbstractCollection, java.util.Collection, java.util.List
        public boolean add(T t) {
            this.subscribers.forEach(queue -> {
                queue.offer(t);
            });
            return true;
        }

        @Override // java.util.ArrayList, java.util.AbstractCollection, java.util.Collection, java.util.List
        public boolean addAll(Collection<? extends T> collection) {
            this.subscribers.forEach(queue -> {
                collection.forEach(obj -> {
                    queue.offer(obj);
                });
            });
            return true;
        }

        public PVector<Queue<T>> getSubscribers() {
            return this.subscribers;
        }
    }

    public Topic() {
        this.distributor.addQueue(new Queue<>());
    }

    public Topic(Queue<T> queue) {
        this.distributor.addQueue(queue);
    }

    public void disconnect(Stream<T> stream) {
        synchronized (this.lock) {
            this.distributor.removeQueue((Queue) this.streamToQueue.get(stream));
            this.streamToQueue = this.streamToQueue.minus(stream);
            this.index--;
        }
    }

    private <R> ReactiveSeq<R> connect(Function<Queue<T>, ReactiveSeq<R>> function) {
        ReactiveSeq<R> apply;
        synchronized (this.lock) {
            Queue<T> nextQueue = getNextQueue();
            apply = function.apply(nextQueue);
            this.streamToQueue = this.streamToQueue.plus(apply, nextQueue);
        }
        return apply;
    }

    @Override // com.aol.cyclops.data.async.Adapter
    public boolean fromStream(Stream<T> stream) {
        stream.collect(Collectors.toCollection(() -> {
            return this.distributor;
        }));
        return true;
    }

    @Override // com.aol.cyclops.data.async.Adapter
    public ReactiveSeq<CompletableFuture<T>> streamCompletableFutures() {
        return (ReactiveSeq<CompletableFuture<T>>) connect(queue -> {
            return queue.streamCompletableFutures();
        });
    }

    @Override // com.aol.cyclops.data.async.Adapter
    public ReactiveSeq<T> stream() {
        return (ReactiveSeq<T>) connect(queue -> {
            return queue.stream();
        });
    }

    @Override // com.aol.cyclops.data.async.Adapter
    public ReactiveSeq<T> stream(Continueable continueable) {
        return (ReactiveSeq<T>) connect(queue -> {
            return queue.stream(continueable);
        });
    }

    private Queue<T> getNextQueue() {
        if (this.index >= this.distributor.getSubscribers().size()) {
            this.distributor.addQueue(new Queue<>());
        }
        PVector<Queue<T>> subscribers = this.distributor.getSubscribers();
        int i = this.index;
        this.index = i + 1;
        return (Queue) subscribers.get(i);
    }

    @Override // com.aol.cyclops.data.async.Adapter
    public boolean close() {
        this.distributor.getSubscribers().forEach(queue -> {
            queue.close();
        });
        return true;
    }

    public Signal<Integer> getSizeSignal(int i) {
        return ((Queue) this.distributor.getSubscribers().get(i)).getSizeSignal();
    }

    public void setSizeSignal(int i, Signal<Integer> signal) {
        ((Queue) this.distributor.getSubscribers().get(i)).setSizeSignal(signal);
    }

    @Override // com.aol.cyclops.data.async.Adapter
    public boolean offer(T t) {
        fromStream(Stream.of(t));
        return true;
    }

    @Override // com.aol.cyclops.data.async.Adapter
    public <R> R visit(Function<? super Queue<T>, ? extends R> function, Function<? super Topic<T>, ? extends R> function2) {
        return function2.apply(this);
    }

    DistributingCollection<T> getDistributor() {
        return this.distributor;
    }

    PMap<Seq, Queue<T>> getStreamToQueue() {
        return this.streamToQueue;
    }
}
