package com.alibaba.ververica.connectors.datahub.source;

import com.alibaba.ververica.connectors.common.source.AbstractDynamicParallelSource;
import com.alibaba.ververica.connectors.common.source.SourceUtils;
import com.alibaba.ververica.connectors.common.source.reader.RecordReader;
import com.alibaba.ververica.connectors.datahub.DatahubClientProvider;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.ShardEntry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/datahub/source/DatahubSourceFunction.class */
/**
 * Parallel source that reads {@link RecordEntry} records from a DataHub topic.
 *
 * <p>The shards of the topic are listed through a {@link DatahubClientProvider}, sorted by their
 * numeric shard id and distributed over the parallel subtasks. The per-split progress (cursor) is a
 * {@code Long}.
 *
 * <p>The client provider is {@code transient}; it is re-created on demand whenever it is found to
 * be {@code null}.
 */
public class DatahubSourceFunction extends AbstractDynamicParallelSource<RecordEntry, Long> {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(DatahubSourceFunction.class);

    private final String endpoint;
    private final String projectName;
    private final String topicName;
    private final String subId;
    private final String accessId;
    private final String accessKey;
    private final Configuration properties;
    private final long startTimeInMs;
    private final long stopTimeInMs;

    // Tunables forwarded to every DatahubRecordReader created by this source.
    private int requestTimeout = DatahubRecordReader.DEFAULT_REQUEST_TIMEOUT;
    private int maxFetchSize = 50;
    private int maxBufferSize = 50;
    private int retryInterval = 1000;
    private int retryTimeout = DatahubRecordReader.DEFAULT_RETRY_TIMEOUT;
    private int fetchLatestDelay = 500;

    /** Not serialized; always obtain it through {@link #ensureClientProvider()}. */
    private transient DatahubClientProvider clientProvider;

    /** Shard ids of the topic sorted numerically; filled lazily by {@link #initShardsList()}. */
    private List<String> initShardsList;

    /**
     * Creates a source with an empty {@link Configuration}.
     *
     * @param str endpoint
     * @param str2 project name
     * @param str3 topic name
     * @param str4 subscription id
     * @param str5 access id
     * @param str6 access key
     * @param j start time in milliseconds
     * @param j2 stop time in milliseconds
     */
    public DatahubSourceFunction(String str, String str2, String str3, String str4, String str5, String str6, long j, long j2) {
        this(str, str2, str3, str4, str5, str6, new Configuration(), j, j2);
    }

    /**
     * Creates a source and eagerly builds its client provider.
     *
     * @param str endpoint
     * @param str2 project name
     * @param str3 topic name
     * @param str4 subscription id
     * @param str5 access id; when blank, readers are built from {@code configuration} instead
     * @param str6 access key; when blank, readers are built from {@code configuration} instead
     * @param configuration properties handed to the client provider and, without credentials, to the reader
     * @param j start time in milliseconds
     * @param j2 stop time in milliseconds
     */
    public DatahubSourceFunction(String str, String str2, String str3, String str4, String str5, String str6, Configuration configuration, long j, long j2) {
        this.endpoint = str;
        this.projectName = str2;
        this.topicName = str3;
        this.subId = str4;
        this.accessId = str5;
        this.accessKey = str6;
        this.startTimeInMs = j;
        this.stopTimeInMs = j2;
        this.properties = configuration;
        this.clientProvider = createProvider();
    }

    /**
     * Sets the request timeout forwarded to readers.
     *
     * <p>NOTE(review): a client provider that already exists (e.g. the one built in the
     * constructor) keeps the timeout it was created with; only providers created afterwards pick
     * up the new value.
     */
    public void setRequestTimeout(int i) {
        this.requestTimeout = i;
    }

    /** Sets the maximum fetch size forwarded to readers. */
    public void setMaxFetchSize(int i) {
        this.maxFetchSize = i;
    }

    /** Sets the maximum buffer size forwarded to readers. */
    public void setMaxBufferSize(int i) {
        this.maxBufferSize = i;
    }

    /** Sets the retry interval forwarded to readers. */
    public void setRetryInterval(int i) {
        this.retryInterval = i;
    }

    /** Sets the retry timeout forwarded to readers. */
    public void setRetryTimeout(int i) {
        this.retryTimeout = i;
    }

    /** Sets the fetch-latest delay forwarded to readers. */
    public void setFetchLatestDelay(int i) {
        this.fetchLatestDelay = i;
    }

    /**
     * Builds a {@link DatahubRecordReader} configured with this source's settings.
     *
     * <p>If either the access id or the access key is blank the reader is constructed from the
     * {@link Configuration} properties, otherwise from the explicit credentials.
     *
     * @param configuration not used by this implementation
     * @return the configured reader
     * @throws IOException declared by the overridden method
     */
    @Override // com.alibaba.ververica.connectors.common.source.AbstractParallelSourceBase
    public RecordReader<RecordEntry, Long> createReader(Configuration configuration) throws IOException {
        initShardsList();
        int shardCount = this.initShardsList.size();
        boolean credentialsMissing =
                StringUtils.isNullOrWhitespaceOnly(this.accessId) || StringUtils.isNullOrWhitespaceOnly(this.accessKey);

        DatahubRecordReader reader;
        if (credentialsMissing) {
            reader = new DatahubRecordReader(this.endpoint, this.projectName, this.topicName, this.subId,
                    this.properties, shardCount, this.startTimeInMs, this.stopTimeInMs);
        } else {
            reader = new DatahubRecordReader(this.endpoint, this.projectName, this.topicName, this.subId,
                    this.accessId, this.accessKey, shardCount, this.startTimeInMs, this.stopTimeInMs);
        }

        // The provider is transient, so make sure the reader never receives a null one.
        reader.setRequestTimeout(this.requestTimeout)
                .setMaxFetchSize(this.maxFetchSize)
                .setMaxBufferSize(this.maxBufferSize)
                .setRetryInterval(this.retryInterval)
                .setRetryTimeout(this.retryTimeout)
                .setFetchLatestDelay(this.fetchLatestDelay)
                .setClientProvider(ensureClientProvider());
        return reader;
    }

    /**
     * Creates the input splits of one subtask.
     *
     * <p>{@link SourceUtils#modAssign} selects indexes into the sorted shard list; every selected
     * index becomes a {@link DatahubShardInputSplit} carrying that index and the shard id stored
     * at it.
     *
     * @param i first value forwarded to {@code SourceUtils.modAssign} (presumably the number of subtasks — TODO confirm)
     * @param i2 second value forwarded to {@code SourceUtils.modAssign} (presumably the subtask index — TODO confirm)
     * @return the splits assigned to the subtask
     * @throws IOException declared by the overridden method
     */
    @Override // com.alibaba.ververica.connectors.common.source.AbstractParallelSourceBase
    /* renamed from: createInputSplitsForCurrentSubTask */
    public InputSplit[] mo13createInputSplitsForCurrentSubTask(int i, int i2) throws IOException {
        initShardsList();
        List<Integer> assignedIndexes = SourceUtils.modAssign(
                "datahub-" + this.projectName + ":" + this.topicName, i, i2, this.initShardsList.size());
        DatahubShardInputSplit[] splits = new DatahubShardInputSplit[assignedIndexes.size()];
        int position = 0;
        for (int shardIndex : assignedIndexes) {
            splits[position++] = new DatahubShardInputSplit(shardIndex, this.initShardsList.get(shardIndex));
        }
        return splits;
    }

    /**
     * Recomputes the splits of one subtask, reusing previously recorded progress where available.
     *
     * <p>For every shard whose numeric id satisfies {@code id % i == i2}, the first entry of
     * {@code list} with the same numeric shard id supplies the split and cursor. Shards without
     * such an entry get a new split with cursor {@code -1}.
     *
     * @param i divisor of the modulo assignment (number of subtasks)
     * @param i2 subtask index, i.e. the expected remainder
     * @param list previously recorded progress entries
     * @return split/cursor pairs for the subtask
     * @throws IOException declared by the overridden method
     */
    @Override // com.alibaba.ververica.connectors.common.source.AbstractDynamicParallelSource
    public List<Tuple2<InputSplit, Long>> reAssignInputSplitsForCurrentSubTask(int i, int i2, List<AbstractDynamicParallelSource.InnerProgress<Long>> list) throws IOException {
        initShardsList();
        List<Tuple2<InputSplit, Long>> assignments = new ArrayList<>();
        for (String shardId : modAssign(i, i2)) {
            AbstractDynamicParallelSource.InnerProgress<Long> known = findProgress(shardId, list);
            if (known != null) {
                logger.info("Current subTask [{}], split [{}], progress: [{}]", i2, shardId, known.getCursor());
                Tuple2<InputSplit, Long> restored = new Tuple2<>(known.getInputSplit(), known.getCursor());
                assignments.add(restored);
            } else {
                DatahubShardInputSplit freshSplit = new DatahubShardInputSplit(Integer.parseInt(shardId), shardId);
                logger.info("Current subTask [{}], new split [{}]", i2, freshSplit);
                assignments.add(Tuple2.<InputSplit, Long>of(freshSplit, -1L));
            }
        }
        return assignments;
    }

    /**
     * Lists the shard ids of the topic, sorted ascending by their integer value.
     *
     * @return a new list of shard ids
     * @throws Exception if listing the shards fails or a shard id is not an integer
     */
    @Override // com.alibaba.ververica.connectors.common.source.AbstractParallelSourceBase
    public List<String> getPartitionList() throws Exception {
        DatahubClient client = (DatahubClient) ensureClientProvider().getClient();
        List<String> shardIds = new ArrayList<>();
        for (ShardEntry shard : client.listShard(this.projectName, this.topicName).getShards()) {
            shardIds.add(shard.getShardId());
        }
        shardIds.sort(Comparator.comparingInt(Integer::parseInt));
        return shardIds;
    }

    /**
     * Loads the shard list when it has not been loaded yet or is empty.
     *
     * @throws RuntimeException wrapping any failure of {@link #getPartitionList()}
     */
    private void initShardsList() {
        if (this.initShardsList != null && !this.initShardsList.isEmpty()) {
            return;
        }
        try {
            this.initShardsList = getPartitionList();
        } catch (Exception e) {
            logger.error("init shard list failed", e);
            throw new RuntimeException(e);
        }
    }

    /** Returns {@code <SimpleClassName>:<project>:<topic>}. */
    @Override
    public String toString() {
        return String.format("%s:%s:%s", getClass().getSimpleName(), this.projectName, this.topicName);
    }

    /** Builds a client provider whose connect and read timeouts both equal the request timeout. */
    private DatahubClientProvider createProvider() {
        HttpConfig httpConfig = new HttpConfig()
                .setConnTimeout(this.requestTimeout)
                .setReadTimeout(this.requestTimeout);
        return new DatahubClientProvider(this.endpoint, this.accessId, this.accessKey, this.properties, httpConfig);
    }

    /** Returns the client provider, re-creating it when the transient field is {@code null}. */
    private DatahubClientProvider ensureClientProvider() {
        if (this.clientProvider == null) {
            this.clientProvider = createProvider();
        }
        return this.clientProvider;
    }

    /**
     * Returns the first progress entry whose split has the same numeric shard id as
     * {@code shardId}, or {@code null} when there is none.
     */
    private static AbstractDynamicParallelSource.InnerProgress<Long> findProgress(
            String shardId, List<AbstractDynamicParallelSource.InnerProgress<Long>> progresses) {
        for (AbstractDynamicParallelSource.InnerProgress<Long> candidate : progresses) {
            String candidateShardId = ((DatahubShardInputSplit) candidate.getInputSplit()).getShardId();
            if (Integer.parseInt(shardId) == Integer.parseInt(candidateShardId)) {
                return candidate;
            }
        }
        return null;
    }

    /** Selects the shard ids whose integer value satisfies {@code id % i == i2}, in list order. */
    private List<String> modAssign(int i, int i2) {
        List<String> selected = new ArrayList<>();
        for (String shardId : this.initShardsList) {
            if (Integer.parseInt(shardId) % i == i2) {
                selected.add(shardId);
            }
        }
        return selected;
    }
}
