/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.tunnel.io;

import com.aliyun.odps.commons.util.RetryExceedLimitException;
import com.aliyun.odps.commons.util.RetryStrategy;
import com.aliyun.odps.commons.util.backoff.BackOffStrategy;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.io.Checksum;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.tunnel.io.ProtobufRecordPack;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TunnelBufferedWriter
implements RecordWriter {
    private ProtobufRecordPack bufferedPack;
    private TableTunnel.UploadSession session;
    private RetryStrategy retry;
    private long bufferSize;
    private long bytesWritten;
    private static final long BUFFER_SIZE_DEFAULT = 0x4000000L;
    private static final long BUFFER_SIZE_MIN = 0x100000L;
    private static final long BUFFER_SIZE_MAX = 1048576000L;
    private static final Logger LOG = LoggerFactory.getLogger(TunnelBufferedWriter.class);

    public TunnelBufferedWriter(TableTunnel.UploadSession session, CompressOption option) throws IOException {
        this.bufferedPack = new ProtobufRecordPack(session.getSchema(), new Checksum(), option);
        this.session = session;
        this.bufferSize = 0x4000000L;
        this.retry = new TunnelRetryStrategy();
        this.bytesWritten = 0L;
    }

    public void setBufferSize(long bufferSize) {
        if (bufferSize < 0x100000L) {
            throw new IllegalArgumentException("buffer size must >= 1048576, now: " + bufferSize);
        }
        if (bufferSize > 1048576000L) {
            throw new IllegalArgumentException("buffer size must <= 1048576000, now: " + bufferSize);
        }
        this.bufferSize = bufferSize;
    }

    public void setRetryStrategy(RetryStrategy strategy) {
        this.retry = strategy;
    }

    @Override
    public void write(Record r) throws IOException {
        if (this.bufferedPack.getTotalBytes() > this.bufferSize) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("BufferedWriter({}) need to flush. Pack total bytes: {} is greater than buffer size: {}.", new Object[]{System.identityHashCode(this), this.bufferedPack.getTotalBytes(), this.bufferSize});
            }
            this.flush();
        }
        this.bufferedPack.append(r);
    }

    @Override
    public void close() throws IOException {
        this.flush();
    }

    public long getTotalBytes() throws IOException {
        this.flush();
        return this.bytesWritten;
    }

    public void flush() throws IOException {
        this.retry.reset();
        long delta = this.bufferedPack.getTotalBytesWritten();
        if (delta > 0L) {
            Long blockId = this.session.getAvailBlockId();
            while (true) {
                try {
                    this.session.writeBlock(blockId, this.bufferedPack);
                    if (!LOG.isDebugEnabled()) break;
                    LOG.debug("BufferedWriter({}) flush record pack({} bytes) to block({}) success. Total bytes written: {}.", new Object[]{System.identityHashCode(this), delta, blockId, this.bufferedPack.getTotalBytes()});
                    break;
                }
                catch (IOException e) {
                    if (LOG.isErrorEnabled()) {
                        LOG.error("BufferedWriter({}) flush record pack({} bytes) to block({}) error: {}", new Object[]{System.identityHashCode(this), this.bufferedPack.getTotalBytes(), blockId, e.getMessage()});
                    }
                    try {
                        this.retry.onFailure(e);
                    }
                    catch (RetryExceedLimitException ignore) {
                        throw e;
                    }
                }
            }
            this.bufferedPack.reset();
            this.bytesWritten += delta;
        }
    }

    static class TunnelRetryStrategy
    extends RetryStrategy {
        private static final int limit = 6;
        private static final int interval = 4;

        TunnelRetryStrategy() {
            super(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF);
        }

        TunnelRetryStrategy(int limit, BackOffStrategy strategy) {
            super(limit, strategy);
        }

        @Override
        protected boolean needRetry(Exception e) {
            TunnelException err = null;
            if (e.getCause() instanceof TunnelException) {
                err = (TunnelException)e.getCause();
            }
            if (e instanceof TunnelException) {
                err = (TunnelException)e;
            }
            return err == null || err.getStatus() == null || err.getStatus() / 100 != 4;
        }
    }
}

