package com.taobao.common.store.journal;

import ch.qos.logback.core.CoreConstants;
import com.taobao.common.store.Store;
import com.taobao.common.store.journal.impl.ConcurrentIndexMap;
import com.taobao.common.store.journal.impl.LevelLRUIndexMap;
import com.taobao.common.store.util.BytesKey;
import com.taobao.common.store.util.Util;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/notify-store4j-5.0.4.jar:com/taobao/common/store/journal/JournalStore.class */
public class JournalStore implements Store, JournalStoreMBean {
    static Logger log = LoggerFactory.getLogger((Class<?>) JournalStore.class);
    public static final int FILE_SIZE = 67108864;
    public static final int HALF_DAY = 43200000;
    protected static final int DEFAULT_MAX_BATCH_SIZE = 4194304;
    private final String path;
    private final String name;
    private final boolean force;
    protected IndexMap indices;
    private final Map<BytesKey, Long> lastModifiedMap;
    public Map<Integer, DataFile> dataFiles;
    protected Map<Integer, LogFile> logFiles;
    private ConcurrentHashMap<BytesKey, JournalLocation> resumeRecode;
    protected DataFile dataFile;
    protected LogFile logFile;
    private DataFileAppender dataFileAppender;
    private final AtomicInteger number;
    private long intervalForCompact;
    private long intervalForRemove;
    private volatile ScheduledExecutorService scheduledPool;
    private volatile long maxFileCount;
    protected int maxWriteBatchSize;
    private Checkpoint checkpoint;

    /* loaded from: input_file:lib/notify-store4j-5.0.4.jar:com/taobao/common/store/journal/JournalStore$DataFileCheckThread.class */
    class DataFileCheckThread implements Runnable {
        DataFileCheckThread() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                JournalStore.this.check();
            } catch (Exception e) {
                JournalStore.log.warn("check error:", (Throwable) e);
            }
        }
    }

    /* loaded from: input_file:lib/notify-store4j-5.0.4.jar:com/taobao/common/store/journal/JournalStore$InflyWriteData.class */
    public static class InflyWriteData {
        public volatile byte[] data;
        public volatile int count = 1;

        public InflyWriteData(byte[] bArr) {
            this.data = bArr;
        }
    }

    public JournalStore(String str, String str2, boolean z, boolean z2) throws IOException, InterruptedException {
        this(str, str2, null, z, z2, false);
    }

    public JournalStore(String str, String str2, IndexMap indexMap, boolean z, boolean z2) throws IOException, InterruptedException {
        this(str, str2, indexMap, z, z2, false);
    }

    public JournalStore(String str, String str2, boolean z, boolean z2, boolean z3) throws IOException, InterruptedException {
        this(str, str2, null, z, z2, z3);
    }

    public JournalStore(String str, String str2, IndexMap indexMap, boolean z, boolean z2, boolean z3) throws IOException, InterruptedException {
        this.lastModifiedMap = new ConcurrentHashMap();
        this.dataFiles = new ConcurrentHashMap();
        this.logFiles = new ConcurrentHashMap();
        this.resumeRecode = new ConcurrentHashMap<>();
        this.dataFile = null;
        this.logFile = null;
        this.dataFileAppender = null;
        this.number = new AtomicInteger(0);
        this.intervalForCompact = 43200000L;
        this.intervalForRemove = CoreConstants.MILLIS_IN_ONE_WEEK;
        this.maxFileCount = Long.MAX_VALUE;
        this.maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
        Util.registMBean(this, str2);
        this.path = str;
        this.name = str2;
        this.force = z;
        if (indexMap != null) {
            this.indices = indexMap;
        } else if (z2) {
            int maxMemory = (int) ((Runtime.getRuntime().maxMemory() / 40) / 150);
            this.indices = new LevelLRUIndexMap(maxMemory > 200000 ? 200000 : maxMemory, str, str2, z2);
        } else {
            this.indices = new ConcurrentIndexMap();
        }
        this.dataFileAppender = new DataFileAppender(this);
        try {
            this.checkpoint = new Checkpoint(this.path, this.name, this.resumeRecode);
        } catch (Exception e) {
            log.error("初始化checkPoint 失败!", (Throwable) e);
        }
        initLoad();
        if (null == this.dataFile || null == this.logFile) {
            newDataFile();
        }
        if (z3) {
            this.scheduledPool = Executors.newSingleThreadScheduledExecutor();
            this.scheduledPool.scheduleAtFixedRate(new DataFileCheckThread(), calcDelay(), 43200000L, TimeUnit.MILLISECONDS);
            log.warn("启动数据文件定时整理线程");
        }
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: com.taobao.common.store.journal.JournalStore.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    JournalStore.this.close();
                } catch (IOException e2) {
                    JournalStore.log.error("close error", (Throwable) e2);
                }
            }
        });
    }

    public JournalStore(String str, String str2) throws IOException, InterruptedException {
        this(str, str2, false, false);
    }

    @Override // com.taobao.common.store.Store
    public void add(byte[] bArr, byte[] bArr2) throws IOException, InterruptedException {
        add(bArr, bArr2, false);
    }

    @Override // com.taobao.common.store.Store
    public void add(byte[] bArr, byte[] bArr2, boolean z) throws IOException, InterruptedException {
        checkParam(bArr, bArr2);
        innerAdd(bArr, bArr2, -1L, z);
    }

    @Override // com.taobao.common.store.Store
    public boolean remove(byte[] bArr, boolean z) throws IOException, InterruptedException {
        return innerRemove(bArr, z);
    }

    private void reuse(byte[] bArr, boolean z) throws IOException, InterruptedException {
        byte[] bArr2 = get(bArr);
        long longValue = this.lastModifiedMap.get(new BytesKey(bArr)).longValue();
        if (bArr2 == null || !remove(bArr)) {
            return;
        }
        innerAdd(bArr, bArr2, longValue, z);
    }

    private long calcDelay() {
        GregorianCalendar gregorianCalendar = new GregorianCalendar();
        gregorianCalendar.setTime(new Date());
        long time = gregorianCalendar.getTime().getTime();
        gregorianCalendar.set(11, 6);
        gregorianCalendar.set(12, 0);
        gregorianCalendar.set(13, 0);
        long time2 = gregorianCalendar.getTime().getTime() - time;
        if (time2 < 0) {
            gregorianCalendar.set(11, 18);
            gregorianCalendar.set(12, 0);
            gregorianCalendar.set(13, 0);
            time2 = gregorianCalendar.getTime().getTime() - time;
            if (time2 < 0) {
                time2 += 43200000;
            }
        }
        return time2;
    }

    private OpItem innerAdd(byte[] bArr, byte[] bArr2, long j, boolean z) throws IOException, InterruptedException {
        BytesKey bytesKey = new BytesKey(bArr);
        OpItem opItem = new OpItem();
        opItem.op = (byte) 1;
        this.dataFileAppender.store(opItem, bytesKey, bArr2, z);
        this.indices.put(bytesKey, opItem);
        if (j == -1) {
            this.lastModifiedMap.put(bytesKey, Long.valueOf(System.currentTimeMillis()));
        } else {
            this.lastModifiedMap.put(bytesKey, Long.valueOf(j));
        }
        this.resumeRecode.put(new BytesKey(bArr), new JournalLocation(opItem.number, opItem.offset));
        return opItem;
    }

    @Override // com.taobao.common.store.Store
    public byte[] get(byte[] bArr) throws IOException {
        BytesKey bytesKey = new BytesKey(bArr);
        byte[] dataFromInFlyWrites = this.dataFileAppender.getDataFromInFlyWrites(bytesKey);
        if (dataFromInFlyWrites != null) {
            return dataFromInFlyWrites;
        }
        OpItem opItem = this.indices.get(bytesKey);
        if (null != opItem) {
            DataFile dataFile = this.dataFiles.get(Integer.valueOf(opItem.number));
            if (null != dataFile) {
                ByteBuffer wrap = ByteBuffer.wrap(new byte[opItem.length]);
                dataFile.read(wrap, opItem.offset);
                dataFromInFlyWrites = wrap.array();
            } else {
                log.warn("数据文件丢失：" + opItem);
                this.indices.updateToRemove(bytesKey, opItem, true);
                this.lastModifiedMap.remove(bytesKey);
            }
        }
        return dataFromInFlyWrites;
    }

    @Override // com.taobao.common.store.Store
    public Iterator<byte[]> iterator() throws IOException {
        final Iterator<BytesKey> keyIterator = this.indices.keyIterator();
        return new Iterator<byte[]>() { // from class: com.taobao.common.store.journal.JournalStore.2
            @Override // java.util.Iterator
            public boolean hasNext() {
                return keyIterator.hasNext();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public byte[] next() {
                BytesKey bytesKey = (BytesKey) keyIterator.next();
                if (null != bytesKey) {
                    return bytesKey.getData();
                }
                return null;
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException("不支持删除，请直接调用store.remove方法");
            }
        };
    }

    @Override // com.taobao.common.store.Store
    public boolean remove(byte[] bArr) throws IOException, InterruptedException {
        return remove(bArr, false);
    }

    private boolean innerRemove(byte[] bArr, boolean z) throws IOException, InterruptedException {
        boolean z2 = false;
        BytesKey bytesKey = new BytesKey(bArr);
        OpItem opItem = this.indices.get(bytesKey);
        if (null != opItem) {
            z2 = innerRemove(opItem, bytesKey, z);
            if (z2) {
                this.indices.remove(bytesKey);
                this.lastModifiedMap.remove(bytesKey);
            }
        }
        return z2;
    }

    private boolean innerRemove(OpItem opItem, BytesKey bytesKey, boolean z) throws IOException, InterruptedException {
        DataFile dataFile = this.dataFiles.get(Integer.valueOf(opItem.number));
        LogFile logFile = this.logFiles.get(Integer.valueOf(opItem.number));
        if (null == dataFile || null == logFile) {
            return false;
        }
        OpItem opItem2 = new OpItem();
        opItem2.key = opItem.key;
        opItem2.length = opItem.length;
        opItem2.number = opItem.number;
        opItem2.offset = opItem.offset;
        opItem2.op = (byte) 2;
        this.dataFileAppender.remove(opItem2, bytesKey, z);
        this.resumeRecode.remove(bytesKey);
        return true;
    }

    private void checkParam(byte[] bArr, byte[] bArr2) {
        if (null == bArr || null == bArr2) {
            throw new NullPointerException("key/data can't be null");
        }
        if (bArr.length != 16) {
            throw new IllegalArgumentException("key.length must be 16");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DataFile newDataFile() throws IOException {
        if (this.dataFiles.size() > this.maxFileCount) {
            throw new RuntimeException("最多只能存储" + this.maxFileCount + "个数据文件");
        }
        int incrementAndGet = this.number.incrementAndGet();
        this.dataFile = new DataFile(new File(this.path + File.separator + this.name + "." + incrementAndGet), incrementAndGet, this.force);
        this.logFile = new LogFile(new File(this.path + File.separator + this.name + "." + incrementAndGet + ".log"), incrementAndGet, this.force);
        this.dataFiles.put(Integer.valueOf(incrementAndGet), this.dataFile);
        this.logFiles.put(Integer.valueOf(incrementAndGet), this.logFile);
        log.info("生成新文件：" + this.dataFile);
        return this.dataFile;
    }

    private void checkParentDir(File file) {
        if (!file.exists() && !file.mkdirs()) {
            throw new IllegalStateException("Can't make dir " + this.path);
        }
    }

    private void initLoad() throws IOException, InterruptedException {
        log.warn("开始恢复数据");
        final String str = this.name + ".";
        File file = new File(this.path);
        checkParentDir(file);
        File[] listFiles = file.listFiles(new FilenameFilter() { // from class: com.taobao.common.store.journal.JournalStore.3
            @Override // java.io.FilenameFilter
            public boolean accept(File file2, String str2) {
                return str2.startsWith(str) && !str2.endsWith(".log");
            }
        });
        if (listFiles == null || listFiles.length == 0) {
            return;
        }
        log.warn("遍历每个数据文件");
        LinkedList linkedList = new LinkedList();
        for (File file2 : listFiles) {
            try {
                linkedList.add(Integer.valueOf(Integer.parseInt(file2.getName().substring(str.length()))));
            } catch (Exception e) {
                log.error("parse file index error" + file2, (Throwable) e);
            }
        }
        Integer[] numArr = (Integer[]) linkedList.toArray(new Integer[linkedList.size()]);
        Arrays.sort(numArr);
        JournalLocation journalLocation = this.checkpoint.getJournalLocation();
        if (journalLocation == null) {
            journalLocation = new JournalLocation(0, 0L);
        }
        if (journalLocation.getNumber() > numArr[numArr.length - 1].intValue()) {
            journalLocation = new JournalLocation(0, 0L);
        }
        for (Integer num : numArr) {
            log.warn("处理index为" + num + "的文件");
            HashMap hashMap = new HashMap();
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            File file3 = new File(file, this.name + "." + num);
            DataFile dataFile = new DataFile(file3, num.intValue(), this.force);
            LogFile logFile = new LogFile(new File(file3.getAbsolutePath() + ".log"), num.intValue(), this.force);
            long length = logFile.getLength() / 33;
            if (num.intValue() >= journalLocation.getNumber()) {
                long j = 0;
                while (true) {
                    long j2 = j;
                    if (j2 < length) {
                        ByteBuffer wrap = ByteBuffer.wrap(new byte[33]);
                        logFile.read(wrap, j2 * 33);
                        if (!wrap.hasRemaining()) {
                            OpItem opItem = new OpItem();
                            opItem.parse(wrap.array());
                            BytesKey bytesKey = new BytesKey(opItem.key);
                            JournalLocation journalLocation2 = new JournalLocation(opItem.getNumber(), opItem.getOffset());
                            switch (opItem.op) {
                                case 1:
                                    OpItem opItem2 = this.indices.get(bytesKey);
                                    if (null != opItem2) {
                                        innerRemove(opItem2, bytesKey, true);
                                        this.indices.remove(bytesKey);
                                        this.lastModifiedMap.remove(bytesKey);
                                    }
                                    boolean z = hashMap.get(bytesKey) == null;
                                    hashMap.put(bytesKey, opItem);
                                    concurrentHashMap.put(bytesKey, journalLocation2);
                                    if (z) {
                                        dataFile.increment();
                                        break;
                                    } else {
                                        break;
                                    }
                                case 2:
                                    hashMap.remove(bytesKey);
                                    concurrentHashMap.remove(bytesKey);
                                    dataFile.decrement();
                                    break;
                                default:
                                    log.warn("unknow op:" + ((int) opItem.op));
                                    break;
                            }
                        } else {
                            log.warn("log file error:" + logFile + ", index:" + j2);
                        }
                        j = j2 + 1;
                    }
                }
            } else {
                log.warn("根据checkpoint 跳过 索引为 " + num + " 的文件加载检查！");
            }
            if (dataFile.getLength() < 67108864 || !dataFile.isUnUsed()) {
                this.dataFiles.put(num, dataFile);
                this.logFiles.put(num, logFile);
                if (!dataFile.isUnUsed()) {
                    this.indices.putAll(hashMap);
                    this.resumeRecode.putAll(concurrentHashMap);
                    long lastModified = logFile.lastModified();
                    Iterator it = hashMap.keySet().iterator();
                    while (it.hasNext()) {
                        this.lastModifiedMap.put((BytesKey) it.next(), Long.valueOf(lastModified));
                    }
                    log.warn("还在使用，放入索引，referenceCount:" + dataFile.getReferenceCount() + ", index:" + hashMap.size());
                }
            } else {
                dataFile.delete();
                logFile.delete();
                log.warn("不用了，也超过了大小，删除");
            }
        }
        if (this.dataFiles.size() > 0) {
            Integer[] numArr2 = (Integer[]) this.dataFiles.keySet().toArray(new Integer[this.dataFiles.keySet().size()]);
            Arrays.sort(numArr2);
            for (int i = 0; i < numArr2.length - 1; i++) {
                DataFile dataFile2 = this.dataFiles.get(numArr2[i]);
                if (dataFile2.isUnUsed() || dataFile2.getLength() < 67108864) {
                    throw new IllegalStateException("非当前文件的状态是大于等于文件块长度，并且是used状态");
                }
            }
            Integer num2 = numArr2[numArr2.length - 1];
            this.number.set(num2.intValue());
            this.dataFile = this.dataFiles.get(num2);
            this.logFile = this.logFiles.get(num2);
        }
        log.warn("恢复数据：" + size());
    }

    @Override // com.taobao.common.store.Store
    public int size() {
        return this.indices.size();
    }

    @Override // com.taobao.common.store.Store
    public boolean update(byte[] bArr, byte[] bArr2) throws IOException, InterruptedException {
        BytesKey bytesKey = new BytesKey(bArr);
        OpItem opItem = this.indices.get(bytesKey);
        if (null == opItem) {
            return false;
        }
        this.indices.updateToRemove(bytesKey, opItem, false);
        if (innerAdd(bArr, bArr2, -1L, false).number != opItem.number) {
            innerRemove(opItem, bytesKey, false);
            return true;
        }
        this.dataFiles.get(Integer.valueOf(opItem.number)).decrement();
        return true;
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public String getDataFilesInfo() {
        return this.dataFiles.toString();
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public String getLogFilesInfo() {
        return this.logFiles.toString();
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public int getNumber() {
        return this.number.get();
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public String getPath() {
        return this.path;
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public String getName() {
        return this.name;
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public String getDataFileInfo() {
        return this.dataFile.toString();
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public String getLogFileInfo() {
        return this.logFile.toString();
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public String viewIndexMap() {
        return this.indices.toString();
    }

    @Override // com.taobao.common.store.Store
    public void close() throws IOException {
        sync();
        for (DataFile dataFile : this.dataFiles.values()) {
            try {
                dataFile.close();
            } catch (Exception e) {
                log.warn("close error:" + dataFile, (Throwable) e);
            }
        }
        this.dataFiles.clear();
        for (LogFile logFile : this.logFiles.values()) {
            try {
                logFile.close();
            } catch (Exception e2) {
                log.warn("close error:" + logFile, (Throwable) e2);
            }
        }
        this.logFiles.clear();
        this.indices.close();
        this.lastModifiedMap.clear();
        this.dataFile = null;
        this.logFile = null;
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public long getSize() throws IOException {
        return size();
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public long getIntervalForCompact() {
        return this.intervalForCompact;
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public void setIntervalForCompact(long j) {
        this.intervalForCompact = j;
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public long getIntervalForRemove() {
        return this.intervalForRemove;
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public void setIntervalForRemove(long j) {
        this.intervalForRemove = j;
    }

    @Override // com.taobao.common.store.Store
    public long getMaxFileCount() {
        return this.maxFileCount;
    }

    @Override // com.taobao.common.store.Store
    public void setMaxFileCount(long j) {
        this.maxFileCount = j;
    }

    public void sync() {
        this.dataFileAppender.sync();
    }

    @Override // com.taobao.common.store.journal.JournalStoreMBean
    public void check() throws IOException, InterruptedException {
        Iterator<byte[]> it = iterator();
        long currentTimeMillis = System.currentTimeMillis();
        log.warn("Store4j数据文件整理开始...");
        while (it.hasNext()) {
            BytesKey bytesKey = new BytesKey(it.next());
            long longValue = this.lastModifiedMap.get(bytesKey).longValue();
            if (this.intervalForRemove != -1 && currentTimeMillis - longValue > this.intervalForRemove) {
                innerRemove(bytesKey.getData(), true);
            } else if (currentTimeMillis - longValue > this.intervalForCompact) {
                reuse(bytesKey.getData(), true);
            }
        }
        log.warn("Store4j数据文件整理完毕...");
    }
}
