package com.alibaba.ververica.connectors.datahub.sink;

import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.errorcode.ConnectorErrors;
import com.alibaba.ververica.connectors.common.exception.ConnectorException;
import com.alibaba.ververica.connectors.common.exception.ErrorUtils;
import com.alibaba.ververica.connectors.common.metrics.SimpleGauge;
import com.alibaba.ververica.connectors.common.sink.HasRetryTimeout;
import com.alibaba.ververica.connectors.common.sink.Syncable;
import com.alibaba.ververica.connectors.common.sink.converter.RecordConverter;
import com.alibaba.ververica.connectors.datahub.DatahubClientProvider;
import com.alibaba.ververica.connectors.datahub.DatahubUtils;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.BlobRecordData;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordType;
import com.aliyun.datahub.client.model.ShardEntry;
import com.aliyun.datahub.client.model.ShardState;
import com.aliyun.datahub.clientlibrary.common.Constants;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.producer.Producer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/datahub/sink/DatahubOutputFormat.class */
public class DatahubOutputFormat<RECORD> extends RichOutputFormat<RECORD> implements Syncable, HasRetryTimeout {
    private static final long serialVersionUID = 1;
    private static final int PRODUCER_RETRY_COUNT = 3;
    private static final String DEFAULT_OUTPUT_FORMAT_NAME = "datahub";
    private static final int DEFAULT_ASYNC_QUEUE_SIZE = 4;
    private static final int DEFAULT_BATCH_COUNT = 500;
    private static final int DEFAULT_BATCH_SIZE = 1048576;
    private static final int DEFAULT_FLUSH_INTERVAL = 10000;
    private static final int DEFAULT_RETRY_TIMEOUT = 1800000;
    private static final int DEFAULT_RETRY_INTERVAL = 1000;
    private static final int DEFAULT_REQUEST_TIMEOUT = 30000;
    private static final int DEFAULT_ASYNC_WRITER_NUM = 4;
    private static final String DUPLICATE_RATIO = "dup_ratio";
    private static final Logger LOG = LoggerFactory.getLogger(DatahubOutputFormat.class);
    protected String endpoint;
    protected String projectName;
    protected String topicName;
    protected String accessKey;
    protected String accessId;
    private Configuration properties;
    protected RecordConverter<RECORD, RecordEntry> recordConverter;
    protected RecordPropertyGetter recordPropertyGetter;
    protected DatahubClientProvider clientProvider;
    protected Meter outTps;
    protected Meter outBps;
    protected SimpleGauge latencyGauge;
    protected DatahubOutputFormat<RECORD>.DoubleGauge dupRatioGauge;
    protected transient ProducerConfig producerConfig;
    protected transient Producer producer;
    protected transient DatahubOutputFormat<RECORD>.TopicWriter topicWriter;
    protected transient Timer flusher;
    protected String name = DEFAULT_OUTPUT_FORMAT_NAME;
    protected boolean doHash = false;
    protected boolean async = false;
    protected boolean mergeBlob = false;
    protected int asyncWriterNum = 4;
    protected int asyncQueueSize = 4;
    protected int batchSize = DEFAULT_BATCH_SIZE;
    protected int batchCount = 500;
    protected int flushInterval = DEFAULT_FLUSH_INTERVAL;
    protected int requestTimeout = 30000;
    protected int retryTimeout = 1800000;
    protected int retryInterval = 1000;
    protected byte[] lineDelimiter = "\n".getBytes(StandardCharsets.UTF_8);
    protected List<String> shardIdList = new ArrayList();
    protected volatile transient Exception flushException = null;

    /* loaded from: input_file:com/alibaba/ververica/connectors/datahub/sink/DatahubOutputFormat$AsyncBatchWriter.class */
    class AsyncBatchWriter extends DatahubOutputFormat<RECORD>.BatchWriter {
        private final DatahubOutputFormat<RECORD>.RecordBatchQueue bufferQueue;
        private final Thread writeThread;
        private final Object emptyWaiter;
        private int writerIndex;
        private volatile Exception exception;
        private volatile boolean stop;

        AsyncBatchWriter(int i) {
            super();
            this.emptyWaiter = new Object();
            this.exception = null;
            this.stop = false;
            this.writerIndex = i;
            this.bufferQueue = new RecordBatchQueue(DatahubOutputFormat.this.asyncQueueSize);
            this.writeThread = new Thread(this::run, "writer-" + i);
        }

        @Override // com.alibaba.ververica.connectors.datahub.sink.DatahubOutputFormat.BatchWriter
        void start() {
            this.writeThread.start();
        }

        @Override // com.alibaba.ververica.connectors.datahub.sink.DatahubOutputFormat.BatchWriter
        void stop() {
            if (this.stop) {
                return;
            }
            this.stop = true;
            this.writeThread.interrupt();
        }

        @Override // com.alibaba.ververica.connectors.datahub.sink.DatahubOutputFormat.BatchWriter
        void waitAllFlushed() throws InterruptedException {
            while (this.exception == null && !this.bufferQueue.isEmpty()) {
                synchronized (this.emptyWaiter) {
                    this.emptyWaiter.wait(100L);
                }
            }
        }

        @Override // com.alibaba.ververica.connectors.datahub.sink.DatahubOutputFormat.BatchWriter
        void write(DatahubOutputFormat<RECORD>.RecordBatch recordBatch) {
            boolean z = false;
            while (!z) {
                try {
                    if (this.exception != null) {
                        break;
                    } else {
                        z = this.bufferQueue.offer(recordBatch, DatahubOutputFormat.serialVersionUID, TimeUnit.SECONDS);
                    }
                } catch (InterruptedException e) {
                }
            }
            if (this.exception != null) {
                throw new RuntimeException(this.exception.getMessage());
            }
        }

        private void run() {
            DatahubOutputFormat.LOG.info("writer-{} started", Integer.valueOf(this.writerIndex));
            while (!this.stop) {
                try {
                    try {
                        DatahubOutputFormat<RECORD>.RecordBatch peek = this.bufferQueue.peek();
                        if (peek != null) {
                            doWrite(peek);
                            this.bufferQueue.pop();
                        } else {
                            this.bufferQueue.waitEmpty(DatahubOutputFormat.serialVersionUID, TimeUnit.SECONDS);
                        }
                        if (this.bufferQueue.isEmpty()) {
                            synchronized (this.emptyWaiter) {
                                this.emptyWaiter.notifyAll();
                            }
                        }
                    } catch (IOException e) {
                        this.exception = e;
                        this.stop = true;
                    } catch (InterruptedException e2) {
                    }
                } catch (Exception e3) {
                    DatahubOutputFormat.LOG.error("writer-{} encounter an unexpected fail", Integer.valueOf(this.writerIndex), e3);
                    this.exception = e3;
                }
            }
            this.stop = true;
            DatahubOutputFormat.LOG.info("writer-{} stopped", Integer.valueOf(this.writerIndex));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/alibaba/ververica/connectors/datahub/sink/DatahubOutputFormat$BatchWriter.class */
    public class BatchWriter {
        ByteArrayOutputStream buffer;

        BatchWriter() {
            if (DatahubOutputFormat.this.mergeBlob) {
                this.buffer = new ByteArrayOutputStream(DatahubOutputFormat.this.batchSize * 2);
            }
        }

        void start() {
        }

        void stop() {
        }

        void waitAllFlushed() throws InterruptedException {
        }

        void write(DatahubOutputFormat<RECORD>.RecordBatch recordBatch) throws IOException {
            try {
                doWrite(recordBatch);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        protected RecordEntry mergeBlobRecord(List<RecordEntry> list, ByteArrayOutputStream byteArrayOutputStream) throws IOException {
            if (list == null || list.size() == 0) {
                return null;
            }
            if (byteArrayOutputStream == null) {
                throw new IllegalArgumentException("invalid buffer");
            }
            if (list.size() == 1) {
                return list.get(0);
            }
            for (int i = 0; i < list.size(); i++) {
                if (i != 0) {
                    byteArrayOutputStream.write(DatahubOutputFormat.this.lineDelimiter);
                }
                byteArrayOutputStream.write(((BlobRecordData) list.get(i).getRecordData()).getData());
            }
            RecordEntry recordEntry = new RecordEntry();
            recordEntry.setRecordData(new BlobRecordData(byteArrayOutputStream.toByteArray()));
            recordEntry.setAttributes(new HashMap(list.get(list.size() - 1).getAttributes()));
            DatahubOutputFormat.this.mergeCount(list.size());
            return recordEntry;
        }

        protected RecordEntry mergeBlobRecord(List<RecordEntry> list) throws InterruptedException, IOException {
            if (list == null || list.isEmpty()) {
                return null;
            }
            long currentTimeMillis = System.currentTimeMillis();
            while (true) {
                try {
                    RecordEntry mergeBlobRecord = mergeBlobRecord(list, this.buffer);
                    this.buffer.reset();
                    return mergeBlobRecord;
                } catch (IOException e) {
                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                    if (currentTimeMillis2 > DatahubOutputFormat.this.retryTimeout) {
                        DatahubOutputFormat.LOG.error("Retrying cost [{}] ms, exceed timeout threshold [{}].", Long.valueOf(currentTimeMillis2), Integer.valueOf(DatahubOutputFormat.this.retryTimeout));
                        throw e;
                    }
                    DatahubOutputFormat.LOG.warn("Merge blocks failed, wait {} ms and retry", Integer.valueOf(DatahubOutputFormat.this.retryInterval), e);
                    Thread.sleep(DatahubOutputFormat.this.retryInterval);
                }
            }
        }

        protected void doWrite(DatahubOutputFormat<RECORD>.RecordBatch recordBatch) throws IOException, InterruptedException {
            long currentTimeMillis;
            if (recordBatch == null || recordBatch.isEmpty()) {
                return;
            }
            List<RecordEntry> records = recordBatch.getRecords();
            if (records == null || records.isEmpty()) {
                return;
            }
            int size = records.size();
            if (DatahubOutputFormat.this.mergeBlob) {
                records = Collections.singletonList(mergeBlobRecord(records));
            }
            if (recordBatch.isFilterPk()) {
                long count = recordBatch.getCount();
                double d = (1.0d - ((1.0d * size) / count)) * 100.0d;
                DatahubOutputFormat.this.dupRatioGauge.report(d);
                if (DatahubOutputFormat.LOG.isDebugEnabled()) {
                    DatahubOutputFormat.LOG.debug("distinct count: {}, count: {}, duplicate ratio: {}", new Object[]{Integer.valueOf(size), Long.valueOf(count), Double.valueOf(d)});
                }
            }
            if (DatahubOutputFormat.this.recordPropertyGetter != null) {
                RecordPropertyGetter recordPropertyGetter = DatahubOutputFormat.this.recordPropertyGetter;
                recordPropertyGetter.getClass();
                records.forEach(recordPropertyGetter::removeControlAttributes);
            }
            long currentTimeMillis2 = System.currentTimeMillis();
            String shardId = recordBatch.getShardId();
            do {
                try {
                    if (shardId == null) {
                        DatahubOutputFormat.this.producer.send(records, 3);
                    } else {
                        DatahubOutputFormat.this.producer.send(records, shardId, 3);
                    }
                    DatahubOutputFormat.this.sendCount(records.size());
                    long currentTimeMillis3 = System.currentTimeMillis();
                    if (DatahubOutputFormat.this.latencyGauge != null) {
                        DatahubOutputFormat.this.latencyGauge.report(currentTimeMillis3 - currentTimeMillis2);
                    }
                    if (DatahubOutputFormat.this.outTps != null) {
                        DatahubOutputFormat.this.outTps.markEvent(size);
                    }
                    if (DatahubOutputFormat.this.outBps != null) {
                        DatahubOutputFormat.this.outBps.markEvent(recordBatch.getBufferSize());
                        return;
                    }
                    return;
                } catch (DatahubClientException e) {
                    if (null != DatahubOutputFormat.this.clientProvider) {
                        DatahubOutputFormat.this.clientProvider.getClient(true, true);
                    }
                    DatahubOutputFormat.LOG.warn("Write record failed, wait {} ms and retry", Integer.valueOf(DatahubOutputFormat.this.retryInterval), e);
                    Thread.sleep(DatahubOutputFormat.this.retryInterval);
                    currentTimeMillis = System.currentTimeMillis() - currentTimeMillis2;
                }
            } while (currentTimeMillis <= DatahubOutputFormat.this.retryTimeout);
            throw new IOException(String.format("Retrying cost [%d] ms, exceed timeout threshold [%d].", Long.valueOf(currentTimeMillis), Integer.valueOf(DatahubOutputFormat.this.retryTimeout)));
        }
    }

    /* loaded from: input_file:com/alibaba/ververica/connectors/datahub/sink/DatahubOutputFormat$Builder.class */
    public static class Builder<RECORD> {
        private String endpoint;
        private String projectName;
        private String topicName;
        private String accessId;
        private String accessKey;
        private Configuration properties;
        private String name = DatahubOutputFormat.DEFAULT_OUTPUT_FORMAT_NAME;
        private int batchCount = 500;
        private int batchSize = DatahubOutputFormat.DEFAULT_BATCH_SIZE;
        private int flushInterval = DatahubOutputFormat.DEFAULT_FLUSH_INTERVAL;
        private int requestTimeout = 30000;
        private int retryTimeout = 1800000;
        private int retryInterval = 1000;
        private boolean async = false;
        private int asyncWriterNum = 4;
        private int asyncQueueSize = 4;
        private RecordPropertyGetter recordPropertyGetter = null;
        private RecordConverter<RECORD, RecordEntry> recordConverter = null;

        void checkUserParameter(String str, String str2) {
            if (StringUtils.isNullOrWhitespaceOnly(str)) {
                ErrorUtils.throwException(ConnectorErrors.INST.tableDDLConfigError(this.topicName, str2));
            }
        }

        void checkPositiveParameter(int i, String str) {
            if (i < 0) {
                ErrorUtils.throwException(ConnectorErrors.INST.tableDDLConfigError(this.topicName, str));
            }
        }

        public Builder setName(String str) {
            checkUserParameter(str, "name");
            this.name = str;
            return this;
        }

        public Builder setEndpoint(String str) {
            checkUserParameter(str, "endpoint");
            this.endpoint = str;
            return this;
        }

        public Builder setProjectName(String str) {
            checkUserParameter(str, Constants.PROJECT_NAME_KEY);
            this.projectName = str;
            return this;
        }

        public Builder setTopicName(String str) {
            checkUserParameter(str, Constants.TOPIC_NAME_KEY);
            this.topicName = str;
            return this;
        }

        public Builder setAccessId(String str) {
            this.accessId = str;
            return this;
        }

        public Builder setAccessKey(String str) {
            this.accessKey = str;
            return this;
        }

        public Builder setRequestTimeout(int i) {
            this.requestTimeout = i;
            return this;
        }

        public Builder setProperties(Configuration configuration) {
            if (configuration == null) {
                ErrorUtils.throwException(ConnectorErrors.INST.tableDDLConfigError(this.topicName, "properties"));
            }
            this.properties = configuration;
            return this;
        }

        public Builder setBatchCount(int i) {
            checkPositiveParameter(i, "batchCount");
            this.batchCount = i;
            return this;
        }

        public Builder setBatchSize(int i) {
            checkPositiveParameter(i, "batchSize");
            this.batchSize = i;
            return this;
        }

        public Builder setFlushInterval(int i) {
            if (i < 0) {
                ErrorUtils.throwException(ConnectorErrors.INST.tableDDLConfigError(this.topicName, "flushInterval"));
            }
            this.flushInterval = i;
            return this;
        }

        public Builder setRetryTimeout(int i) {
            checkPositiveParameter(i, "retryTimeout");
            this.retryTimeout = i;
            return this;
        }

        public Builder setRetryInterval(int i) {
            checkPositiveParameter(i, "retryInterval");
            this.retryInterval = i;
            return this;
        }

        public Builder setAsync(boolean z) {
            this.async = z;
            return this;
        }

        public Builder setAsyncWriterNum(int i) {
            checkPositiveParameter(this.retryInterval, "asyncWriterNum");
            this.asyncWriterNum = i;
            return this;
        }

        public Builder setAsyncQueueSize(int i) {
            checkPositiveParameter(this.retryInterval, "asyncQueueSize");
            this.asyncQueueSize = i;
            return this;
        }

        public Builder setRecordPropertyGetter(RecordPropertyGetter recordPropertyGetter) {
            this.recordPropertyGetter = recordPropertyGetter;
            return this;
        }

        public Builder setRecordConverter(RecordConverter<RECORD, RecordEntry> recordConverter) {
            this.recordConverter = recordConverter;
            return this;
        }

        public DatahubOutputFormat<RECORD> build() {
            if (this.recordConverter == null) {
                throw new ConnectorException("Record converter must not be set");
            }
            DatahubOutputFormat<RECORD> datahubOutputFormat = (StringUtils.isNullOrWhitespaceOnly(this.accessId) || StringUtils.isNullOrWhitespaceOnly(this.accessKey)) ? new DatahubOutputFormat<>(this.endpoint, this.projectName, this.topicName, this.properties, this.recordConverter) : new DatahubOutputFormat<>(this.endpoint, this.projectName, this.topicName, this.accessId, this.accessKey, this.recordConverter);
            datahubOutputFormat.setName(this.name).setRequestTimeout(this.requestTimeout).setBatchCount(this.batchCount).setBatchSize(this.batchSize).setFlushInterval(this.flushInterval).setRetryInterval(this.retryInterval).setRetryTimeout(this.retryTimeout).setAsync(this.async).setAsyncWriterNum(this.asyncWriterNum).setAsyncQueueSize(this.asyncQueueSize).setRecordPropertyGetter(this.recordPropertyGetter);
            return datahubOutputFormat;
        }
    }

    /* loaded from: input_file:com/alibaba/ververica/connectors/datahub/sink/DatahubOutputFormat$DoubleGauge.class */
    public class DoubleGauge implements Gauge<Double> {
        private double value;

        public DoubleGauge() {
        }

        void report(long j) {
            this.value = j * 1.0d;
        }

        void report(double d) {
            this.value = d;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Double m51getValue() {
            return Double.valueOf(this.value);
        }
    }

    /* loaded from: input_file:com/alibaba/ververica/connectors/datahub/sink/DatahubOutputFormat$RecordBatch.class */
    public class RecordBatch {
        private boolean roundRobin;
        private String shardId;
        private boolean filterPk;
        private long count;
        private int bufferSize;
        private List<RecordEntry> recordEntryList;
        private Map<String, RecordEntry> recordEntryMap;
        private Map<String, Integer> recordBytesMap;
        private long createTimestamp;

        public RecordBatch() {
            this.roundRobin = true;
            this.shardId = null;
            this.count = 0L;
            this.bufferSize = 0;
            this.createTimestamp = System.currentTimeMillis();
            this.filterPk = DatahubOutputFormat.this.recordPropertyGetter != null && DatahubOutputFormat.this.recordPropertyGetter.hasPrimaryKey();
            if (!this.filterPk) {
                this.recordEntryList = new ArrayList();
            } else {
                this.recordEntryMap = new HashMap();
                this.recordBytesMap = new HashMap();
            }
        }

        public RecordBatch(DatahubOutputFormat datahubOutputFormat, String str) {
            this();
            this.shardId = str;
            this.roundRobin = str != null;
        }

        public void reset() {
            if (this.recordEntryList != null) {
                this.recordEntryList.clear();
            }
            if (this.recordEntryMap != null) {
                this.recordEntryMap.clear();
            }
            if (this.recordBytesMap != null) {
                this.recordBytesMap.clear();
            }
            this.count = 0L;
            this.createTimestamp = System.currentTimeMillis();
        }

        public void add(RecordEntry recordEntry) {
            int calcRecordBytes = (DatahubOutputFormat.this.recordPropertyGetter == null || !DatahubOutputFormat.this.recordPropertyGetter.hasRecordBytes()) ? DatahubUtils.calcRecordBytes(recordEntry) : DatahubOutputFormat.this.recordPropertyGetter.getRecordBytes(recordEntry);
            if (this.filterPk) {
                String primaryKey = DatahubOutputFormat.this.recordPropertyGetter.getPrimaryKey(recordEntry);
                Integer num = this.recordBytesMap.get(primaryKey);
                this.bufferSize -= num == null ? 0 : num.intValue();
                this.recordEntryMap.put(primaryKey, recordEntry);
                this.recordBytesMap.put(primaryKey, Integer.valueOf(calcRecordBytes));
            } else {
                this.recordEntryList.add(recordEntry);
            }
            this.bufferSize += calcRecordBytes;
            this.count += DatahubOutputFormat.serialVersionUID;
        }

        public boolean isReady() {
            return this.bufferSize + 6 >= DatahubOutputFormat.this.batchSize || getRecordNum() >= DatahubOutputFormat.this.batchCount || System.currentTimeMillis() - this.createTimestamp >= ((long) DatahubOutputFormat.this.flushInterval);
        }

        public boolean isRoundRobin() {
            return this.roundRobin;
        }

        public String getShardId() {
            return this.shardId;
        }

        public int getRecordNum() {
            return this.filterPk ? this.recordEntryMap.size() : this.recordEntryList.size();
        }

        public long getCount() {
            return this.count;
        }

        public boolean isFilterPk() {
            return this.filterPk;
        }

        public List<RecordEntry> getRecords() {
            return this.filterPk ? new ArrayList(this.recordEntryMap.values()) : this.recordEntryList;
        }

        public int getBufferSize() {
            if (this.bufferSize == 0) {
                return 0;
            }
            return this.bufferSize + 6;
        }

        public boolean isEmpty() {
            return this.count == 0;
        }

        public long getCreateTimestamp() {
            return this.createTimestamp;
        }
    }

    /* loaded from: input_file:com/alibaba/ververica/connectors/datahub/sink/DatahubOutputFormat$RecordBatchQueue.class */
    public class RecordBatchQueue {
        private volatile DatahubOutputFormat<RECORD>.RecordBatch dummyFront = null;
        private final LinkedBlockingQueue<DatahubOutputFormat<RECORD>.RecordBatch> bufferQueue;

        public RecordBatchQueue(int i) {
            this.bufferQueue = new LinkedBlockingQueue<>(i);
        }

        public int size() {
            return this.bufferQueue.size() + (this.dummyFront == null ? 0 : 1);
        }

        public boolean isEmpty() {
            return this.dummyFront == null && this.bufferQueue.isEmpty();
        }

        public DatahubOutputFormat<RECORD>.RecordBatch peek() {
            return this.dummyFront == null ? this.bufferQueue.peek() : this.dummyFront;
        }

        public boolean offer(DatahubOutputFormat<RECORD>.RecordBatch recordBatch, long j, TimeUnit timeUnit) throws InterruptedException {
            return this.bufferQueue.offer(recordBatch, j, timeUnit);
        }

        public void pop() {
            if (this.dummyFront == null) {
                this.bufferQueue.poll();
            } else {
                this.dummyFront = null;
            }
        }

        public void waitEmpty(long j, TimeUnit timeUnit) throws InterruptedException {
            if (this.dummyFront == null) {
                this.dummyFront = this.bufferQueue.poll(j, timeUnit);
            }
        }
    }

    /* loaded from: input_file:com/alibaba/ververica/connectors/datahub/sink/DatahubOutputFormat$TopicWriter.class */
    public class TopicWriter {
        private List<DatahubOutputFormat<RECORD>.RecordBatch> batches = new ArrayList();
        private List<DatahubOutputFormat<RECORD>.BatchWriter> writers = new ArrayList();
        private final Object batchLock = new Object();
        private int roundRobinIndex = Math.abs(new Random().nextInt());

        TopicWriter(List<String> list) {
            Preconditions.checkArgument(!list.isEmpty(), "Shard list must not be empty");
            if (DatahubOutputFormat.this.doHash) {
                for (int i = 0; i < list.size(); i++) {
                    this.batches.add(new RecordBatch(DatahubOutputFormat.this, list.get(i)));
                }
            } else {
                this.batches.add(new RecordBatch());
            }
            if (!DatahubOutputFormat.this.async) {
                this.writers.add(new BatchWriter());
                return;
            }
            for (int i2 = 0; i2 < DatahubOutputFormat.this.asyncWriterNum; i2++) {
                this.writers.add(new AsyncBatchWriter(i2));
            }
        }

        void open() {
            this.writers.forEach((v0) -> {
                v0.start();
            });
        }

        void close() throws IOException, InterruptedException {
            sync();
            this.writers.forEach((v0) -> {
                v0.stop();
            });
        }

        void flush(boolean z) throws IOException {
            synchronized (this.batchLock) {
                for (int i = 0; i < this.batches.size(); i++) {
                    if (this.batches.get(i).isReady() || z) {
                        submit(i);
                    }
                }
            }
        }

        void sync() throws IOException, InterruptedException {
            flush(true);
            if (DatahubOutputFormat.this.async) {
                Iterator<DatahubOutputFormat<RECORD>.BatchWriter> it = this.writers.iterator();
                while (it.hasNext()) {
                    it.next().waitAllFlushed();
                }
            }
        }

        public void write(RecordEntry recordEntry) throws IOException {
            int calcBatchIndex = calcBatchIndex(recordEntry);
            synchronized (this.batchLock) {
                DatahubOutputFormat<RECORD>.RecordBatch recordBatch = this.batches.get(calcBatchIndex);
                recordBatch.add(recordEntry);
                if (recordBatch.isReady()) {
                    submit(calcBatchIndex);
                }
            }
        }

        public List<DatahubOutputFormat<RECORD>.RecordBatch> getBatches() {
            return this.batches;
        }

        private void submit(int i) throws IOException {
            int size;
            DatahubOutputFormat<RECORD>.RecordBatch recordBatch = this.batches.get(i);
            if (recordBatch.isRoundRobin()) {
                int i2 = this.roundRobinIndex + 1;
                this.roundRobinIndex = i2;
                size = i2 % this.writers.size();
            } else {
                size = i % this.writers.size();
            }
            this.writers.get(size).write(recordBatch);
            this.batches.set(i, new RecordBatch(DatahubOutputFormat.this, recordBatch.getShardId()));
        }

        private int calcBatchIndex(RecordEntry recordEntry) {
            if (!DatahubOutputFormat.this.doHash || this.batches.size() <= 1) {
                return 0;
            }
            return Math.abs(hashRecord(recordEntry) % this.batches.size());
        }

        private int hashRecord(RecordEntry recordEntry) {
            String hashField = DatahubOutputFormat.this.recordPropertyGetter.getHashField(recordEntry);
            if (hashField != null) {
                return hashField.hashCode() & Integer.MAX_VALUE;
            }
            throw new RuntimeException("cannot get hash field from record!");
        }
    }

    public DatahubOutputFormat(String str, String str2, String str3, String str4, String str5, RecordConverter<RECORD, RecordEntry> recordConverter) {
        this.endpoint = str;
        this.projectName = str2;
        this.topicName = str3;
        this.accessKey = str5;
        this.accessId = str4;
        this.recordConverter = recordConverter;
    }

    public DatahubOutputFormat(String str, String str2, String str3, Configuration configuration, RecordConverter<RECORD, RecordEntry> recordConverter) {
        this.endpoint = str;
        this.projectName = str2;
        this.topicName = str3;
        this.properties = configuration;
        this.recordConverter = recordConverter;
    }

    protected void scheduleFlusher() {
        this.flusher = new Timer(getName() + ".buffer.flusher");
        this.flusher.schedule(new TimerTask() { // from class: com.alibaba.ververica.connectors.datahub.sink.DatahubOutputFormat.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    DatahubOutputFormat.this.topicWriter.flush(false);
                } catch (Exception e) {
                    DatahubOutputFormat.LOG.error("flush buffer failed", e);
                    DatahubOutputFormat.this.flushException = e;
                }
            }
        }, this.flushInterval, this.flushInterval);
    }

    public DatahubOutputFormat setName(String str) {
        this.name = str;
        return this;
    }

    public DatahubOutputFormat setRequestTimeout(int i) {
        this.requestTimeout = i;
        return this;
    }

    public DatahubOutputFormat setLineDelimiter(byte[] bArr) {
        this.lineDelimiter = bArr;
        return this;
    }

    public DatahubOutputFormat setBatchSize(int i) {
        this.batchSize = i;
        return this;
    }

    public DatahubOutputFormat setBatchCount(int i) {
        this.batchCount = i;
        return this;
    }

    public DatahubOutputFormat setFlushInterval(int i) {
        this.flushInterval = i;
        return this;
    }

    public DatahubOutputFormat setRetryTimeout(int i) {
        this.retryTimeout = i;
        return this;
    }

    public DatahubOutputFormat setRetryInterval(int i) {
        this.retryInterval = i;
        return this;
    }

    public DatahubOutputFormat setRecordPropertyGetter(RecordPropertyGetter recordPropertyGetter) {
        this.recordPropertyGetter = recordPropertyGetter;
        return this;
    }

    public DatahubOutputFormat setAsync(boolean z) {
        this.async = z;
        return this;
    }

    public DatahubOutputFormat setAsyncWriterNum(int i) {
        this.asyncWriterNum = i;
        return this;
    }

    public DatahubOutputFormat setAsyncQueueSize(int i) {
        this.asyncQueueSize = i;
        return this;
    }

    protected List<String> initShardList() {
        ArrayList arrayList = new ArrayList();
        for (ShardEntry shardEntry : getClient().listShard(this.projectName, this.topicName).getShards()) {
            if (ShardState.ACTIVE == shardEntry.getState()) {
                arrayList.add(shardEntry.getShardId());
            }
        }
        arrayList.sort(Comparator.comparingInt(Integer::parseInt));
        return arrayList;
    }

    protected ProducerConfig createProducerConfig() {
        ProducerConfig clientProvider = new ProducerConfig(this.endpoint, this.accessId, this.accessKey).setUserAgent(DatahubUtils.VERVERICA_VERSION).setClientProvider(this.clientProvider);
        clientProvider.getHttpConfig().setCompressType(null).setReadTimeout(this.requestTimeout).setConnTimeout(this.requestTimeout);
        return clientProvider;
    }

    protected void initClientProvider() {
        if (null == this.clientProvider) {
            this.clientProvider = new DatahubClientProvider(this.endpoint, this.accessId, this.accessKey, this.properties, new HttpConfig().setConnTimeout(this.requestTimeout).setReadTimeout(this.requestTimeout));
        }
    }

    protected DatahubClient getClient() {
        return (DatahubClient) this.clientProvider.getClient();
    }

    public void open(int i, int i2) throws IOException {
        this.recordConverter.open(getRuntimeContext(), this.properties);
        initClientProvider();
        if (this.producerConfig != null) {
            throw new IOException("producerConfig is not null, not expected!");
        }
        this.producerConfig = createProducerConfig();
        this.shardIdList = initShardList();
        if (this.producer != null) {
            throw new IOException("producer is not null, not expected!");
        }
        this.producer = new Producer(this.projectName, this.topicName, this.shardIdList, this.producerConfig);
        this.mergeBlob = this.mergeBlob && RecordType.BLOB.equals(getClient().getTopic(this.projectName, this.topicName).getRecordType());
        this.doHash = this.recordPropertyGetter != null && this.recordPropertyGetter.hasHashField();
        this.asyncWriterNum = Math.max(1, Math.min(this.asyncWriterNum, this.shardIdList.size()));
        if (this.topicWriter != null) {
            throw new IOException("topic writer is not null, not expected!");
        }
        this.topicWriter = new TopicWriter(this.shardIdList);
        this.topicWriter.open();
        this.outTps = MetricUtils.registerNumRecordsOutRate(getRuntimeContext());
        this.outBps = MetricUtils.registerNumBytesOutRate(getRuntimeContext(), getName());
        this.latencyGauge = MetricUtils.registerCurrentSendTime(getRuntimeContext());
        this.dupRatioGauge = (DoubleGauge) getRuntimeContext().getMetricGroup().addGroup(getName()).gauge(DUPLICATE_RATIO, new DoubleGauge());
        if (this.flushInterval > 0) {
            scheduleFlusher();
        }
        LOG.info("open output format with\nproject: {}\ntopic: {}\ndoHash: {}\nasync: {}\nasyncWriterNum: {}\nasyncQueueSize: {}\nbatchSize: {} B\nbatchCount: {}\nflushInterval: {} ms\nrequestTimeout: {} ms\nretryTimeout: {} ms\nretryInterval: {} ms\n", new Object[]{this.projectName, this.topicName, Boolean.valueOf(this.doHash), Boolean.valueOf(this.async), Integer.valueOf(this.asyncWriterNum), Integer.valueOf(this.asyncQueueSize), Integer.valueOf(this.batchSize), Integer.valueOf(this.batchCount), Integer.valueOf(this.flushInterval), Integer.valueOf(this.requestTimeout), Integer.valueOf(this.retryTimeout), Integer.valueOf(this.retryInterval)});
    }

    @Override // com.alibaba.ververica.connectors.common.sink.HasRetryTimeout
    public long getRetryTimeout() {
        return this.retryTimeout;
    }

    @Override // com.alibaba.ververica.connectors.common.sink.Syncable
    public void sync() throws IOException {
        try {
            this.topicWriter.sync();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public void writeRecord(RECORD record) throws IOException {
        if (null != this.flushException) {
            throw new IOException(this.flushException);
        }
        RecordEntry convert = this.recordConverter.convert(record);
        if (convert == null) {
            return;
        }
        this.topicWriter.write(convert);
    }

    public String getName() {
        return this.name;
    }

    public void configure(Configuration configuration) {
        this.properties = configuration;
    }

    public void close() throws IOException {
        try {
            if (this.flusher != null) {
                this.flusher.cancel();
                this.flusher = null;
            }
        } catch (Throwable th) {
            LOG.warn("flusher cancel error in close", th);
        }
        try {
            sync();
        } catch (Throwable th2) {
            LOG.warn("sync error in close", th2);
        }
        if (this.topicWriter != null) {
            try {
                this.topicWriter.close();
                this.topicWriter = null;
            } catch (Throwable th3) {
                LOG.warn("topicWriter close error in close", th3);
            }
        }
        if (this.producer != null) {
            try {
                this.producer.close();
                this.producer = null;
            } catch (Throwable th4) {
                LOG.warn("producer close error in close", th4);
            }
        }
        this.recordConverter.close();
        LOG.info("Close datahub output format successfully");
    }

    protected void mergeCount(int i) {
    }

    protected void sendCount(int i) {
    }
}
