package com.alibaba.ververica.connectors.datahub.source;

import com.aliyun.datahub.client.model.RecordEntry;
import java.io.IOException;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.Collector;

/**
 * Adapts a {@link DatahubSourceFunction}, which emits {@link RecordEntry} values, into a Flink
 * source of {@link RowData}.
 *
 * <p>Every record the wrapped source emits is handed to a {@link DatahubParser}; the rows the
 * parser pushes into its collector are forwarded to the real Flink source context. Runtime
 * context, lifecycle, checkpoint and cancellation calls are delegated to the wrapped source.
 */
public class DatahubRowDataSourceAdapter extends RichParallelSourceFunction<RowData> implements CheckpointedFunction, ResultTypeQueryable<RowData> {
    /** Underlying source that produces raw {@link RecordEntry} values. */
    private final DatahubSourceFunction source;

    /** Converts each {@link RecordEntry} into {@link RowData} and reports the produced type. */
    private final DatahubParser parser;

    /**
     * {@link Collector} that forwards every row to a source context together with a timestamp
     * that the caller sets beforehand via {@link #setTimestamp(long)}.
     */
    public static class RecordEntryWithTimeStampCollector implements Collector<RowData> {
        private final SourceFunction.SourceContext<RowData> sourceContext;

        /** Timestamp attached to rows passed to {@link #collect(RowData)}; starts at 0. */
        private long timestamp = 0;

        /**
         * @param sourceContext context that receives the collected rows
         */
        public RecordEntryWithTimeStampCollector(SourceFunction.SourceContext<RowData> sourceContext) {
            this.sourceContext = sourceContext;
        }

        /**
         * Sets the timestamp used for all subsequent {@link #collect(RowData)} calls.
         *
         * @param timestamp timestamp to attach to collected rows
         */
        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }

        /**
         * Emits the row to the source context with the currently configured timestamp.
         *
         * @param rowData row to emit
         */
        @Override
        public void collect(RowData rowData) {
            this.sourceContext.collectWithTimestamp(rowData, this.timestamp);
        }

        /** Does nothing; this collector does not close the source context it writes to. */
        @Override
        public void close() {
        }
    }

    /**
     * Source context for {@link RecordEntry} values that parses each record with a
     * {@link DatahubParser} and forwards the resulting rows to a {@link RowData} source context.
     * All other context operations are passed straight through to that context.
     */
    public static class SourceContextWrapper implements SourceFunction.SourceContext<RecordEntry> {
        private final SourceFunction.SourceContext<RowData> sourceContext;
        private final DatahubParser parser;

        /** Collector used for records emitted without an explicit timestamp. */
        private final Collector<RowData> recordEntryCollector;

        /** Collector used for records emitted with an explicit timestamp. */
        private final RecordEntryWithTimeStampCollector recordEntryWithTimestampCollector;

        /**
         * @param sourceContext context that receives the parsed rows
         * @param datahubParser parser applied to every incoming record
         */
        public SourceContextWrapper(final SourceFunction.SourceContext<RowData> sourceContext, DatahubParser datahubParser) {
            this.sourceContext = sourceContext;
            this.parser = datahubParser;
            this.recordEntryCollector = new Collector<RowData>() {
                @Override
                public void collect(RowData rowData) {
                    sourceContext.collect(rowData);
                }

                @Override
                public void close() {
                    // Intentionally empty: the wrapped context is closed via SourceContextWrapper.close().
                }
            };
            this.recordEntryWithTimestampCollector = new RecordEntryWithTimeStampCollector(sourceContext);
        }

        /**
         * Parses the record and emits the rows produced by the parser without a timestamp.
         *
         * @param recordEntry record to parse
         */
        @Override
        public void collect(RecordEntry recordEntry) {
            this.parser.parse(recordEntry, this.recordEntryCollector);
        }

        /**
         * Parses the record and emits the rows produced by the parser with the given timestamp.
         *
         * @param recordEntry record to parse
         * @param timestamp timestamp attached to every row produced from this record
         */
        @Override
        public void collectWithTimestamp(RecordEntry recordEntry, long timestamp) {
            // The timestamp must be set before parsing, because the parser emits rows
            // through the shared timestamp collector while parse() is running.
            this.recordEntryWithTimestampCollector.setTimestamp(timestamp);
            this.parser.parse(recordEntry, this.recordEntryWithTimestampCollector);
        }

        /** Forwards the watermark to the wrapped context. */
        @Override
        public void emitWatermark(Watermark watermark) {
            this.sourceContext.emitWatermark(watermark);
        }

        /** Forwards the idle notification to the wrapped context. */
        @Override
        public void markAsTemporarilyIdle() {
            this.sourceContext.markAsTemporarilyIdle();
        }

        /**
         * @return the checkpoint lock of the wrapped context
         */
        @Override
        public Object getCheckpointLock() {
            return this.sourceContext.getCheckpointLock();
        }

        /** Closes the wrapped context. */
        @Override
        public void close() {
            this.sourceContext.close();
        }
    }

    /**
     * @param datahubSourceFunction source that produces the raw records
     * @param datahubParser parser that turns records into rows
     */
    public DatahubRowDataSourceAdapter(DatahubSourceFunction datahubSourceFunction, DatahubParser datahubParser) {
        this.source = datahubSourceFunction;
        this.parser = datahubParser;
    }

    /** Stores the runtime context on this function and also passes it to the wrapped source. */
    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        super.setRuntimeContext(runtimeContext);
        this.source.setRuntimeContext(runtimeContext);
    }

    /**
     * Opens the wrapped source first, then the parser with a {@link FunctionContext} built from
     * this function's runtime context.
     *
     * @param configuration configuration passed on to the wrapped source
     * @throws Exception if opening the source or the parser fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        this.source.open(configuration);
        this.parser.open(new FunctionContext(getRuntimeContext()));
    }

    /**
     * @return the type information reported by the parser
     */
    @Override
    public TypeInformation<RowData> getProducedType() {
        return this.parser.getProducedType();
    }

    /** Delegates state snapshotting to the wrapped source. */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.source.snapshotState(functionSnapshotContext);
    }

    /** Delegates state initialization to the wrapped source. */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.source.initializeState(functionInitializationContext);
    }

    /**
     * Runs the wrapped source against a {@link SourceContextWrapper}, so that every record it
     * emits is parsed and forwarded to {@code sourceContext}.
     *
     * @param sourceContext Flink context that receives the parsed rows
     * @throws Exception if the wrapped source fails
     */
    @Override
    public void run(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
        this.source.run(new SourceContextWrapper(sourceContext, this.parser));
    }

    /** Delegates cancellation to the wrapped source. */
    @Override
    public void cancel() {
        this.source.cancel();
    }

    /**
     * Closes the wrapped source and then the parser; the parser is closed even when closing the
     * source throws.
     *
     * @throws IOException if closing fails
     */
    @Override
    public void close() throws IOException {
        try {
            this.source.close();
        } finally {
            this.parser.close();
        }
    }
}
