/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.shaded.connector.kafka.source.reader;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.table.store.shaded.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.table.store.shaded.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.table.store.shaded.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
import org.apache.flink.table.store.shaded.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.table.store.shaded.connector.kafka.source.split.KafkaPartitionSplitState;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader for Kafka partition splits that additionally tracks, per checkpoint, the
 * offsets that should be committed back through the {@link KafkaSourceFetcherManager}.
 *
 * <p>Offsets are collected in {@link #snapshotState(long)} and handed to the fetcher manager in
 * {@link #notifyCheckpointComplete(long)}. Both steps are skipped entirely when
 * {@link KafkaSourceOptions#COMMIT_OFFSETS_ON_CHECKPOINT} is disabled.
 *
 * @param <T> type of the records produced by the record emitter
 */
@Internal
public class KafkaSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReader.class);

    /** Offsets to commit, keyed by checkpoint id; sorted so older checkpoints can be pruned first. */
    private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());

    /** Last known offsets of splits that already finished and whose offsets were not yet committed. */
    private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits = new ConcurrentHashMap<>();

    private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
    private final boolean commitOffsetsOnCheckpoint;

    /**
     * Creates the reader and reads the commit-on-checkpoint flag from {@code config}.
     *
     * <p>A warning is logged when committing offsets on checkpoint is disabled.
     *
     * @param elementsQueue queue passed through to the base reader
     * @param kafkaSourceFetcherManager fetcher manager, later used to commit offsets
     * @param recordEmitter emitter passed through to the base reader
     * @param config configuration holding {@link KafkaSourceOptions#COMMIT_OFFSETS_ON_CHECKPOINT}
     * @param context reader context passed through to the base reader
     * @param kafkaSourceReaderMetrics metrics updated with the outcome of offset commits
     */
    public KafkaSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue, KafkaSourceFetcherManager kafkaSourceFetcherManager, RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState> recordEmitter, Configuration config, SourceReaderContext context, KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
        super(elementsQueue, (SingleThreadFetcherManager)kafkaSourceFetcherManager, recordEmitter, config, context);
        this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
        this.commitOffsetsOnCheckpoint = (Boolean)config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
        if (!this.commitOffsetsOnCheckpoint) {
            LOG.warn("Offset commit on checkpoint is disabled. Consuming offset will not be reported back to Kafka cluster.");
        }
    }

    /**
     * Remembers the current offset of every finished split so that it is still included in the
     * offsets collected by a later {@link #snapshotState(long)}.
     *
     * @param finishedSplitIds finished split states keyed by split id; the key is not used
     */
    protected void onSplitFinished(Map<String, KafkaPartitionSplitState> finishedSplitIds) {
        finishedSplitIds.forEach((ignored, splitState) -> {
            // A negative current offset is not recorded as a committable position.
            if (splitState.getCurrentOffset() >= 0L) {
                this.offsetsOfFinishedSplits.put(splitState.getTopicPartition(), new OffsetAndMetadata(splitState.getCurrentOffset()));
            }
        });
    }

    /**
     * Snapshots the splits via the base class and, when commit-on-checkpoint is enabled, records
     * the offsets to commit for {@code checkpointId}.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the splits returned by the base class snapshot, unchanged
     */
    public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
        List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
        if (!this.commitOffsetsOnCheckpoint) {
            return splits;
        }
        if (splits.isEmpty() && this.offsetsOfFinishedSplits.isEmpty()) {
            // Nothing to commit, but still register the checkpoint with an empty offset map.
            this.offsetsToCommit.put(checkpointId, Collections.emptyMap());
        } else {
            Map<TopicPartition, OffsetAndMetadata> offsetsMap = this.offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
            // Offsets of the splits contained in this snapshot.
            for (KafkaPartitionSplit split : splits) {
                // Splits with a negative starting offset are left out of the commit.
                if (split.getStartingOffset() >= 0L) {
                    offsetsMap.put(split.getTopicPartition(), new OffsetAndMetadata(split.getStartingOffset()));
                }
            }
            // Offsets of splits that finished earlier and have not been committed yet.
            offsetsMap.putAll(this.offsetsOfFinishedSplits);
        }
        return splits;
    }

    /**
     * Hands the offsets recorded for {@code checkpointId} to the fetcher manager for committing.
     *
     * <p>Returns without committing when commit-on-checkpoint is disabled or when no offsets are
     * recorded for the checkpoint. The outcome is handled in the commit callback: a failure is
     * logged and counted; a success updates the metrics, drops the committed partitions from the
     * finished-split offsets and removes all entries up to and including {@code checkpointId}
     * from the pending offsets.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception declared by the method signature; nothing in this body throws it directly
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing offsets for checkpoint {}", checkpointId);
        if (!this.commitOffsetsOnCheckpoint) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> committedPartitions = this.offsetsToCommit.get(checkpointId);
        if (committedPartitions == null) {
            LOG.debug("Offsets for checkpoint {} either do not exist or have already been committed.", checkpointId);
            return;
        }
        ((KafkaSourceFetcherManager)this.splitFetcherManager).commitOffsets(committedPartitions, (ignored, e) -> {
            if (e != null) {
                // A failed commit is only counted and logged; the offsets stay registered.
                this.kafkaSourceReaderMetrics.recordFailedCommit();
                LOG.warn("Failed to commit consumer offsets for checkpoint {}", checkpointId, e);
            } else {
                LOG.debug("Successfully committed offsets for checkpoint {}", checkpointId);
                this.kafkaSourceReaderMetrics.recordSucceededCommit();
                committedPartitions.forEach((tp, offset) -> this.kafkaSourceReaderMetrics.recordCommittedOffset(tp, offset.offset()));
                // Finished partitions that were part of this commit no longer need tracking.
                this.offsetsOfFinishedSplits.entrySet().removeIf(entry -> committedPartitions.containsKey(entry.getKey()));
                // Prune this checkpoint and every older one from the pending offsets.
                while (!this.offsetsToCommit.isEmpty() && this.offsetsToCommit.firstKey() <= checkpointId) {
                    this.offsetsToCommit.remove(this.offsetsToCommit.firstKey());
                }
            }
        });
    }

    /**
     * Wraps a split in a fresh {@link KafkaPartitionSplitState}.
     *
     * @param split split to create the state for
     * @return new state built from {@code split}
     */
    protected KafkaPartitionSplitState initializedState(KafkaPartitionSplit split) {
        return new KafkaPartitionSplitState(split);
    }

    /**
     * Converts a split state back into a {@link KafkaPartitionSplit}.
     *
     * @param splitId id of the split; not used by this implementation
     * @param splitState state to convert
     * @return the split produced by {@code splitState.toKafkaPartitionSplit()}
     */
    protected KafkaPartitionSplit toSplitType(String splitId, KafkaPartitionSplitState splitState) {
        return splitState.toKafkaPartitionSplit();
    }

    /** Returns the live (not copied) map of pending offsets keyed by checkpoint id. */
    @VisibleForTesting
    SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> getOffsetsToCommit() {
        return this.offsetsToCommit;
    }

    /** Returns the number of alive fetchers reported by the split fetcher manager. */
    @VisibleForTesting
    int getNumAliveFetchers() {
        return this.splitFetcherManager.getNumAliveFetchers();
    }
}

