package org.redisson;

import io.netty.util.Timeout;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.RFuture;
import org.redisson.api.RReliableTopic;
import org.redisson.api.RStream;
import org.redisson.api.StreamMessageId;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.stream.StreamReadGroupArgs;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/redisson/RedissonReliableTopic.class */
public class RedissonReliableTopic extends RedissonExpirable implements RReliableTopic {
    private static final Logger log = LoggerFactory.getLogger(RedissonReliableTopic.class);
    private final Map<String, Entry> listeners;
    private final String subscriberId;
    private volatile RFuture<Map<StreamMessageId, Map<String, Object>>> readFuture;
    private volatile Timeout timeoutTask;
    private final RStream<String, Object> stream;
    private final AtomicBoolean subscribed;
    private final String timeoutName;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/redisson/RedissonReliableTopic$Entry.class */
    public static class Entry {
        private final Class<?> type;
        private final MessageListener<?> listener;

        Entry(Class<?> cls, MessageListener<?> messageListener) {
            this.type = cls;
            this.listener = messageListener;
        }

        public Class<?> getType() {
            return this.type;
        }

        public MessageListener<?> getListener() {
            return this.listener;
        }
    }

    public RedissonReliableTopic(Codec codec, CommandAsyncExecutor commandAsyncExecutor, String str, String str2) {
        super(codec, commandAsyncExecutor, str);
        this.listeners = new ConcurrentHashMap();
        this.subscribed = new AtomicBoolean();
        this.stream = new RedissonStream(new CompositeCodec(StringCodec.INSTANCE, codec), commandAsyncExecutor, str);
        this.subscriberId = str2 == null ? getServiceManager().generateId() : str2;
        this.timeoutName = getTimeout(getRawName());
    }

    public RedissonReliableTopic(CommandAsyncExecutor commandAsyncExecutor, String str, String str2) {
        this(commandAsyncExecutor.getServiceManager().getCfg().getCodec(), commandAsyncExecutor, str, str2);
    }

    private String getTimeout(String str) {
        return suffixName(str, "timeout");
    }

    @Override // org.redisson.api.RReliableTopic
    public long publish(Object obj) {
        return ((Long) get(publishAsync(obj))).longValue();
    }

    @Override // org.redisson.api.RReliableTopic
    public <M> String addListener(Class<M> cls, MessageListener<M> messageListener) {
        return (String) get(addListenerAsync(cls, messageListener));
    }

    @Override // org.redisson.api.RReliableTopic
    public void removeListener(String... strArr) {
        get(removeListenerAsync(strArr));
    }

    @Override // org.redisson.api.RReliableTopic
    public void removeAllListeners() {
        get(removeAllListenersAsync());
    }

    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Void> removeAllListenersAsync() {
        this.listeners.clear();
        return removeSubscriber();
    }

    @Override // org.redisson.api.RReliableTopic
    public long size() {
        return ((Long) get(sizeAsync())).longValue();
    }

    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Long> sizeAsync() {
        return this.stream.sizeAsync();
    }

    @Override // org.redisson.api.RReliableTopic
    public int countListeners() {
        return this.listeners.size();
    }

    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Long> publishAsync(Object obj) {
        return this.commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_LONG, "redis.call('xadd', KEYS[1], '*', 'm', ARGV[1]); local v = redis.call('xinfo', 'groups', KEYS[1]); return #v;", Arrays.asList(getRawName()), encode(obj));
    }

    @Override // org.redisson.api.RReliableTopicAsync
    public <M> RFuture<String> addListenerAsync(Class<M> cls, MessageListener<M> messageListener) {
        String generateId = getServiceManager().generateId();
        this.listeners.put(generateId, new Entry(cls, messageListener));
        return !this.subscribed.compareAndSet(false, true) ? new CompletableFutureWrapper(generateId) : new CompletableFutureWrapper((CompletionStage) this.commandExecutor.evalWriteNoRetryAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "redis.call('zadd', KEYS[2], ARGV[3], ARGV[2]);redis.call('xgroup', 'create', KEYS[1], ARGV[2], ARGV[1], 'MKSTREAM'); ", Arrays.asList(getRawName(), this.timeoutName), StreamMessageId.ALL, this.subscriberId, Long.valueOf(System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout())).thenApply(r5 -> {
            renewExpiration();
            poll(this.subscriberId);
            return generateId;
        }));
    }

    private void poll(String str) {
        this.stream.pendingRangeAsync(str, StreamMessageId.MIN, StreamMessageId.MAX, 100).thenCompose(map -> {
            if (this.subscribed.get() && map.isEmpty()) {
                this.readFuture = this.stream.readGroupAsync(str, "consumer", StreamReadGroupArgs.neverDelivered().timeout(Duration.ofSeconds(0L)));
                return this.readFuture;
            }
            return CompletableFuture.completedFuture(map);
        }).whenComplete((map2, th) -> {
            if (th == null) {
                CompletableFuture completableFuture = new CompletableFuture();
                if (this.listeners.isEmpty()) {
                    completableFuture.complete(null);
                } else {
                    getServiceManager().getExecutor().execute(() -> {
                        for (Map.Entry entry : map2.entrySet()) {
                            Object obj = ((Map) entry.getValue()).get("m");
                            this.listeners.values().forEach(entry2 -> {
                                if (entry2.getType().isInstance(obj)) {
                                    entry2.getListener().onMessage(getRawName(), obj);
                                    this.stream.ack(str, (StreamMessageId) entry.getKey());
                                }
                            });
                        }
                        completableFuture.complete(null);
                    });
                }
                completableFuture.thenAccept(r14 -> {
                    this.commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local expired = redis.call('zrangebyscore', KEYS[2], 0, tonumber(ARGV[2]) - 1); for i, v in ipairs(expired) do redis.call('xgroup', 'destroy', KEYS[1], v); end; local r = redis.call('zscore', KEYS[2], ARGV[1]); local score = 92233720368547758;local groups = redis.call('xinfo', 'groups', KEYS[1]); for i, v in ipairs(groups) do local id1, id2 = string.match(v[8], '(.*)%-(.*)'); score = math.min(tonumber(id1), score); end; score = tostring(score) .. '-0';local range = redis.call('xrange', KEYS[1], score, '+'); if #range == 0 or (#range == 1 and range[1][1] == score) then redis.call('xtrim', KEYS[1], 'maxlen', 0); else redis.call('xtrim', KEYS[1], 'maxlen', #range); end;return r ~= false; ", Arrays.asList(getRawName(), this.timeoutName), str, Long.valueOf(System.currentTimeMillis())).whenComplete((bool, th) -> {
                        if (th != null) {
                            if (getServiceManager().isShuttingDown(th)) {
                                return;
                            }
                            log.error("Unable to update subscriber status", th);
                        } else if (bool.booleanValue() && this.subscribed.get()) {
                            poll(str);
                        }
                    });
                });
                return;
            }
            if (getServiceManager().isShuttingDown(th)) {
                return;
            }
            if (th.getCause() == null || !th.getCause().getMessage().contains("NOGROUP")) {
                log.error("Unable to poll a new element. Subscriber id: {}", str, th.getCause());
                getServiceManager().newTimeout(timeout -> {
                    if (getServiceManager().isShuttingDown()) {
                        return;
                    }
                    poll(str);
                }, 1L, TimeUnit.SECONDS);
            }
        });
    }

    @Override // org.redisson.RedissonObject, org.redisson.api.RObjectAsync
    public RFuture<Boolean> deleteAsync() {
        return deleteAsync(getRawName(), this.timeoutName);
    }

    @Override // org.redisson.RedissonObject, org.redisson.api.RObjectAsync
    public RFuture<Long> sizeInMemoryAsync() {
        return super.sizeInMemoryAsync(Arrays.asList(getRawName(), this.timeoutName));
    }

    @Override // org.redisson.RedissonObject
    public RFuture<Boolean> copyAsync(List<Object> list, int i, boolean z) {
        String str = (String) list.get(1);
        return super.copyAsync(Arrays.asList(getRawName(), this.timeoutName, str, getTimeout(str)), i, z);
    }

    @Override // org.redisson.RedissonExpirable
    public RFuture<Boolean> expireAsync(long j, TimeUnit timeUnit, String str, String... strArr) {
        return super.expireAsync(j, timeUnit, str, getRawName(), this.timeoutName);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.redisson.RedissonExpirable
    public RFuture<Boolean> expireAtAsync(long j, String str, String... strArr) {
        return super.expireAtAsync(j, str, getRawName(), this.timeoutName);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public RFuture<Boolean> clearExpireAsync() {
        return clearExpireAsync(getRawName(), this.timeoutName);
    }

    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Void> removeListenerAsync(String... strArr) {
        this.listeners.keySet().removeAll(Arrays.asList(strArr));
        return this.listeners.isEmpty() ? removeSubscriber() : new CompletableFutureWrapper((Void) null);
    }

    private RFuture<Void> removeSubscriber() {
        this.subscribed.set(false);
        this.readFuture.cancel(false);
        this.timeoutTask.cancel();
        return this.commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "redis.call('xgroup', 'destroy', KEYS[1], ARGV[1]); redis.call('zrem', KEYS[2], ARGV[1]); ", Arrays.asList(getRawName(), this.timeoutName), this.subscriberId);
    }

    @Override // org.redisson.api.RReliableTopic
    public int countSubscribers() {
        return ((Integer) get(countSubscribersAsync())).intValue();
    }

    @Override // org.redisson.api.RReliableTopicAsync
    public RFuture<Integer> countSubscribersAsync() {
        return this.commandExecutor.evalReadAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_INTEGER, "local v = redis.call('xinfo', 'groups', KEYS[1]); return #v;", Arrays.asList(getRawName()), new Object[0]);
    }

    private void renewExpiration() {
        this.timeoutTask = getServiceManager().newTimeout(timeout -> {
            if (!this.subscribed.get() || getServiceManager().isShuttingDown()) {
                return;
            }
            this.commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('zscore', KEYS[1], ARGV[2]) == false then return 0; end; redis.call('zadd', KEYS[1], ARGV[1], ARGV[2]); return 1; ", Arrays.asList(this.timeoutName), Long.valueOf(System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout()), this.subscriberId).whenComplete((bool, th) -> {
                if (th != null) {
                    log.error("Can't update reliable topic {} expiration time", getRawName(), th);
                } else if (bool.booleanValue()) {
                    renewExpiration();
                }
            });
        }, getServiceManager().getCfg().getReliableTopicWatchdogTimeout() / 3, TimeUnit.MILLISECONDS);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture getExpireTimeAsync() {
        return super.getExpireTimeAsync();
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ long getExpireTime() {
        return super.getExpireTime();
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture remainTimeToLiveAsync() {
        return super.remainTimeToLiveAsync();
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ long remainTimeToLive() {
        return super.remainTimeToLive();
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean clearExpire() {
        return super.clearExpire();
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireIfLessAsync(Duration duration) {
        return super.expireIfLessAsync(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireIfLess(Duration duration) {
        return super.expireIfLess(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireIfGreaterAsync(Duration duration) {
        return super.expireIfGreaterAsync(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireIfGreater(Duration duration) {
        return super.expireIfGreater(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireIfNotSetAsync(Duration duration) {
        return super.expireIfNotSetAsync(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireIfNotSet(Duration duration) {
        return super.expireIfNotSet(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireIfSetAsync(Duration duration) {
        return super.expireIfSetAsync(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireIfSet(Duration duration) {
        return super.expireIfSet(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireAtAsync(Date date) {
        return super.expireAtAsync(date);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireAt(Date date) {
        return super.expireAt(date);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireAsync(Duration duration) {
        return super.expireAsync(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expire(Duration duration) {
        return super.expire(duration);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireAsync(Instant instant) {
        return super.expireAsync(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireIfLessAsync(Instant instant) {
        return super.expireIfLessAsync(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireIfLess(Instant instant) {
        return super.expireIfLess(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireIfGreaterAsync(Instant instant) {
        return super.expireIfGreaterAsync(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireIfGreater(Instant instant) {
        return super.expireIfGreater(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireIfNotSetAsync(Instant instant) {
        return super.expireIfNotSetAsync(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireIfNotSet(Instant instant) {
        return super.expireIfNotSet(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireIfSetAsync(Instant instant) {
        return super.expireIfSetAsync(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireIfSet(Instant instant) {
        return super.expireIfSet(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expire(Instant instant) {
        return super.expire(instant);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireAtAsync(long j) {
        return super.expireAtAsync(j);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expireAt(long j) {
        return super.expireAt(j);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirableAsync
    public /* bridge */ /* synthetic */ RFuture expireAsync(long j, TimeUnit timeUnit) {
        return super.expireAsync(j, timeUnit);
    }

    @Override // org.redisson.RedissonExpirable, org.redisson.api.RExpirable
    public /* bridge */ /* synthetic */ boolean expire(long j, TimeUnit timeUnit) {
        return super.expire(j, timeUnit);
    }
}
