/*
 * Decompiled with CFR 0.152.
 */
package kafka.log.remote;

import com.yammer.metrics.core.Gauge;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
import kafka.log.remote.CustomMetadataSizeLimitExceededException;
import kafka.log.remote.RemoteLogReader;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.StopPartition;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.RemoteLogInputStream;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.ChildFirstClassLoader;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.OffsetPosition;
import org.apache.kafka.storage.internals.log.RemoteIndexCache;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class RemoteLogManager
implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
    // First argument handed to the RemoteStorageThreadPool built in the constructor.
    private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
    // Supplies plugin class names/class paths, plugin properties and pool sizes read by this class.
    private final RemoteLogManagerConfig rlmConfig;
    // Passed to both plugins under KafkaConfig.BrokerIdProp() and stamped on segment metadata updates.
    private final int brokerId;
    // Passed to the RemoteIndexCache and to the metadata manager under KafkaConfig.LogDirProp().
    private final String logDir;
    // Clock used for the event timestamps of segment metadata updates.
    private final Time time;
    // Resolves the local UnifiedLog of a partition; an empty Optional means there is none.
    private final Function<TopicPartition, Optional<UnifiedLog>> fetchLog;
    // Callback supplied by the caller and stored by the constructor.
    private final BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset;
    // Supplied by the caller and stored by the constructor.
    private final BrokerTopicStats brokerTopicStats;
    // Plugin instances created reflectively by createRemoteStorageManager()/createRemoteLogMetadataManager().
    private final RemoteStorageManager remoteLogStorageManager;
    private final RemoteLogMetadataManager remoteLogMetadataManager;
    // Resolves offsets/timestamps to positions inside remote segments and serves their index entries.
    private final RemoteIndexCache indexCache;
    // Built with the reader thread count and max pending tasks taken from rlmConfig.
    private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
    // Built with rlmConfig.remoteLogManagerThreadPoolSize(); its idle percent is exported as a gauge.
    private final RLMScheduledThreadPool rlmScheduledThreadPool;
    // Initialised from rlmConfig.remoteLogManagerTaskIntervalMs().
    private final long delayInMs;
    // Task handle per topic-id partition; stopPartitions() removes and cancels entries.
    private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture> leaderOrFollowerTasks = new ConcurrentHashMap();
    // Last topic id seen for each partition; written by cacheTopicPartitionIds(), pruned by stopPartitions().
    private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap = new ConcurrentHashMap<TopicPartition, Uuid>();
    // Passed to the metadata manager under the "cluster.id" property.
    private final String clusterId;
    // Set by onEndPointCreated(); when present, configureRLMM() derives client connection properties from it.
    private Optional<EndPoint> endpoint = Optional.empty();
    // NOTE(review): presumably flipped by close() — confirm against the rest of the class.
    private boolean closed = false;
    // Metrics group the idle-percent gauge is registered with and removed from.
    private KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());

    /**
     * Builds the manager: stores the collaborators, instantiates both remote storage plugins, and creates
     * the index cache and the two thread pools. The plugins are only instantiated here; they are
     * configured later by {@link #startup()}.
     *
     * @param rlmConfig                  remote log manager configuration
     * @param brokerId                   id of this broker
     * @param logDir                     directory handed to the index cache and the metadata manager
     * @param clusterId                  cluster id handed to the metadata manager
     * @param time                       clock used for event timestamps
     * @param fetchLog                   resolves the local log of a partition
     * @param updateRemoteLogStartOffset callback stored for later use
     * @param brokerTopicStats           broker topic statistics stored for later use
     * @throws IOException declared by the original signature
     */
    public RemoteLogManager(RemoteLogManagerConfig rlmConfig, int brokerId, String logDir, String clusterId, Time time, Function<TopicPartition, Optional<UnifiedLog>> fetchLog, BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset, BrokerTopicStats brokerTopicStats) throws IOException {
        // Plain collaborators first: the plugin factories below read rlmConfig.
        this.rlmConfig = rlmConfig;
        this.brokerId = brokerId;
        this.logDir = logDir;
        this.clusterId = clusterId;
        this.time = time;
        this.fetchLog = fetchLog;
        this.updateRemoteLogStartOffset = updateRemoteLogStartOffset;
        this.brokerTopicStats = brokerTopicStats;

        remoteLogStorageManager = createRemoteStorageManager();
        remoteLogMetadataManager = createRemoteLogMetadataManager();
        indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
        delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
        rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());

        // Export the scheduled pool's idle percentage; removeMetrics() unregisters it again.
        String idlePercentMetricName = RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName();
        metricsGroup.newGauge(idlePercentMetricName, new Gauge<Double>() {
            @Override
            public Double value() {
                return RemoteLogManager.this.rlmScheduledThreadPool.getIdlePercent();
            }
        });

        int readerThreads = rlmConfig.remoteLogReaderThreads();
        int readerMaxPendingTasks = rlmConfig.remoteLogReaderMaxPendingTasks();
        remoteStorageReaderThreadPool = new RemoteStorageThreadPool(REMOTE_LOG_READER_THREAD_NAME_PREFIX, readerThreads, readerMaxPendingTasks);
    }

    /**
     * Unregisters the idle-percent gauge added by the constructor and asks the reader thread pool
     * to remove its own metrics.
     */
    private void removeMetrics() {
        String idlePercentMetricName = RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName();
        metricsGroup.removeMetric(idlePercentMetricName);
        remoteStorageReaderThreadPool.removeMetrics();
    }

    /**
     * Loads {@code className} through {@code classLoader} and instantiates it with its no-argument
     * constructor.
     *
     * @param classLoader loader used to resolve the class
     * @param className   fully qualified name of the class to instantiate
     * @param <T>         type the caller expects; the cast is unchecked
     * @return the new instance
     * @throws KafkaException wrapping any reflective failure (class missing, no matching or accessible
     *                        constructor, abstract class, or the constructor itself throwing)
     */
    @SuppressWarnings("unchecked")
    private <T> T createDelegate(ClassLoader classLoader, String className) {
        try {
            Class<?> delegateClass = classLoader.loadClass(className);
            Object instance = delegateClass.getDeclaredConstructor().newInstance();
            return (T) instance;
        } catch (ReflectiveOperationException e) {
            // Common supertype of every checked exception the reflective calls above can raise.
            throw new KafkaException(e);
        }
    }

    /**
     * Instantiates the configured {@link RemoteStorageManager} inside a privileged action.
     * <p>
     * With a non-blank class path the plugin is loaded through a dedicated {@link ChildFirstClassLoader}
     * and wrapped in a {@link ClassLoaderAwareRemoteStorageManager}; otherwise it is loaded through the
     * class loader of this code and returned unwrapped.
     *
     * @return the storage manager instance, not yet configured
     */
    RemoteStorageManager createRemoteStorageManager() {
        return AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() {
            private final String classPath = rlmConfig.remoteStorageManagerClassPath();

            @Override
            public RemoteStorageManager run() {
                ClassLoader ownLoader = getClass().getClassLoader();
                boolean hasClassPath = classPath != null && !classPath.trim().isEmpty();
                if (!hasClassPath) {
                    return createDelegate(ownLoader, rlmConfig.remoteStorageManagerClassName());
                }
                ChildFirstClassLoader pluginLoader = new ChildFirstClassLoader(classPath, ownLoader);
                RemoteStorageManager delegate = createDelegate(pluginLoader, rlmConfig.remoteStorageManagerClassName());
                return new ClassLoaderAwareRemoteStorageManager(delegate, pluginLoader);
            }
        });
    }

    /**
     * Configures the remote storage manager with the plugin properties from the configuration plus
     * this broker's id.
     * <p>
     * The map holds {@code Object} values, like the one built in {@code configureRLMM()}: the plugin
     * properties are arbitrary configuration values, not only integers.
     */
    private void configureRSM() {
        Map<String, Object> rsmProps = new HashMap<>(this.rlmConfig.remoteStorageManagerProps());
        // Added after the copy so the broker id always wins over a same-named plugin property.
        rsmProps.put(KafkaConfig.BrokerIdProp(), this.brokerId);
        this.remoteLogStorageManager.configure(rsmProps);
    }

    /**
     * Instantiates the configured {@link RemoteLogMetadataManager} inside a privileged action.
     * <p>
     * With a non-blank class path the plugin is loaded through a dedicated {@link ChildFirstClassLoader}
     * and wrapped in a {@link ClassLoaderAwareRemoteLogMetadataManager}; otherwise it is loaded through
     * the class loader of this code and returned unwrapped.
     *
     * @return the metadata manager instance, not yet configured
     */
    RemoteLogMetadataManager createRemoteLogMetadataManager() {
        return AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() {
            private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath();

            @Override
            public RemoteLogMetadataManager run() {
                ClassLoader ownLoader = getClass().getClassLoader();
                boolean hasClassPath = classPath != null && !classPath.trim().isEmpty();
                if (!hasClassPath) {
                    return createDelegate(ownLoader, rlmConfig.remoteLogMetadataManagerClassName());
                }
                ChildFirstClassLoader pluginLoader = new ChildFirstClassLoader(classPath, ownLoader);
                RemoteLogMetadataManager delegate = createDelegate(pluginLoader, rlmConfig.remoteLogMetadataManagerClassName());
                return new ClassLoaderAwareRemoteLogMetadataManager(delegate, pluginLoader);
            }
        });
    }

    /**
     * Records the endpoint that {@code configureRLMM()} later uses to derive the metadata manager's
     * client connection properties.
     *
     * @param endpoint the created endpoint; must not be {@code null}
     */
    public void onEndPointCreated(EndPoint endpoint) {
        Optional<EndPoint> created = Optional.of(endpoint);
        this.endpoint = created;
    }

    /**
     * Configures the remote log metadata manager.
     * <p>
     * Properties are layered in this order, later entries overriding earlier ones: connection settings
     * derived from the endpoint (if one was registered), the plugin properties from the configuration,
     * and finally broker id, log dir and cluster id.
     */
    private void configureRLMM() {
        Map<String, Object> props = new HashMap<>();
        Optional<EndPoint> registeredEndpoint = endpoint;
        if (registeredEndpoint.isPresent()) {
            EndPoint e = registeredEndpoint.get();
            props.put("remote.log.metadata.common.client.bootstrap.servers", e.host() + ":" + e.port());
            props.put("remote.log.metadata.common.client.security.protocol", e.securityProtocol().name);
        }
        props.putAll(rlmConfig.remoteLogMetadataManagerProps());
        props.put(KafkaConfig.BrokerIdProp(), brokerId);
        props.put(KafkaConfig.LogDirProp(), logDir);
        props.put("cluster.id", clusterId);
        remoteLogMetadataManager.configure(props);
    }

    /**
     * Configures both plugins: the remote storage manager first, then the remote log metadata manager.
     */
    public void startup() {
        configureRSM();
        configureRLMM();
    }

    /**
     * @return the remote storage manager instance created by the constructor
     */
    public RemoteStorageManager storageManager() {
        return remoteLogStorageManager;
    }

    /**
     * Keeps only the partitions that have a log with remote logging enabled.
     *
     * @param partitions candidate partitions
     * @return a lazy stream over the matching partitions
     */
    private Stream<Partition> filterPartitions(Set<Partition> partitions) {
        Predicate<Partition> hasRemoteLogEnabled = partition -> partition.log().exists(UnifiedLog::remoteLogEnabled);
        return partitions.stream().filter(hasRemoteLogEnabled);
    }

    /**
     * Remembers the topic id of the given partition, logging at info level when it replaces a
     * different id that was cached before.
     *
     * @param topicIdPartition partition together with its topic id
     */
    private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        Uuid previousTopicId = topicIdByPartitionMap.put(topicPartition, topicIdPartition.topicId());
        boolean replacedDifferentId = previousTopicId != null && !previousTopicId.equals(topicIdPartition.topicId());
        if (!replacedDifferentId) {
            return;
        }
        LOGGER.info("Previous cached topic id {} for {} does not match updated topic id {}", previousTopicId, topicIdPartition.topicPartition(), topicIdPartition.topicId());
    }

    /**
     * Reacts to leadership changes for partitions with remote logging enabled.
     * <p>
     * Partitions without remote logging are ignored. For the rest, the topic ids are cached, the
     * metadata manager is notified, and the per-partition task is switched to follower or leader mode
     * (followers are processed before leaders).
     *
     * @param partitionsBecomeLeader   partitions this broker now leads
     * @param partitionsBecomeFollower partitions this broker now follows
     * @param topicIds                 topic name to topic id
     */
    public void onLeadershipChange(Set<Partition> partitionsBecomeLeader, Set<Partition> partitionsBecomeFollower, Map<String, Uuid> topicIds) {
        LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);

        Map<TopicIdPartition, Integer> leaderEpochByPartition = filterPartitions(partitionsBecomeLeader)
                .collect(Collectors.toMap(
                        partition -> new TopicIdPartition(topicIds.get(partition.topic()), partition.topicPartition()),
                        Partition::getLeaderEpoch));
        Set<TopicIdPartition> leaders = leaderEpochByPartition.keySet();
        Set<TopicIdPartition> followers = filterPartitions(partitionsBecomeFollower)
                .map(partition -> new TopicIdPartition(topicIds.get(partition.topic()), partition.topicPartition()))
                .collect(Collectors.toSet());

        if (leaders.isEmpty() && followers.isEmpty()) {
            return;
        }

        LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}", leaders, followers);
        for (TopicIdPartition leader : leaders) {
            cacheTopicPartitionIds(leader);
        }
        for (TopicIdPartition follower : followers) {
            cacheTopicPartitionIds(follower);
        }
        remoteLogMetadataManager.onPartitionLeadershipChanges(leaders, followers);

        for (TopicIdPartition follower : followers) {
            doHandleLeaderOrFollowerPartitions(follower, RLMTask::convertToFollower);
        }
        for (Map.Entry<TopicIdPartition, Integer> leaderAndEpoch : leaderEpochByPartition.entrySet()) {
            int leaderEpoch = leaderAndEpoch.getValue();
            doHandleLeaderOrFollowerPartitions(leaderAndEpoch.getKey(), rlmTask -> rlmTask.convertToLeader(leaderEpoch));
        }
    }

    /**
     * Stops remote log processing for the given partitions.
     * <p>
     * For every partition whose topic id is cached, the task is cancelled and, when requested, its
     * remote log segments are deleted. A failure for one partition is reported to {@code errorHandler}
     * and logged, and does not stop the others. Afterwards, the partitions whose local log is deleted
     * are reported to the metadata manager and dropped from the topic id cache.
     *
     * @param stopPartitions partitions to stop, with their deletion flags
     * @param errorHandler   receives the partition and the exception for every failed partition
     */
    public void stopPartitions(Set<StopPartition> stopPartitions, BiConsumer<TopicPartition, Throwable> errorHandler) {
        LOGGER.debug("Stop partitions: {}", stopPartitions);
        for (StopPartition stopPartition : stopPartitions) {
            TopicPartition tp = stopPartition.topicPartition();
            try {
                if (topicIdByPartitionMap.containsKey(tp)) {
                    TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
                    RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
                    if (task != null) {
                        LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
                        task.cancel();
                    }
                    if (stopPartition.deleteRemoteLog()) {
                        LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
                        deleteRemoteLogPartition(tpId);
                    }
                }
            } catch (Exception ex) {
                errorHandler.accept(tp, ex);
                LOGGER.error("Error while stopping the partition: {}", stopPartition, ex);
            }
        }

        // Only partitions that lose their local log and still have a cached topic id are reported.
        Set<TopicIdPartition> deleteLocalPartitions = new HashSet<>();
        for (StopPartition stopPartition : stopPartitions) {
            if (stopPartition.deleteLocalLog() && topicIdByPartitionMap.containsKey(stopPartition.topicPartition())) {
                Uuid topicId = topicIdByPartitionMap.get(stopPartition.topicPartition());
                deleteLocalPartitions.add(new TopicIdPartition(topicId, stopPartition.topicPartition()));
            }
        }
        if (deleteLocalPartitions.isEmpty()) {
            return;
        }
        remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions);
        for (TopicIdPartition tpId : deleteLocalPartitions) {
            topicIdByPartitionMap.remove(tpId.topicPartition());
        }
    }

    /**
     * Deletes every remote log segment of the given partition.
     * <p>
     * Steps, in order: publish a {@code DELETE_SEGMENT_STARTED} update for every segment and wait for
     * all of them; delete each segment's data from remote storage; evict the segments from the index
     * cache; publish a {@code DELETE_SEGMENT_FINISHED} update for every segment and wait again.
     *
     * @param partition partition whose remote segments are removed
     * @throws RemoteStorageException if listing, updating or deleting a segment fails
     * @throws ExecutionException     if one of the published metadata updates completes exceptionally
     * @throws InterruptedException   if interrupted while waiting for the metadata updates
     */
    private void deleteRemoteLogPartition(TopicIdPartition partition) throws RemoteStorageException, ExecutionException, InterruptedException {
        // Typed list: the stream lambdas and the loop below need the element type to resolve.
        List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
        this.remoteLogMetadataManager.listRemoteLogSegments(partition).forEachRemaining(metadataList::add);

        List<RemoteLogSegmentMetadataUpdate> deleteSegmentStartedEvents = metadataList.stream()
                .map(metadata -> new RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), this.time.milliseconds(), metadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, this.brokerId))
                .collect(Collectors.toList());
        this.publishEvents(deleteSegmentStartedEvents).get();

        List<Uuid> deletedSegmentIds = new ArrayList<>();
        for (RemoteLogSegmentMetadata metadata : metadataList) {
            // The id is recorded before the delete call, so it is evicted from the cache list either way.
            deletedSegmentIds.add(metadata.remoteLogSegmentId().id());
            this.remoteLogStorageManager.deleteLogSegmentData(metadata);
        }
        this.indexCache.removeAll(deletedSegmentIds);

        List<RemoteLogSegmentMetadataUpdate> deleteSegmentFinishedEvents = metadataList.stream()
                .map(metadata -> new RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), this.time.milliseconds(), metadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, this.brokerId))
                .collect(Collectors.toList());
        this.publishEvents(deleteSegmentFinishedEvents).get();
    }

    /**
     * Submits every update to the metadata manager and returns a future that completes once all of
     * them have completed.
     *
     * @param events metadata updates to publish, submitted in list order
     * @return a future combining all submitted updates
     * @throws RemoteStorageException if submitting an update fails; updates submitted before the
     *                                failure are not rolled back
     */
    private CompletableFuture<Void> publishEvents(List<RemoteLogSegmentMetadataUpdate> events) throws RemoteStorageException {
        List<CompletableFuture<Void>> pendingUpdates = new ArrayList<>();
        // A plain loop rather than a stream: the submit call throws a checked exception.
        for (RemoteLogSegmentMetadataUpdate update : events) {
            CompletableFuture<Void> pending = remoteLogMetadataManager.updateRemoteLogSegmentMetadata(update);
            pendingUpdates.add(pending);
        }
        CompletableFuture<?>[] all = pendingUpdates.toArray(new CompletableFuture[0]);
        return CompletableFuture.allOf(all);
    }

    /**
     * Looks up the remote segment metadata that contains {@code offset} for the given leader epoch.
     *
     * @param topicPartition partition to look in; its topic id must already be cached
     * @param epochForOffset leader epoch the offset belongs to
     * @param offset         target offset
     * @return whatever the metadata manager returns for the lookup
     * @throws KafkaException         if no topic id is cached for the partition
     * @throws RemoteStorageException if the metadata manager fails
     */
    public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, int epochForOffset, long offset) throws RemoteStorageException {
        Uuid topicId = topicIdByPartitionMap.get(topicPartition);
        if (topicId != null) {
            TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, topicPartition);
            return remoteLogMetadataManager.remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset);
        }
        throw new KafkaException("No topic id registered for topic partition: " + topicPartition);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private Optional<FileRecords.TimestampAndOffset> lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset) throws RemoteStorageException, IOException {
        block5: {
            startPos = this.indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset);
            remoteSegInputStream = null;
            try {
                remoteSegInputStream = this.remoteLogStorageManager.fetchLogSegment(rlsMetadata, startPos);
                remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
                while ((batch = remoteLogInputStream.nextBatch()) != null) {
                    if (batch.maxTimestamp() < timestamp || batch.lastOffset() < startingOffset) continue;
                    for (Record record : batch) {
                        if (record.timestamp() < timestamp || record.offset() < startingOffset) continue;
                        var12_10 = Optional.of(new FileRecords.TimestampAndOffset(record.timestamp(), record.offset(), this.maybeLeaderEpoch(batch.partitionLeaderEpoch())));
                        break block5;
                    }
                }
                ** GOTO lbl-1000
            }
            catch (Throwable var13_11) {
                Utils.closeQuietly(remoteSegInputStream, (String)"RemoteLogSegmentInputStream");
                throw var13_11;
            }
        }
        Utils.closeQuietly((AutoCloseable)remoteSegInputStream, (String)"RemoteLogSegmentInputStream");
        return var12_10;
lbl-1000:
        // 1 sources

        {
            var9_7 = Optional.empty();
        }
        Utils.closeQuietly((AutoCloseable)remoteSegInputStream, (String)"RemoteLogSegmentInputStream");
        return var9_7;
    }

    /**
     * Converts a raw leader epoch into an {@link Optional}, treating {@code -1} as "no epoch".
     *
     * @param leaderEpoch leader epoch as stored in a record batch
     * @return empty for {@code -1}, otherwise the epoch
     */
    private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
        // NOTE(review): -1 presumably mirrors RecordBatch.NO_PARTITION_LEADER_EPOCH — confirm.
        if (leaderEpoch == -1) {
            return Optional.empty();
        }
        return Optional.of(leaderEpoch);
    }

    /**
     * Searches the remote tier for the first record with a timestamp of at least {@code timestamp} and
     * an offset of at least {@code startingOffset}.
     * <p>
     * Leader epochs are walked in order, starting at the epoch of {@code startingOffset}. The first
     * remote segment that can contain the target (by max timestamp and end offset) and lies within
     * the partition's leader epoch lineage is scanned, and the result of that scan is returned even
     * when it is empty.
     *
     * @param tp               partition to search; topic id and local log must be known
     * @param timestamp        minimum record timestamp
     * @param startingOffset   minimum record offset
     * @param leaderEpochCache leader epoch cache of the partition
     * @return the match from the first eligible segment, or empty if none is eligible or it has no match
     * @throws KafkaException         if the topic id or the local log of the partition is unknown
     * @throws RemoteStorageException if the remote storage plugins fail
     * @throws IOException            if reading the remote segment fails
     */
    public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicPartition tp, long timestamp, long startingOffset, LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException, IOException {
        Uuid topicId = topicIdByPartitionMap.get(tp);
        if (topicId == null) {
            throw new KafkaException("Topic id does not exist for topic partition: " + tp);
        }
        UnifiedLog unifiedLog = fetchLog.apply(tp)
                .orElseThrow(() -> new KafkaException("UnifiedLog does not exist for topic partition: " + tp));

        OptionalInt maybeEpoch = leaderEpochCache.epochForOffset(startingOffset);
        TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, tp);
        NavigableMap<Integer, Long> epochWithOffsets = buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets());

        while (maybeEpoch.isPresent()) {
            int epoch = maybeEpoch.getAsInt();
            java.util.Iterator<RemoteLogSegmentMetadata> segments = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
            while (segments.hasNext()) {
                RemoteLogSegmentMetadata candidate = segments.next();
                boolean mayContainTarget = candidate.maxTimestampMs() >= timestamp && candidate.endOffset() >= startingOffset;
                if (mayContainTarget && isRemoteSegmentWithinLeaderEpochs(candidate, unifiedLog.logEndOffset(), epochWithOffsets)) {
                    return lookupTimestamp(candidate, timestamp, startingOffset);
                }
            }
            maybeEpoch = leaderEpochCache.nextEpoch(epoch);
        }
        return Optional.empty();
    }

    /**
     * Builds an in-memory leader epoch checkpoint for the given log, restricted to an offset range.
     * <p>
     * When the log has a leader epoch cache, its {@code writeTo} result for the checkpoint is truncated
     * from the start at {@code startOffset} (only if that is non-negative) and from the end at
     * {@code endOffset}. Without a cache the checkpoint is returned untouched.
     *
     * @param log         log whose leader epoch cache is the source
     * @param startOffset lower truncation bound; a negative value skips the truncation from the start
     * @param endOffset   upper truncation bound
     * @return the checkpoint
     */
    InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long startOffset, long endOffset) {
        InMemoryLeaderEpochCheckpoint checkpoint = new InMemoryLeaderEpochCheckpoint();
        if (!log.leaderEpochCache().isDefined()) {
            return checkpoint;
        }
        LeaderEpochFileCache truncatedCache = log.leaderEpochCache().get().writeTo(checkpoint);
        boolean hasLowerBound = startOffset >= 0L;
        if (hasLowerBound) {
            truncatedCache.truncateFromStart(startOffset);
        }
        truncatedCache.truncateFromEnd(endOffset);
        return checkpoint;
    }

    /**
     * Checks whether a remote segment is consistent with the partition's leader epoch lineage and
     * ends before the log end offset.
     * <p>
     * The segment's epochs are first filtered with {@link #buildFilteredLeaderEpochMap}. The segment
     * is rejected when:
     * <ul>
     *   <li>its first/last epoch lies outside the range of {@code leaderEpochs};</li>
     *   <li>one of its epochs is missing from {@code leaderEpochs};</li>
     *   <li>its first epoch starts before that epoch's start offset in {@code leaderEpochs};</li>
     *   <li>its end offset reaches into the next epoch of {@code leaderEpochs};</li>
     *   <li>for a non-last epoch, the following entry differs between the two maps.</li>
     * </ul>
     *
     * @param segmentMetadata remote segment to validate
     * @param logEndOffset    end offset of the local log
     * @param leaderEpochs    partition leader epochs (epoch to start offset); must not be empty
     * @return {@code true} if all lineage checks pass and the segment end offset is below
     *         {@code logEndOffset}
     */
    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata, long logEndOffset, NavigableMap<Integer, Long> leaderEpochs) {
        long segmentEndOffset = segmentMetadata.endOffset();
        NavigableMap<Integer, Long> segmentLeaderEpochs = buildFilteredLeaderEpochMap(segmentMetadata.segmentLeaderEpochs());
        int segmentFirstEpoch = segmentLeaderEpochs.firstKey();
        int segmentLastEpoch = segmentLeaderEpochs.lastKey();

        boolean outOfBounds = segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey();
        if (outOfBounds) {
            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
            return false;
        }

        for (Map.Entry<Integer, Long> segmentEntry : segmentLeaderEpochs.entrySet()) {
            int epoch = segmentEntry.getKey();
            long offset = segmentEntry.getValue();

            if (!leaderEpochs.containsKey(epoch)) {
                LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
                return false;
            }

            if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) {
                LOGGER.debug("Segment {} first epoch {} offset is less than leader epoch offset {}.", segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
                return false;
            }

            if (epoch == segmentLastEpoch) {
                // The last segment epoch may not spill into the partition's next epoch.
                Map.Entry<Integer, Long> nextEntry = leaderEpochs.higherEntry(epoch);
                if (nextEntry != null && segmentEndOffset > nextEntry.getValue() - 1L) {
                    LOGGER.debug("Segment {} end offset {} is more than leader epoch offset {}.", segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1L);
                    return false;
                }
            } else if (!leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch))) {
                // Every intermediate epoch must be followed by the same entry in both maps.
                LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
                return false;
            }
        }
        return segmentEndOffset < logEndOffset;
    }

    /**
     * Drops every epoch whose start offset equals the start offset of the next epoch, i.e. epochs
     * that own no offsets.
     * <p>
     * The input is never modified. When nothing has to be dropped, the input instance itself is
     * returned; otherwise a new {@link TreeMap} copy without the dropped epochs is returned.
     *
     * @param leaderEpochs epoch to start offset, ordered by epoch
     * @return the epochs that are not immediately superseded at the same start offset
     */
    public static NavigableMap<Integer, Long> buildFilteredLeaderEpochMap(NavigableMap<Integer, Long> leaderEpochs) {
        List<Integer> duplicatedEpochs = new ArrayList<>();
        Map.Entry<Integer, Long> previousEntry = null;
        for (Map.Entry<Integer, Long> entry : leaderEpochs.entrySet()) {
            // Same start offset as its successor: the earlier epoch is the one that gets dropped.
            if (previousEntry != null && previousEntry.getValue().equals(entry.getValue())) {
                duplicatedEpochs.add(previousEntry.getKey());
            }
            previousEntry = entry;
        }
        if (duplicatedEpochs.isEmpty()) {
            return leaderEpochs;
        }
        TreeMap<Integer, Long> filteredLeaderEpochs = new TreeMap<>(leaderEpochs);
        for (Integer duplicatedEpoch : duplicatedEpochs) {
            filteredLeaderEpochs.remove(duplicatedEpoch);
        }
        return filteredLeaderEpochs;
    }

    /**
     * Reads records from the remote tier for a fetch request.
     * <p>
     * The leader epoch of the fetch offset is resolved through the local log's leader epoch cache,
     * the matching remote segment is looked up, and the segment is streamed from the position the
     * index cache returns for the offset. The result is one of:
     * <ul>
     *   <li>empty records, when no batch at or after the offset is found;</li>
     *   <li>empty records, when the first batch is larger than the byte limit and neither
     *       {@code minOneMessage} nor {@code hardMaxBytesLimit} is set;</li>
     *   <li>otherwise the first batch followed by as many further bytes as fit in the buffer, with
     *       aborted transactions attached when the fetch isolation is {@code TXN_COMMITTED}.</li>
     * </ul>
     * The remote segment stream is closed on every exit path, including failures after the first
     * batch has been located.
     *
     * @param remoteStorageFetchInfo fetch parameters (partition, offset, byte limits, isolation)
     * @return the fetched data
     * @throws OffsetOutOfRangeException if no leader epoch or no remote segment exists for the offset
     * @throws RemoteStorageException    if the remote storage plugins fail
     * @throws IOException               if reading the remote segment fails
     */
    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
        long offset = fetchInfo.fetchOffset;
        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);

        Optional<UnifiedLog> logOptional = this.fetchLog.apply(tp);
        OptionalInt epoch = OptionalInt.empty();
        if (logOptional.isPresent()) {
            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
            if (leaderEpochCache.isDefined()) {
                epoch = leaderEpochCache.get().epochForOffset(offset);
            }
        }

        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
                ? this.fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
                : Optional.empty();
        if (!rlsMetadataOptional.isPresent()) {
            String epochStr = epoch.isPresent() ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " + epochStr + " and partition " + tp + " which does not exist in remote tier.");
        }

        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
        int startPos = this.lookupPositionForOffset(remoteLogSegmentMetadata, offset);
        InputStream remoteSegInputStream = null;
        try {
            remoteSegInputStream = this.remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
            RecordBatch firstBatch = this.findFirstBatch(remoteLogInputStream, offset);
            if (firstBatch == null) {
                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
            }

            int firstBatchSize = firstBatch.sizeInBytes();
            // Return nothing rather than a truncated batch when the caller set neither flag.
            if (!remoteStorageFetchInfo.minOneMessage && !remoteStorageFetchInfo.hardMaxBytesLimit && firstBatchSize > maxBytes) {
                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
            }

            // With minOneMessage the buffer grows to hold an oversized first batch in full.
            int updatedFetchSize = remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
            firstBatch.writeTo(buffer);
            int remainingBytes = updatedFetchSize - firstBatchSize;
            if (remainingBytes > 0) {
                // Fill the rest of the buffer from the stream (stops at end of stream or a full buffer).
                Utils.readFully(remoteSegInputStream, buffer);
            }
            buffer.flip();

            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos), MemoryRecords.readableRecords(buffer));
            if (includeAbortedTxns) {
                // logOptional is present here: without a log no epoch, and without an epoch we threw above.
                fetchDataInfo = this.addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get());
            }
            return fetchDataInfo;
        } finally {
            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
        }
    }

    /**
     * Asks the index cache for the position inside the remote segment that corresponds to
     * {@code offset}.
     *
     * @param remoteLogSegmentMetadata segment to look in
     * @param offset                   target offset
     * @return the position returned by the index cache
     */
    private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
        int position = indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
        return position;
    }

    /**
     * Builds a copy of {@code fetchInfo} that additionally carries the aborted transactions overlapping
     * the fetched range.
     *
     * <p>The upper bound of the range is taken from the segment's offset index
     * ({@code fetchUpperBoundOffset}); when the index yields nothing, {@code segmentMetadata.endOffset() + 1}
     * is used instead. Aborted transactions are gathered through {@link #collectAbortedTransactions} and
     * de-duplicated via a set before being attached to the result.
     *
     * @param startOffset     first offset of the range to search for aborted transactions
     * @param segmentMetadata remote segment the fetched records came from
     * @param fetchInfo       fetch result to enrich; its metadata, records and incomplete flag are reused
     * @param log             local log handed on to {@link #collectAbortedTransactions}
     * @return a new {@code FetchDataInfo} with a present (possibly empty) aborted-transaction list
     * @throws RemoteStorageException propagated from the aborted-transaction collection
     */
    private FetchDataInfo addAbortedTransactions(long startOffset, RemoteLogSegmentMetadata segmentMetadata, FetchDataInfo fetchInfo, UnifiedLog log) throws RemoteStorageException {
        int fetchedBytes = fetchInfo.records.sizeInBytes();
        OffsetPosition fetchStart = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
        OffsetIndex segmentOffsetIndex = this.indexCache.getIndexEntry(segmentMetadata).offsetIndex();
        long upperBoundOffset = segmentOffsetIndex.fetchUpperBoundOffset(fetchStart, fetchedBytes)
                .map(upperBound -> upperBound.offset)
                .orElse(segmentMetadata.endOffset() + 1L);

        // A set is used so that a transaction reported by more than one index is returned only once.
        HashSet collected = new HashSet();
        Consumer<List<AbortedTxn>> accumulator = abortedTxns -> collected.addAll(
                abortedTxns.stream().map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
        this.collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);

        return new FetchDataInfo(
                fetchInfo.fetchOffsetMetadata,
                fetchInfo.records,
                fetchInfo.firstEntryIncomplete,
                Optional.of(collected.isEmpty() ? Collections.emptyList() : new ArrayList(collected)));
    }

    /**
     * Feeds {@code accumulator} with aborted transactions in {@code [startOffset, upperBoundOffset)},
     * starting at {@code segmentMetadata} and walking forward through the following remote segments.
     *
     * <p>The walk stops as soon as a transaction index reports a complete search result. If the remote
     * segments run out first, the search continues in the local log segments.
     *
     * @param startOffset      first offset of the search range
     * @param upperBoundOffset upper bound of the search range, passed unchanged to the transaction indexes
     * @param segmentMetadata  remote segment to start from; must not be {@code null}
     * @param accumulator      receives the aborted transactions found in each index
     * @param log              local log providing the leader epoch cache and the local segments
     * @throws RemoteStorageException if looking up the next remote segment fails
     */
    private void collectAbortedTransactions(long startOffset, long upperBoundOffset, RemoteLogSegmentMetadata segmentMetadata, Consumer<List<AbortedTxn>> accumulator, UnifiedLog log) throws RemoteStorageException {
        // Fail fast on a null starting segment.
        RemoteLogSegmentMetadata currentSegment = Objects.requireNonNull(segmentMetadata);
        while (currentSegment != null) {
            TransactionIndex txnIndex = this.indexCache.getIndexEntry(currentSegment).txnIndex();
            if (txnIndex != null) {
                TxnIndexSearchResult result = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
                accumulator.accept(result.abortedTransactions);
                if (result.isComplete) {
                    return;
                }
            }
            currentSegment = this.findNextSegmentMetadata(currentSegment, log.leaderEpochCache()).orElse(null);
        }
        // Remote segments exhausted without a complete result: continue with the local segments.
        this.collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, JavaConverters.asJavaIterator((Iterator)log.logSegments().iterator()));
    }

    /**
     * Scans the transaction indexes of the given local segments, in iteration order, and hands the
     * aborted transactions of each one to {@code accumulator}.
     *
     * <p>Segments without a transaction index are skipped. The scan ends early once an index reports a
     * complete search result.
     *
     * @param startOffset      first offset of the search range
     * @param upperBoundOffset upper bound of the search range
     * @param accumulator      receives the aborted transactions found in each index
     * @param localLogSegments local segments to inspect
     */
    private void collectAbortedTransactionInLocalSegments(long startOffset, long upperBoundOffset, Consumer<List<AbortedTxn>> accumulator, java.util.Iterator<LogSegment> localLogSegments) {
        while (localLogSegments.hasNext()) {
            TransactionIndex localTxnIndex = localLogSegments.next().txnIndex();
            if (localTxnIndex != null) {
                TxnIndexSearchResult result = localTxnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
                accumulator.accept(result.abortedTransactions);
                if (result.isComplete) {
                    return;
                }
            }
        }
    }

    /**
     * Locates the remote segment that immediately follows {@code segmentMetadata}.
     *
     * <p>The next segment is expected to start at {@code segmentMetadata.endOffset() + 1}. The leader epoch
     * for that offset is resolved through the leader epoch cache and then used to fetch the metadata.
     *
     * @param segmentMetadata            the segment whose successor is wanted
     * @param leaderEpochFileCacheOption leader epoch cache, if the log has one
     * @return the following segment's metadata, or empty when there is no cache, no epoch covers the
     *         next base offset, or the metadata lookup finds nothing
     * @throws RemoteStorageException propagated from the metadata lookup
     */
    private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {
        if (leaderEpochFileCacheOption.isEmpty()) {
            return Optional.empty();
        }
        long nextBaseOffset = segmentMetadata.endOffset() + 1L;
        LeaderEpochFileCache epochCache = (LeaderEpochFileCache)leaderEpochFileCacheOption.get();
        OptionalInt epochOfNextSegment = epochCache.epochForOffset(nextBaseOffset);
        if (!epochOfNextSegment.isPresent()) {
            return Optional.empty();
        }
        return this.fetchRemoteLogSegmentMetadata(segmentMetadata.topicIdPartition().topicPartition(), epochOfNextSegment.getAsInt(), nextBaseOffset);
    }

    /**
     * Reads batches from the stream until one is found whose last offset is at or beyond {@code offset}.
     *
     * @param remoteLogInputStream stream of record batches to consume
     * @param offset               the target offset
     * @return the first batch with {@code lastOffset() >= offset}, or {@code null} if the stream ends first
     * @throws IOException if reading the next batch fails
     */
    private RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
        RecordBatch candidate;
        do {
            candidate = remoteLogInputStream.nextBatch();
        } while (candidate != null && candidate.lastOffset() < offset);
        return candidate;
    }

    /**
     * Determines the highest offset the remote log metadata manager knows for this partition.
     *
     * <p>Epochs are probed from the latest one in the leader epoch cache backwards until the metadata
     * manager reports a highest offset for one of them.
     *
     * @param topicIdPartition partition to query
     * @param log              local log supplying the leader epoch cache
     * @return the highest offset found, or {@code -1} if there is no leader epoch cache or no epoch
     *         yields an offset
     * @throws RemoteStorageException propagated from the metadata manager
     */
    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
        Optional<Long> highestOffset = Optional.empty();
        Option<LeaderEpochFileCache> epochCacheOption = log.leaderEpochCache();
        if (epochCacheOption.isDefined()) {
            LeaderEpochFileCache epochCache = (LeaderEpochFileCache)epochCacheOption.get();
            OptionalInt currentEpoch = epochCache.latestEpoch();
            while (!highestOffset.isPresent() && currentEpoch.isPresent()) {
                int epochValue = currentEpoch.getAsInt();
                highestOffset = this.remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epochValue);
                currentEpoch = epochCache.previousEpoch(epochValue);
            }
        }
        return highestOffset.orElse(-1L);
    }

    /**
     * Determines the log start offset of the partition, taking remote segments into account.
     *
     * <p>Starting at the earliest epoch in the leader epoch cache and moving forward, the first remote
     * segment listed for an epoch supplies the start offset. If no remote segment is found (or the log
     * has no leader epoch cache), the local log start offset is returned.
     *
     * @param topicIdPartition partition to query
     * @param log              local log supplying the leader epoch cache and the fallback offset
     * @return the start offset of the first remote segment found, otherwise {@code log.localLogStartOffset()}
     * @throws RemoteStorageException propagated from the metadata manager
     */
    long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
        Optional<Long> remoteStartOffset = Optional.empty();
        Option<LeaderEpochFileCache> epochCacheOption = log.leaderEpochCache();
        if (epochCacheOption.isDefined()) {
            LeaderEpochFileCache epochCache = (LeaderEpochFileCache)epochCacheOption.get();
            OptionalInt currentEpoch = epochCache.earliestEntry()
                    .map(entry -> OptionalInt.of(entry.epoch))
                    .orElseGet(OptionalInt::empty);
            while (!remoteStartOffset.isPresent() && currentEpoch.isPresent()) {
                int epochValue = currentEpoch.getAsInt();
                java.util.Iterator<RemoteLogSegmentMetadata> segmentsOfEpoch = this.remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epochValue);
                if (segmentsOfEpoch.hasNext()) {
                    remoteStartOffset = Optional.of(segmentsOfEpoch.next().startOffset());
                }
                currentEpoch = epochCache.nextEpoch(epochValue);
            }
        }
        return remoteStartOffset.orElseGet(log::localLogStartOffset);
    }

    /**
     * Submits a remote read to the remote storage reader thread pool.
     *
     * @param fetchInfo description of the remote fetch to perform
     * @param callback  handed to the {@code RemoteLogReader} created for this read
     * @return the future returned by the reader thread pool for the submitted read
     */
    public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteLogReadResult> callback) {
        RemoteLogReader reader = new RemoteLogReader(fetchInfo, this, callback, this.brokerTopicStats);
        return this.remoteStorageReaderThreadPool.submit((Callable)reader);
    }

    /**
     * Ensures a scheduled {@code RLMTask} exists for the partition and applies the given role
     * conversion to it.
     *
     * <p>When no task is registered yet, a new one is created, converted, and scheduled with a fixed
     * delay of {@code delayInMs} milliseconds and no initial delay. The conversion is then applied to
     * the registered task in every case, so a freshly created task receives it twice.
     *
     * @param topicPartition            partition whose task is to be created or updated
     * @param convertToLeaderOrFollower action that switches a task to the leader or follower role
     */
    void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition, Consumer<RLMTask> convertToLeaderOrFollower) {
        RLMTaskWithFuture registered = this.leaderOrFollowerTasks.computeIfAbsent(topicPartition, key -> {
            RLMTask newTask = new RLMTask((TopicIdPartition)key, this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
            // Set the role before the task can run for the first time.
            convertToLeaderOrFollower.accept(newTask);
            LOGGER.info("Created a new task: {} and getting scheduled", (Object)newTask);
            ScheduledFuture<?> scheduled = this.rlmScheduledThreadPool.scheduleWithFixedDelay(newTask, 0L, this.delayInMs, TimeUnit.MILLISECONDS);
            return new RLMTaskWithFuture(newTask, scheduled);
        });
        RLMTask registeredTask = registered.rlmTask;
        convertToLeaderOrFollower.accept(registeredTask);
    }

    /**
     * Shuts this manager down. Calls after the first successful one are no-ops.
     *
     * <p>Under this object's monitor: cancels every registered task, quietly closes the storage manager,
     * the metadata manager and the index cache, shuts down the scheduled pool and the reader pool
     * (waiting up to 10 seconds per phase for the latter), removes the metrics, clears the task map and
     * finally marks the manager as closed.
     */
    @Override
    public void close() {
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.leaderOrFollowerTasks.values().forEach(RLMTaskWithFuture::cancel);
            Utils.closeQuietly((AutoCloseable)this.remoteLogStorageManager, (String)"RemoteLogStorageManager");
            Utils.closeQuietly((AutoCloseable)this.remoteLogMetadataManager, (String)"RemoteLogMetadataManager");
            Utils.closeQuietly((AutoCloseable)this.indexCache, (String)"RemoteIndexCache");
            this.rlmScheduledThreadPool.close();
            try {
                RemoteLogManager.shutdownAndAwaitTermination((ExecutorService)this.remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10L, TimeUnit.SECONDS);
            }
            finally {
                // Metrics are removed even if shutting down the reader pool throws.
                this.removeMetrics();
            }
            this.leaderOrFollowerTasks.clear();
            this.closed = true;
        }
    }

    /**
     * Shuts a thread pool down in two phases and logs the progress.
     *
     * <p>First an orderly {@code shutdown()} is requested and awaited for {@code timeout}. If the pool
     * has not terminated by then, {@code shutdownNow()} is issued and awaited for another {@code timeout}.
     * If the calling thread is interrupted while waiting, {@code shutdownNow()} is issued and the
     * thread's interrupt status is restored.
     *
     * @param pool     the pool to shut down
     * @param poolName name used in log messages
     * @param timeout  maximum wait for each phase
     * @param timeUnit unit of {@code timeout}
     */
    private static void shutdownAndAwaitTermination(ExecutorService pool, String poolName, long timeout, TimeUnit timeUnit) {
        LOGGER.info("Shutting down of thread pool {} is started", (Object)poolName);
        pool.shutdown();
        try {
            boolean terminatedGracefully = pool.awaitTermination(timeout, timeUnit);
            if (!terminatedGracefully) {
                LOGGER.info("Shutting down of thread pool {} could not be completed. It will retry cancelling the tasks using shutdownNow.", (Object)poolName);
                pool.shutdownNow();
                boolean terminatedAfterCancel = pool.awaitTermination(timeout, timeUnit);
                if (!terminatedAfterCancel) {
                    LOGGER.warn("Shutting down of thread pool {} could not be completed even after retrying cancellation of the tasks using shutdownNow.", (Object)poolName);
                }
            }
        }
        catch (InterruptedException interrupted) {
            LOGGER.warn("Encountered InterruptedException while shutting down thread pool {}. It will retry cancelling the tasks using shutdownNow.", (Object)poolName);
            pool.shutdownNow();
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
        LOGGER.info("Shutting down of thread pool {} is completed", (Object)poolName);
    }

    /**
     * Looks up the task registered for a partition.
     *
     * @param partition the partition of interest
     * @return the registered task together with its future, or {@code null} if the map has no entry
     */
    RLMTaskWithFuture task(TopicIdPartition partition) {
        final RLMTaskWithFuture registered = this.leaderOrFollowerTasks.get(partition);
        return registered;
    }

    /**
     * A local log segment paired with the base offset of the segment that follows it.
     */
    static class EnrichedLogSegment {
        private final LogSegment logSegment;
        private final long nextSegmentOffset;

        /**
         * @param logSegment        the segment itself
         * @param nextSegmentOffset base offset of the segment following {@code logSegment}
         */
        public EnrichedLogSegment(LogSegment logSegment, long nextSegmentOffset) {
            this.logSegment = logSegment;
            this.nextSegmentOffset = nextSegmentOffset;
        }

        /** Two instances are equal when they are of the same class and both fields are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (this.getClass() != o.getClass()) {
                return false;
            }
            EnrichedLogSegment other = (EnrichedLogSegment)o;
            if (this.nextSegmentOffset != other.nextSegmentOffset) {
                return false;
            }
            return Objects.equals(this.logSegment, other.logSegment);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.logSegment, this.nextSegmentOffset);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("EnrichedLogSegment{logSegment=");
            text.append(this.logSegment);
            text.append(", nextSegmentOffset=").append(this.nextSegmentOffset);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Immutable holder for time-based retention settings: the retention period and the timestamp up to
     * which cleanup applies. Both values must be non-negative.
     */
    public static class RetentionTimeData {
        private final long retentionMs;
        private final long cleanupUntilMs;

        /**
         * @param retentionMs    retention period in milliseconds; must be {@code >= 0}
         * @param cleanupUntilMs cleanup boundary in milliseconds; must be {@code >= 0}
         * @throws IllegalArgumentException if either argument is negative ({@code retentionMs} is checked first)
         */
        public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
            this.retentionMs = requireNonNegative("retentionMs", retentionMs);
            this.cleanupUntilMs = requireNonNegative("cleanupUntilMs", cleanupUntilMs);
        }

        /** Returns {@code value} unchanged, or throws if it is negative. */
        private static long requireNonNegative(String name, long value) {
            if (value < 0L) {
                throw new IllegalArgumentException(name + " should be non negative, but it is " + value);
            }
            return value;
        }
    }

    /**
     * Immutable holder for size-based retention settings: the configured retention size and the number
     * of bytes by which that size is still exceeded.
     */
    public static class RetentionSizeData {
        private final long retentionSize;
        private final long remainingBreachedSize;

        /**
         * @param retentionSize         configured retention size; must be {@code >= 0}
         * @param remainingBreachedSize amount still exceeding the retention size; must be {@code > 0}
         * @throws IllegalArgumentException if {@code retentionSize} is negative or
         *                                  {@code remainingBreachedSize} is not positive (checked in that order)
         */
        public RetentionSizeData(long retentionSize, long remainingBreachedSize) {
            this.retentionSize = requireNonNegative(retentionSize);
            this.remainingBreachedSize = requirePositive(remainingBreachedSize);
        }

        /** Returns {@code retentionSize} unchanged, or throws if it is negative. */
        private static long requireNonNegative(long retentionSize) {
            if (retentionSize < 0L) {
                throw new IllegalArgumentException("retentionSize should be non negative, but it is " + retentionSize);
            }
            return retentionSize;
        }

        /** Returns {@code remainingBreachedSize} unchanged, or throws if it is zero or negative. */
        private static long requirePositive(long remainingBreachedSize) {
            if (remainingBreachedSize <= 0L) {
                throw new IllegalArgumentException("remainingBreachedSize should be more than zero, but it is " + remainingBreachedSize);
            }
            return remainingBreachedSize;
        }
    }

    /**
     * Thin wrapper around a {@link ScheduledThreadPoolExecutor} used to run remote log manager tasks.
     *
     * <p>The executor removes cancelled tasks from its queue, does not run delayed or periodic tasks after
     * shutdown, and creates daemon threads named {@code kafka-rlm-thread-pool-N}.
     */
    static class RLMScheduledThreadPool {
        private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
        private final int poolSize;
        private final ScheduledThreadPoolExecutor scheduledThreadPool;

        /**
         * @param poolSize core pool size of the underlying executor
         */
        public RLMScheduledThreadPool(int poolSize) {
            this.poolSize = poolSize;
            this.scheduledThreadPool = this.createPool();
        }

        /** Creates and configures the underlying executor. */
        private ScheduledThreadPoolExecutor createPool() {
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(this.poolSize);
            executor.setRemoveOnCancelPolicy(true);
            executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            // Thread names carry a 1-based sequence number.
            final AtomicInteger sequence = new AtomicInteger();
            ThreadFactory daemonFactory = runnable ->
                    KafkaThread.daemon((String)("kafka-rlm-thread-pool-" + sequence.incrementAndGet()), (Runnable)runnable);
            executor.setThreadFactory(daemonFactory);
            return executor;
        }

        /**
         * @return {@code 1 - activeCount / corePoolSize} of the underlying executor
         */
        public Double getIdlePercent() {
            double activeThreads = (double)this.scheduledThreadPool.getActiveCount();
            double coreThreads = (double)this.scheduledThreadPool.getCorePoolSize();
            return 1.0 - activeThreads / coreThreads;
        }

        /**
         * Logs the request and schedules {@code runnable} with a fixed delay on the underlying executor.
         *
         * @param runnable     task to run repeatedly
         * @param initialDelay delay before the first run
         * @param delay        delay between the end of one run and the start of the next
         * @param timeUnit     unit of both delays
         * @return the future representing the scheduled task
         */
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) {
            Object[] logArgs = new Object[]{runnable, initialDelay, delay};
            LOGGER.info("Scheduling runnable {} with initial delay: {}, fixed delay: {}", logArgs);
            return this.scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit);
        }

        /** Shuts the underlying executor down, waiting up to 10 seconds per shutdown phase. */
        public void close() {
            RemoteLogManager.shutdownAndAwaitTermination(this.scheduledThreadPool, "RLMScheduledThreadPool", 10L, TimeUnit.SECONDS);
        }
    }

    static class RLMTaskWithFuture {
        private final RLMTask rlmTask;
        private final Future<?> future;

        RLMTaskWithFuture(RLMTask rlmTask, Future<?> future) {
            this.rlmTask = rlmTask;
            this.future = future;
        }

        public void cancel() {
            this.rlmTask.cancel();
            try {
                this.future.cancel(true);
            }
            catch (Exception ex) {
                LOGGER.error("Error occurred while canceling the task: {}", (Object)this.rlmTask, (Object)ex);
            }
        }
    }

    class RLMTask
    extends CancellableRunnable {
        private final TopicIdPartition topicIdPartition;
        private final int customMetadataSizeLimit;
        private final Logger logger;
        private volatile int leaderEpoch;
        private volatile OptionalLong copiedOffsetOption;
        private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader;

        /**
         * Creates a task for the given partition. The task starts without a leader epoch
         * ({@code leaderEpoch == -1}), with no copied offset and with the log start offset not yet updated.
         *
         * @param topicIdPartition        partition this task works on
         * @param customMetadataSizeLimit maximum accepted size, in bytes, of custom metadata returned by
         *                                the remote storage manager
         */
        public RLMTask(TopicIdPartition topicIdPartition, int customMetadataSizeLimit) {
            this.topicIdPartition = topicIdPartition;
            this.customMetadataSizeLimit = customMetadataSizeLimit;
            this.leaderEpoch = -1;
            this.copiedOffsetOption = OptionalLong.empty();
            this.isLogStartOffsetUpdatedOnBecomingLeader = false;
            // Every log line of this task is prefixed with the broker id and the partition.
            String logPrefix = "[RemoteLogManager=" + RemoteLogManager.this.brokerId + " partition=" + topicIdPartition + "] ";
            this.logger = new LogContext(logPrefix).logger(RLMTask.class);
        }

        /**
         * @return {@code true} when a non-negative leader epoch is currently set on this task
         */
        boolean isLeader() {
            final int currentEpoch = this.leaderEpoch;
            return currentEpoch >= 0;
        }

        /**
         * Switches this task to the leader role for the given epoch.
         *
         * <p>The copied offset and the "log start offset updated" flag are reset on every call, so both are
         * recomputed the next time the task needs them.
         *
         * @param leaderEpochVal the new leader epoch; must not be negative
         * @throws KafkaException if {@code leaderEpochVal} is negative
         */
        public void convertToLeader(int leaderEpochVal) {
            boolean validEpoch = leaderEpochVal >= 0;
            if (!validEpoch) {
                throw new KafkaException("leaderEpoch value for topic partition " + this.topicIdPartition + " can not be negative");
            }
            // Write the volatile field only when the epoch actually changes.
            if (leaderEpochVal != this.leaderEpoch) {
                this.leaderEpoch = leaderEpochVal;
            }
            this.copiedOffsetOption = OptionalLong.empty();
            this.isLogStartOffsetUpdatedOnBecomingLeader = false;
        }

        /**
         * Switches this task to the follower role by resetting the leader epoch to {@code -1}.
         */
        public void convertToFollower() {
            final int noLeaderEpoch = -1;
            this.leaderEpoch = noLeaderEpoch;
        }

        /**
         * Computes the log start offset and publishes it through {@code updateRemoteLogStartOffset},
         * unless that has already been done since the last conversion to leader.
         *
         * @param log local log of this task's partition
         * @throws RemoteStorageException propagated from the log start offset lookup
         */
        private void maybeUpdateLogStartOffsetOnBecomingLeader(UnifiedLog log) throws RemoteStorageException {
            if (this.isLogStartOffsetUpdatedOnBecomingLeader) {
                return;
            }
            long foundLogStartOffset = RemoteLogManager.this.findLogStartOffset(this.topicIdPartition, log);
            RemoteLogManager.this.updateRemoteLogStartOffset.accept(this.topicIdPartition.topicPartition(), foundLogStartOffset);
            this.isLogStartOffsetUpdatedOnBecomingLeader = true;
            Object[] logArgs = new Object[]{foundLogStartOffset, this.topicIdPartition, this.leaderEpoch};
            this.logger.info("Found the logStartOffset: {} for partition: {} after becoming leader, leaderEpoch: {}", logArgs);
        }

        /**
         * Initialises the copied offset from the highest remote offset if no copied offset is set yet.
         *
         * @param log local log of this task's partition
         * @throws RemoteStorageException propagated from the highest remote offset lookup
         */
        private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageException {
            if (this.copiedOffsetOption.isPresent()) {
                return;
            }
            long highestRemoteOffset = RemoteLogManager.this.findHighestRemoteOffset(this.topicIdPartition, log);
            this.copiedOffsetOption = OptionalLong.of(highestRemoteOffset);
            Object[] logArgs = new Object[]{this.copiedOffsetOption, this.topicIdPartition, this.leaderEpoch};
            this.logger.info("Found the highest copiedRemoteOffset: {} for partition: {} after becoming leader, leaderEpoch: {}", logArgs);
        }

        /**
         * Selects the local segments that may be copied to remote storage.
         *
         * <p>Segments starting at {@code fromOffset} are examined pairwise. A segment qualifies when the
         * segment following it has a base offset that is not beyond {@code lastStableOffset}. The last
         * segment in the list has no successor and is therefore never returned.
         *
         * @param log              local log to take the segments from
         * @param fromOffset       offset from which segments are considered
         * @param lastStableOffset last stable offset of the log
         * @return the qualifying segments, each paired with the base offset of its successor
         */
        List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long fromOffset, Long lastStableOffset) {
            List<EnrichedLogSegment> candidates = new ArrayList<EnrichedLogSegment>();
            List<LogSegment> segments = JavaConverters.seqAsJavaList((Seq)log.logSegments(fromOffset, Long.MAX_VALUE).toSeq());
            for (int next = 1; next < segments.size(); next++) {
                LogSegment following = segments.get(next);
                if (following.baseOffset() <= lastStableOffset) {
                    candidates.add(new EnrichedLogSegment(segments.get(next - 1), following.baseOffset()));
                }
            }
            return candidates;
        }

        /**
         * Copies every eligible local segment of {@code log} to remote storage.
         *
         * <p>Nothing happens if the task is cancelled. Otherwise the log start offset and the copied offset
         * are initialised if needed, and segments between the copied offset and the last stable offset are
         * copied one by one. Copying stops early when the task is cancelled or loses leadership.
         *
         * <p>Error handling:
         * <ul>
         *   <li>{@code CustomMetadataSizeLimitExceededException}: failure metrics are marked and the task
         *       cancels itself.</li>
         *   <li>{@code InterruptedException} and {@code RetriableException}: rethrown to the caller.</li>
         *   <li>Any other exception: failure metrics are marked and the error is logged, unless the task
         *       has been cancelled.</li>
         * </ul>
         *
         * @param log local log of this task's partition
         * @throws InterruptedException if the copy is interrupted
         */
        public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException {
            if (this.isCancelled()) {
                return;
            }
            try {
                this.maybeUpdateLogStartOffsetOnBecomingLeader(log);
                this.maybeUpdateCopiedOffset(log);
                long copiedOffset = this.copiedOffsetOption.getAsLong();
                long lso = log.lastStableOffset();

                if (lso < 0L) {
                    this.logger.warn("lastStableOffset for partition {} is {}, which should not be negative.", (Object)this.topicIdPartition, (Object)lso);
                    return;
                }
                if (lso == 0L || copiedOffset >= lso) {
                    this.logger.debug("Skipping copying segments, current read-offset:{}, and LSO:{}", (Object)copiedOffset, (Object)lso);
                    return;
                }

                long fromOffset = Math.max(copiedOffset + 1L, log.logStartOffset());
                List<EnrichedLogSegment> candidates = this.candidateLogSegments(log, fromOffset, lso);
                this.logger.debug("Candidate log segments, logStartOffset: {}, copiedOffset: {}, fromOffset: {}, lso: {} and candidateLogSegments: {}", new Object[]{log.logStartOffset(), copiedOffset, fromOffset, lso, candidates});
                if (candidates.isEmpty()) {
                    this.logger.debug("No segments found to be copied for partition {} with copiedOffset: {} and active segment's base-offset: {}", new Object[]{this.topicIdPartition, copiedOffset, log.activeSegment().baseOffset()});
                    return;
                }

                for (EnrichedLogSegment candidate : candidates) {
                    // Re-check the task state before every segment; it may change while copying.
                    if (this.isCancelled() || !this.isLeader()) {
                        this.logger.info("Skipping copying log segments as the current task state is changed, cancelled: {} leader:{}", (Object)this.isCancelled(), (Object)this.isLeader());
                        return;
                    }
                    this.copyLogSegment(log, candidate.logSegment, candidate.nextSegmentOffset);
                }
            }
            catch (CustomMetadataSizeLimitExceededException sizeExceeded) {
                RemoteLogManager.this.brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
                RemoteLogManager.this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
                this.cancel();
            }
            catch (InterruptedException | RetriableException rethrown) {
                throw rethrown;
            }
            catch (Exception failure) {
                if (!this.isCancelled()) {
                    RemoteLogManager.this.brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
                    RemoteLogManager.this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
                    this.logger.error("Error occurred while copying log segments of partition: {}", (Object)this.topicIdPartition, (Object)failure);
                }
            }
        }

        /**
         * Copies a single local segment, together with its indexes, to remote storage.
         *
         * <p>Steps, in order:
         * <ol>
         *   <li>Look up the producer state snapshot for {@code nextSegmentBaseOffset}; the copy is aborted
         *       before anything is published if it is missing.</li>
         *   <li>Register a segment metadata entry built from the segment's leader epochs and wait for the
         *       metadata manager to acknowledge it.</li>
         *   <li>Hand the segment files to the remote storage manager.</li>
         *   <li>If the returned custom metadata exceeds {@code customMetadataSizeLimit}, try to delete the
         *       copied data and fail; otherwise publish the {@code COPY_SEGMENT_FINISHED} update, mark the
         *       byte-rate metrics and advance the copied offset to the segment's end offset.</li>
         * </ol>
         *
         * @param log                   local log the segment belongs to
         * @param segment               the local segment to copy
         * @param nextSegmentBaseOffset base offset of the segment following {@code segment}; the copied
         *                              segment's end offset is this value minus one
         * @throws IllegalStateException                    if no producer state snapshot exists for
         *                                                  {@code nextSegmentBaseOffset}
         * @throws CustomMetadataSizeLimitExceededException if the custom metadata returned by the storage
         *                                                  manager is larger than the configured limit
         * @throws InterruptedException                     if waiting for a metadata operation is interrupted
         * @throws ExecutionException                       if a metadata operation fails
         * @throws RemoteStorageException                   propagated from the remote storage or metadata manager
         * @throws IOException                              declared for the checkpoint and file operations used
         *                                                  while preparing the copy
         */
        private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException, CustomMetadataSizeLimitExceededException {
            File logFile = segment.log().file();
            String logFileName = logFile.getName();
            this.logger.info("Copying {} to remote storage.", (Object)logFileName);
            RemoteLogSegmentId id = RemoteLogSegmentId.generateNew((TopicIdPartition)this.topicIdPartition);
            long endOffset = nextSegmentBaseOffset - 1L;

            // The snapshot is mandatory segment data. Check it BEFORE registering the segment with the
            // metadata manager, so a missing snapshot neither surfaces as a bare NullPointerException
            // nor leaves a started-but-never-finished metadata entry behind.
            File producerStateSnapshotFile = log.producerStateManager().fetchSnapshot(nextSegmentBaseOffset).orElse(null);
            if (producerStateSnapshotFile == null) {
                throw new IllegalStateException("No producer state snapshot found at offset " + nextSegmentBaseOffset
                        + " for partition " + this.topicIdPartition + ", can not copy " + logFileName + " to remote storage");
            }

            List<EpochEntry> epochEntries = RemoteLogManager.this.getLeaderEpochCheckpoint(log, segment.baseOffset(), nextSegmentBaseOffset).read();
            Map<Integer, Long> segmentLeaderEpochs = new HashMap<>(epochEntries.size());
            epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset));

            RemoteLogSegmentMetadata copySegmentStartedRlsm = new RemoteLogSegmentMetadata(id, segment.baseOffset(), endOffset, segment.largestTimestamp(), RemoteLogManager.this.brokerId, RemoteLogManager.this.time.milliseconds(), segment.log().sizeInBytes(), segmentLeaderEpochs);
            RemoteLogManager.this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();

            ByteBuffer leaderEpochsIndex = RemoteLogManager.this.getLeaderEpochCheckpoint(log, -1L, nextSegmentBaseOffset).readAsByteBuffer();
            LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), this.toPathIfExists(((OffsetIndex)segment.lazyOffsetIndex().get()).file()), this.toPathIfExists(((TimeIndex)segment.lazyTimeIndex().get()).file()), Optional.ofNullable(this.toPathIfExists(segment.txnIndex().file())), producerStateSnapshotFile.toPath(), leaderEpochsIndex);
            RemoteLogManager.this.brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
            RemoteLogManager.this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();

            Optional<RemoteLogSegmentMetadata.CustomMetadata> customMetadata = RemoteLogManager.this.remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
            RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, RemoteLogManager.this.time.milliseconds(), customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, RemoteLogManager.this.brokerId);

            if (customMetadata.isPresent()) {
                long customMetadataSize = customMetadata.get().value().length;
                if (customMetadataSize > (long)this.customMetadataSizeLimit) {
                    CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException();
                    this.logger.error("Custom metadata size {} exceeds configured limit {}. Copying will be stopped and copied segment will be attempted to clean. Original metadata: {}", new Object[]{customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e});
                    try {
                        // Best-effort cleanup of the data that was just copied.
                        RemoteLogManager.this.remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));
                        this.logger.info("Successfully cleaned segment after custom metadata size exceeded");
                    }
                    catch (RemoteStorageException e1) {
                        this.logger.error("Error while cleaning segment after custom metadata size exceeded, consider cleaning manually", (Throwable)e1);
                    }
                    throw e;
                }
            }

            RemoteLogManager.this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
            RemoteLogManager.this.brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyBytesRate().mark((long)copySegmentStartedRlsm.segmentSizeInBytes());
            RemoteLogManager.this.brokerTopicStats.allTopicsStats().remoteCopyBytesRate().mark((long)copySegmentStartedRlsm.segmentSizeInBytes());
            this.copiedOffsetOption = OptionalLong.of(endOffset);
            log.updateHighestOffsetInRemoteStorage(endOffset);
            this.logger.info("Copied {} to remote storage with segment-id: {}", (Object)logFileName, (Object)copySegmentFinishedRlsm.remoteLogSegmentId());
        }

        /**
         * @param file the file to convert
         * @return {@code file} as a {@link Path} if it exists on disk, otherwise {@code null}
         */
        private Path toPathIfExists(File file) {
            if (file.exists()) {
                return file.toPath();
            }
            return null;
        }

        /**
         * Executes one iteration of this task.
         *
         * <p>Does nothing if the task is cancelled or no local log is available for the partition. As
         * leader, eligible segments are copied to remote storage and expired remote segments are cleaned
         * up. As follower, the log's highest-offset-in-remote-storage is refreshed from the remote
         * metadata.
         *
         * <p>No exception escapes this method, so the task stays scheduled. Interruptions and unexpected
         * errors are logged at WARN unless the task has been cancelled; retriable errors are logged at
         * DEBUG. Unexpected errors are logged with their stack trace.
         */
        @Override
        public void run() {
            if (this.isCancelled()) {
                return;
            }
            try {
                Optional unifiedLogOptional = (Optional)RemoteLogManager.this.fetchLog.apply(this.topicIdPartition.topicPartition());
                if (!unifiedLogOptional.isPresent()) {
                    return;
                }
                UnifiedLog log = (UnifiedLog)unifiedLogOptional.get();
                if (this.isLeader()) {
                    this.copyLogSegmentsToRemote(log);
                    this.cleanupExpiredRemoteLogSegments();
                } else {
                    long offset = RemoteLogManager.this.findHighestRemoteOffset(this.topicIdPartition, log);
                    log.updateHighestOffsetInRemoteStorage(offset);
                }
            }
            catch (InterruptedException ex) {
                if (!this.isCancelled()) {
                    this.logger.warn("Current thread for topic-partition-id {} is interrupted. Reason: {}", (Object)this.topicIdPartition, (Object)ex.getMessage());
                }
            }
            catch (RetriableException ex) {
                this.logger.debug("Encountered a retryable error while executing current task for topic-partition {}", (Object)this.topicIdPartition, (Object)ex);
            }
            catch (Exception ex) {
                if (!this.isCancelled()) {
                    // Pass the exception itself as the trailing argument so the stack trace is logged;
                    // the message alone is often null or too terse to diagnose the failure.
                    this.logger.warn("Current task for topic-partition {} received error but it will be scheduled. Reason: {}", new Object[]{this.topicIdPartition, ex.getMessage(), ex});
                }
            }
        }

        /**
         * Publishes a new remote log start offset through {@code updateRemoteLogStartOffset}, but only
         * while this task is in the leader role; otherwise the call is ignored.
         *
         * @param topicPartition       partition the offset belongs to
         * @param remoteLogStartOffset the new remote log start offset
         */
        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
            if (!this.isLeader()) {
                return;
            }
            this.logger.debug("Updating {} with remoteLogStartOffset: {}", (Object)topicPartition, (Object)remoteLogStartOffset);
            RemoteLogManager.this.updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
        }

        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
            if (this.isCancelled() || !this.isLeader()) {
                this.logger.info("Returning from remote log segments cleanup as the task state is changed");
                return;
            }
            java.util.Iterator segmentMetadataIter = RemoteLogManager.this.remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition);
            if (!segmentMetadataIter.hasNext()) {
                this.logger.debug("No remote log segments available on remote storage for partition: {}", (Object)this.topicIdPartition);
                return;
            }
            Optional logOptional = (Optional)RemoteLogManager.this.fetchLog.apply(this.topicIdPartition.topicPartition());
            if (!logOptional.isPresent()) {
                this.logger.debug("No UnifiedLog instance available for partition: {}", (Object)this.topicIdPartition);
                return;
            }
            UnifiedLog log = (UnifiedLog)logOptional.get();
            Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
            if (leaderEpochCacheOption.isEmpty()) {
                this.logger.debug("No leader epoch cache available for partition: {}", (Object)this.topicIdPartition);
                return;
            }
            HashSet epochsSet = new HashSet();
            while (segmentMetadataIter.hasNext()) {
                RemoteLogSegmentMetadata segmentMetadata = (RemoteLogSegmentMetadata)segmentMetadataIter.next();
                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
            }
            ArrayList remoteLeaderEpochs = new ArrayList(epochsSet);
            Collections.sort(remoteLeaderEpochs);
            LeaderEpochFileCache leaderEpochCache = (LeaderEpochFileCache)leaderEpochCacheOption.get();
            NavigableMap<Integer, Long> epochWithOffsets = RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets());
            long logStartOffset = log.logStartOffset();
            long logEndOffset = log.logEndOffset();
            Optional<RetentionSizeData> retentionSizeData = this.buildRetentionSizeData(log.config().retentionSize, log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets);
            Optional<RetentionTimeData> retentionTimeData = this.buildRetentionTimeData(log.config().retentionMs);
            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
            java.util.Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
            boolean canProcess = true;
            ArrayList<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<RemoteLogSegmentMetadata>();
            while (canProcess && epochIterator.hasNext()) {
                Integer epoch = epochIterator.next();
                java.util.Iterator segmentsIterator = RemoteLogManager.this.remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, epoch.intValue());
                while (canProcess && segmentsIterator.hasNext()) {
                    if (this.isCancelled() || !this.isLeader()) {
                        this.logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
                        return;
                    }
                    RemoteLogSegmentMetadata metadata = (RemoteLogSegmentMetadata)segmentsIterator.next();
                    boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset, epochWithOffsets);
                    boolean isValidSegment = false;
                    if (!shouldDeleteSegment && (isValidSegment = RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets))) {
                        boolean bl = shouldDeleteSegment = remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
                    }
                    if (shouldDeleteSegment) {
                        segmentsToDelete.add(metadata);
                    }
                    canProcess = shouldDeleteSegment || !isValidSegment;
                }
            }
            Optional earliestEpochEntryOptional = leaderEpochCache.earliestEntry();
            if (earliestEpochEntryOptional.isPresent()) {
                EpochEntry earliestEpochEntry = (EpochEntry)earliestEpochEntryOptional.get();
                java.util.Iterator epochsToClean = remoteLeaderEpochs.stream().filter(remoteEpoch -> remoteEpoch < earliestEpochEntry.epoch).iterator();
                while (epochsToClean.hasNext()) {
                    int epoch = (Integer)epochsToClean.next();
                    java.util.Iterator segmentsToBeCleaned = RemoteLogManager.this.remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, epoch);
                    while (segmentsToBeCleaned.hasNext()) {
                        if (this.isCancelled() || !this.isLeader()) {
                            return;
                        }
                        remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry, (RemoteLogSegmentMetadata)segmentsToBeCleaned.next());
                    }
                }
            }
            remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> this.handleLogStartOffsetUpdate(this.topicIdPartition.topicPartition(), offset));
            ArrayList<String> undeletedSegments = new ArrayList<String>();
            for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
                if (remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !this.isCancelled() && this.isLeader())) continue;
                undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
            }
            if (!undeletedSegments.isEmpty()) {
                this.logger.info("The following remote segments could not be deleted: {}", (Object)String.join((CharSequence)",", undeletedSegments));
            }
        }

        /**
         * Builds the time-based retention parameters for this partition.
         *
         * @param retentionMs configured retention time in milliseconds; values of {@code -1} or
         *                    below yield an empty result
         * @return the retention time data, carrying {@code retentionMs} and the cut-off
         *         timestamp (current time minus {@code retentionMs}), or empty when
         *         {@code retentionMs} is not greater than {@code -1}
         */
        private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
            if (retentionMs <= -1L) {
                return Optional.empty();
            }
            // The clock is only consulted when time-based retention applies.
            long cleanupUntilMs = RemoteLogManager.this.time.milliseconds() - retentionMs;
            return Optional.of(new RetentionTimeData(retentionMs, cleanupUntilMs));
        }

        /**
         * Builds the size-based retention parameters for this partition.
         *
         * <p>Sums the sizes of the remote segments listed for every epoch in {@code epochEntries}
         * that pass {@code isRemoteSegmentWithinLeaderEpochs}, counting each segment id once, and
         * adds {@code onlyLocalLogSegmentsSize} to obtain the total size.
         *
         * @param retentionSize            configured retention size in bytes; values of {@code -1}
         *                                 or below yield an empty result
         * @param onlyLocalLogSegmentsSize size in bytes added to the remote total
         * @param logEndOffset             log end offset passed to the leader-epoch check
         * @param epochEntries             leader epochs (with their offsets) to scan
         * @return the retention size data, carrying {@code retentionSize} and the number of bytes
         *         by which the total exceeds it, or empty when the total does not exceed
         *         {@code retentionSize}
         * @throws RemoteStorageException if listing the remote log segments fails
         */
        private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize, long onlyLocalLogSegmentsSize, long logEndOffset, NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException {
            if (retentionSize <= -1L) {
                return Optional.empty();
            }
            long remoteBytes = 0L;
            // A segment id that was already counted is skipped when it is listed again.
            Set<RemoteLogSegmentId> countedSegmentIds = new HashSet<>();
            for (Integer epoch : epochEntries.navigableKeySet()) {
                java.util.Iterator<RemoteLogSegmentMetadata> segments = RemoteLogManager.this.remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, epoch.intValue());
                while (segments.hasNext()) {
                    RemoteLogSegmentMetadata segment = segments.next();
                    RemoteLogSegmentId id = segment.remoteLogSegmentId();
                    if (!countedSegmentIds.contains(id) && RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(segment, logEndOffset, epochEntries)) {
                        remoteBytes += (long) segment.segmentSizeInBytes();
                        countedSegmentIds.add(id);
                    }
                }
            }
            long totalSize = onlyLocalLogSegmentsSize + remoteBytes;
            return totalSize > retentionSize
                    ? Optional.of(new RetentionSizeData(retentionSize, totalSize - retentionSize))
                    : Optional.empty();
        }

        /**
         * Returns the runtime class description followed by the topic-id-partition in brackets.
         */
        @Override
        public String toString() {
            // String concatenation converts the Class object through its toString().
            return this.getClass() + "[" + this.topicIdPartition + "]";
        }

        /**
         * Evaluates remote log segments against the retention rules and performs the actual
         * deletion of a segment.
         *
         * <p>The handler is stateful: it tracks how many bytes still breach the size limit and
         * the log start offset implied by the segments selected so far.
         */
        class RemoteLogRetentionHandler {
            private final Optional<RetentionSizeData> retentionSizeData;
            private final Optional<RetentionTimeData> retentionTimeData;
            // Bytes still exceeding the retention size; reduced as segments are selected.
            private long remainingBreachedSize;
            // End offset + 1 of the latest segment selected by time or size retention, if any.
            private OptionalLong logStartOffset = OptionalLong.empty();

            /**
             * @param retentionSizeData size retention parameters, empty when size retention is not applied
             * @param retentionTimeData time retention parameters, empty when time retention is not applied
             */
            public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
                this.retentionSizeData = retentionSizeData;
                this.retentionTimeData = retentionTimeData;
                this.remainingBreachedSize = retentionSizeData.map(data -> data.remainingBreachedSize).orElse(0L);
            }

            /**
             * Decides whether the segment should be deleted because of a retention-size breach.
             *
             * <p>The segment is selected only when its whole size fits into the remaining breached
             * size. On selection, the remaining breached size is reduced by the segment size and
             * the tracked log start offset moves to the segment end offset + 1.
             *
             * @param metadata the remote segment to evaluate
             * @return {@code true} if the segment should be deleted
             */
            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
                if (!this.retentionSizeData.isPresent()) {
                    return false;
                }
                if (this.remainingBreachedSize <= 0L) {
                    return false;
                }
                long bytesLeft = this.remainingBreachedSize - (long) metadata.segmentSizeInBytes();
                if (bytesLeft < 0L) {
                    // Segment is larger than what is still breached; it is not selected.
                    return false;
                }
                this.remainingBreachedSize = bytesLeft;
                this.logStartOffset = OptionalLong.of(metadata.endOffset() + 1L);
                RLMTask.this.logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", metadata.remoteLogSegmentId(), this.retentionSizeData.get().retentionSize, this.remainingBreachedSize + this.retentionSizeData.get().retentionSize);
                return true;
            }

            /**
             * Decides whether the segment should be deleted because of a retention-time breach.
             *
             * <p>A segment is selected when its max timestamp is at or before the cut-off
             * timestamp. On selection, the remaining breached size is reduced by the segment size
             * (never below zero) and the tracked log start offset moves to the segment end
             * offset + 1.
             *
             * @param metadata the remote segment to evaluate
             * @return {@code true} if the segment should be deleted
             */
            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
                if (!this.retentionTimeData.isPresent()) {
                    return false;
                }
                boolean timeBreached = metadata.maxTimestampMs() <= this.retentionTimeData.get().cleanupUntilMs;
                if (!timeBreached) {
                    return false;
                }
                this.remainingBreachedSize = Math.max(0L, this.remainingBreachedSize - (long) metadata.segmentSizeInBytes());
                this.logStartOffset = OptionalLong.of(metadata.endOffset() + 1L);
                RLMTask.this.logger.info("About to delete remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", metadata.remoteLogSegmentId(), this.retentionTimeData.get().retentionMs);
                return true;
            }

            /**
             * Decides whether the segment should be deleted because it lies entirely before the
             * log start offset.
             *
             * <p>A segment is selected when {@code leaderEpochEntries} is non-empty, every leader
             * epoch of the segment is at or below the first epoch in {@code leaderEpochEntries},
             * and the segment end offset is below {@code logStartOffset}.
             *
             * @param metadata           the remote segment to evaluate
             * @param logStartOffset     the current log start offset
             * @param leaderEpochEntries leader epochs with their offsets
             * @return {@code true} if the segment should be deleted
             */
            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long logStartOffset, NavigableMap<Integer, Long> leaderEpochEntries) {
                if (leaderEpochEntries.isEmpty()) {
                    return false;
                }
                Integer firstEpoch = leaderEpochEntries.firstKey();
                boolean onlyOldEpochs = metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= firstEpoch);
                // The end offset is only consulted when the epoch check passed.
                if (!onlyOldEpochs || metadata.endOffset() >= logStartOffset) {
                    return false;
                }
                RLMTask.this.logger.info("About to delete remote log segment {} due to log-start-offset {} breach. Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}", metadata.remoteLogSegmentId(), logStartOffset, leaderEpochEntries.firstEntry(), metadata.endOffset(), metadata.segmentLeaderEpochs());
                return true;
            }

            /**
             * Deletes the segment when all of its leader epochs are below the epoch of the
             * earliest entry.
             *
             * @param earliestEpochEntry the earliest epoch entry to compare against
             * @param metadata           the remote segment to evaluate and possibly delete
             * @return {@code true} if the segment was deleted
             * @throws RemoteStorageException if the deletion fails
             * @throws ExecutionException     if waiting for a metadata update fails
             * @throws InterruptedException   if interrupted while waiting for a metadata update
             */
            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
                Predicate<RemoteLogSegmentMetadata> allEpochsBelowEarliest = ignored -> metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch);
                if (!this.deleteRemoteLogSegment(metadata, allEpochsBelowEarliest)) {
                    return false;
                }
                RLMTask.this.logger.info("Deleted remote log segment {} due to leader-epoch-cache truncation. Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}", metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
                return true;
            }

            /**
             * Deletes the segment if {@code predicate} accepts it.
             *
             * <p>The deletion publishes a {@code DELETE_SEGMENT_STARTED} metadata update, removes
             * the segment data from remote storage, and then publishes
             * {@code DELETE_SEGMENT_FINISHED}; each metadata update is awaited before continuing.
             *
             * @param segmentMetadata the remote segment to delete
             * @param predicate       condition that must hold for the deletion to take place
             * @return {@code true} if the segment was deleted, {@code false} if the predicate rejected it
             * @throws RemoteStorageException if a metadata update or the data deletion fails
             * @throws ExecutionException     if waiting for a metadata update fails
             * @throws InterruptedException   if interrupted while waiting for a metadata update
             */
            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) throws RemoteStorageException, ExecutionException, InterruptedException {
                if (!predicate.test(segmentMetadata)) {
                    return false;
                }
                RLMTask.this.logger.debug("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
                this.publishSegmentState(segmentMetadata, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
                RemoteLogManager.this.remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
                this.publishSegmentState(segmentMetadata, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
                RLMTask.this.logger.debug("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
                return true;
            }

            /** Publishes a state update for the segment and waits for it to complete. */
            private void publishSegmentState(RemoteLogSegmentMetadata segmentMetadata, RemoteLogSegmentState state) throws RemoteStorageException, ExecutionException, InterruptedException {
                RemoteLogSegmentMetadataUpdate update = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), RemoteLogManager.this.time.milliseconds(), segmentMetadata.customMetadata(), state, RemoteLogManager.this.brokerId);
                RemoteLogManager.this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(update).get();
            }
        }
    }

    /**
     * Base class for runnables that can be flagged as cancelled.
     *
     * <p>Cancelling only sets a flag; subclasses are expected to check {@link #isCancelled()}
     * themselves.
     */
    private abstract static class CancellableRunnable implements Runnable {
        // Volatile so that a cancel() from one thread is visible to readers on other threads.
        private volatile boolean cancelled;

        private CancellableRunnable() {
        }

        /** Marks this runnable as cancelled. */
        public void cancel() {
            cancelled = true;
        }

        /**
         * @return {@code true} once {@link #cancel()} has been called
         */
        public boolean isCancelled() {
            return cancelled;
        }
    }
}

