/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka.connect;

import com.google.common.base.Preconditions;
import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.PulsarIOSourceTaskContext;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaSinkContext;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
import org.apache.pulsar.io.kafka.connect.PulsarOffsetBackingStore;
import org.apache.pulsar.io.kafka.connect.schema.KafkaSchemaWrappedSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Pulsar IO sources that are backed by a Kafka Connect {@link SourceTask}.
 *
 * <p>{@link #open(Map, SourceContext)} instantiates the converters, the offset store and the
 * task from the supplied configuration. {@link #read()} then hands out the records of one
 * polled batch at a time; once every record of the batch has been acknowledged (or skipped as
 * empty) the offsets are flushed through {@link #offsetWriter} and the task is asked to commit
 * before the next batch is polled.
 *
 * @param <T> type of the value carried by the records produced by this source
 */
public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);

    /** Config key naming a Kafka {@link SourceConnector} class; when absent, {@code task.class} is used. */
    public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass";
    /** Namespace string handed to both the offset reader and the offset writer. */
    private static final String OFFSET_NAMESPACE = "pulsar-kafka-connect-adaptor";
    private static final Map<String, String> PROPERTIES = Collections.emptyMap();
    private static final Optional<Long> RECORD_SEQUENCE = Optional.empty();

    private SourceTaskContext sourceTaskContext;
    private SourceConnector connector;
    private SourceTask sourceTask;
    public Converter keyConverter;
    public Converter valueConverter;
    /** Iterator over the batch currently being handed out by {@link #read()}; {@code null} between batches. */
    private Iterator<SourceRecord> currentBatch = null;
    private OffsetBackingStore offsetStore;
    private OffsetStorageReader offsetReader;
    private String topicNamespace;
    public OffsetStorageWriter offsetWriter;
    /** Number of records of the current batch that have been neither acknowledged nor skipped. */
    private final AtomicInteger outstandingRecords = new AtomicInteger(0);
    /** Guards against two concurrent attempts to start an offset flush. */
    private final AtomicBoolean flushing = new AtomicBoolean(false);
    /** Future completed when the offsets of the current batch are flushed (or the flush failed). */
    private final AtomicReference<CompletableFuture<Void>> flushFutureRef = new AtomicReference<>();

    /**
     * Builds the converters, offset store and source task from {@code config} and starts the task.
     *
     * <p>Only entries whose value is a {@link String} are forwarded to the Kafka Connect
     * components. If a converter is an {@link AvroConverter} it is replaced by one backed by a
     * {@link MockSchemaRegistryClient}, and {@code schema.registry.url} is set to {@code "mock"}
     * in {@code config} (the caller's map is modified).
     *
     * @param config        connector configuration
     * @param sourceContext Pulsar source context, used to obtain the Pulsar client for the offset store
     * @throws NullPointerException if {@code key.converter}, {@code value.converter} or (when no
     *                              connector class is configured) {@code task.class} is missing
     * @throws Exception            if any configured class cannot be loaded or instantiated, or a
     *                              component fails to start
     */
    @Override
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        Map<String, String> stringConfig = new HashMap<>();
        config.forEach((key, value) -> {
            if (value instanceof String) {
                stringConfig.put(key, (String) value);
            }
        });
        this.topicNamespace = stringConfig.get("topic.namespace");

        this.keyConverter = instantiate(stringConfig, "key.converter", Converter.class);
        this.valueConverter = instantiate(stringConfig, "value.converter", Converter.class);
        if (this.keyConverter instanceof AvroConverter) {
            this.keyConverter = new AvroConverter(new MockSchemaRegistryClient());
            config.put("schema.registry.url", "mock");
        }
        if (this.valueConverter instanceof AvroConverter) {
            this.valueConverter = new AvroConverter(new MockSchemaRegistryClient());
            config.put("schema.registry.url", "mock");
        }
        this.keyConverter.configure(config, true);
        this.valueConverter.configure(config, false);

        this.offsetStore = new PulsarOffsetBackingStore(sourceContext.getPulsarClient());
        PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
        this.offsetStore.configure(pulsarKafkaWorkerConfig);
        this.offsetStore.start();
        this.offsetReader = new OffsetStorageReaderImpl(
                this.offsetStore, OFFSET_NAMESPACE, this.keyConverter, this.valueConverter);
        this.offsetWriter = new OffsetStorageWriter(
                this.offsetStore, OFFSET_NAMESPACE, this.keyConverter, this.valueConverter);
        this.sourceTaskContext = new PulsarIOSourceTaskContext(this.offsetReader, pulsarKafkaWorkerConfig);

        Map<String, String> taskConfig;
        Object connectorClassName = config.get(CONNECTOR_CLASS);
        if (connectorClassName != null) {
            // Connector mode: let the connector decide the task class and the task configuration.
            Class<?> connectorClass = Class.forName(connectorClassName.toString());
            this.connector = (SourceConnector) connectorClass.getConstructor().newInstance();
            Class<?> taskClass = this.connector.taskClass();
            this.sourceTask = (SourceTask) taskClass.getConstructor().newInstance();
            this.connector.initialize(new PulsarKafkaSinkContext());
            this.connector.start(stringConfig);
            List<Map<String, String>> configs = this.connector.taskConfigs(1);
            Preconditions.checkNotNull(configs);
            Preconditions.checkArgument(configs.size() == 1);
            taskConfig = configs.get(0);
        } else {
            // Task mode: the task class is named directly and receives the string configuration.
            this.sourceTask = instantiate(stringConfig, "task.class", SourceTask.class);
            taskConfig = stringConfig;
        }
        this.sourceTask.initialize(this.sourceTaskContext);
        this.sourceTask.start(taskConfig);
    }

    /**
     * Loads the class named by {@code settings.get(key)} and creates an instance through its
     * no-argument declared constructor.
     *
     * @throws NullPointerException         if {@code key} is not present in {@code settings}
     * @throws ClassCastException           if the class is not a subtype of {@code type}
     * @throws ReflectiveOperationException if the class cannot be loaded or instantiated
     */
    private static <C> C instantiate(Map<String, String> settings, String key, Class<C> type)
            throws ReflectiveOperationException {
        String className = Preconditions.checkNotNull(
                settings.get(key), "Missing required configuration '%s'", key);
        return Class.forName(className).asSubclass(type).getDeclaredConstructor().newInstance();
    }

    /**
     * Returns the topic a record should be published to: {@code topic} itself when it contains
     * {@code "://"} and is accepted by {@link TopicName#get(String)}, otherwise
     * {@code persistent://<topic.namespace>/<topic>}.
     */
    private String resolveDestinationTopic(String topic) {
        if (topic.contains("://")) {
            try {
                TopicName.get(topic);
                return topic;
            } catch (IllegalArgumentException notAFullTopicName) {
                // Not accepted as a complete topic name: fall back to the namespace-qualified form.
            }
        }
        return "persistent://" + this.topicNamespace + "/" + topic;
    }

    /**
     * Finishes a flush attempt: on success asks the task to commit and completes
     * {@code snapshotFlushFuture}; on any failure cancels the flush on the writer and completes
     * the future exceptionally.
     *
     * @param error               failure reported by the flush, or {@code null} if it succeeded
     * @param snapshotFlushFuture future of the batch this flush belongs to
     */
    private void onOffsetsFlushed(Throwable error, CompletableFuture<Void> snapshotFlushFuture) {
        if (error != null) {
            log.error("Failed to flush offsets to storage: ", error);
            this.offsetWriter.cancelFlush();
            snapshotFlushFuture.completeExceptionally(new Exception("No Offsets Added Error", error));
            return;
        }
        try {
            this.sourceTask.commit();
            if (log.isDebugEnabled()) {
                log.debug("Finished flushing offsets to storage");
            }
            snapshotFlushFuture.complete(null);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            log.warn("Flush interrupted, cancelling", ie);
            this.offsetWriter.cancelFlush();
            snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", ie));
        } catch (Throwable t) {
            log.warn("Flush failed, cancelling", t);
            this.offsetWriter.cancelFlush();
            snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
        }
    }

    /**
     * Starts an offset flush for the current batch if there is a pending batch future, no record
     * is outstanding and no other flush attempt is in progress; otherwise does nothing.
     */
    private void triggerOffsetsFlushIfNeeded() {
        final CompletableFuture<Void> snapshotFlushFuture = this.flushFutureRef.get();
        if (snapshotFlushFuture == null || snapshotFlushFuture.isDone() || this.outstandingRecords.get() != 0) {
            return;
        }
        if (!this.flushing.compareAndSet(false, true)) {
            // Another caller already claimed this flush.
            return;
        }
        try {
            if (this.offsetWriter.beginFlush()) {
                // The callback releases the flag once the flush outcome has been handled.
                this.offsetWriter.doFlush((error, ignored) -> {
                    try {
                        this.onOffsetsFlushed(error, snapshotFlushFuture);
                    } finally {
                        this.flushing.set(false);
                    }
                });
            } else {
                // beginFlush() returned false: nothing is written, so go straight to the commit step.
                try {
                    this.onOffsetsFlushed(null, snapshotFlushFuture);
                } finally {
                    this.flushing.set(false);
                }
            }
        } catch (ConnectException ignored) {
            // Deliberately ignored: the future is left untouched and 'flushing' stays true until
            // read() resets it when it finalizes the batch.
            // NOTE(review): presumably raised when a flush is already in progress - confirm
            // against the Kafka OffsetStorageWriter documentation.
        } catch (Exception t) {
            try {
                this.onOffsetsFlushed(t, snapshotFlushFuture);
            } finally {
                this.flushing.set(false);
            }
        }
    }

    /**
     * Returns the next non-empty record produced by the task.
     *
     * <p>When the current batch is exhausted this method waits for the batch's flush future
     * before polling the next batch. An empty or {@code null} poll result is retried
     * immediately; this class adds no back-off of its own.
     *
     * @return the next record whose {@code isEmpty()} is {@code false}
     * @throws Exception if the flush future of the finished batch completed exceptionally
     *                   ({@code "Flush failed"}, with the underlying cause), or if polling or
     *                   waiting is interrupted or fails
     */
    @Override
    public synchronized Record<T> read() throws Exception {
        while (true) {
            if (this.currentBatch == null) {
                List<SourceRecord> recordList = this.sourceTask.poll();
                if (recordList == null || recordList.isEmpty()) {
                    continue;
                }
                this.outstandingRecords.addAndGet(recordList.size());
                this.currentBatch = recordList.iterator();
                this.flushFutureRef.set(new CompletableFuture<>());
            }
            if (this.currentBatch.hasNext()) {
                AbstractKafkaSourceRecord<T> processRecord = this.processSourceRecord(this.currentBatch.next());
                if (processRecord == null || processRecord.isEmpty()) {
                    // Skipped records will never be acked, so account for them here.
                    this.outstandingRecords.decrementAndGet();
                    this.triggerOffsetsFlushIfNeeded();
                    continue;
                }
                return processRecord;
            }
            // Batch drained: wait until its offsets are flushed before starting the next round.
            final CompletableFuture<Void> snapshotFlushFuture = this.flushFutureRef.get();
            try {
                if (snapshotFlushFuture != null) {
                    snapshotFlushFuture.get();
                }
            } catch (ExecutionException ex) {
                log.error("execution exception while get flushFuture", ex);
                throw new Exception("Flush failed", ex.getCause());
            } finally {
                // No jump statement here: a 'continue' inside finally would discard the
                // exception thrown above and hide flush failures from the caller.
                this.flushing.set(false);
                // Clear the reference only if it still points at this batch's future.
                this.flushFutureRef.compareAndSet(snapshotFlushFuture, null);
                this.currentBatch = null;
            }
        }
    }

    /**
     * Stops the source task, the connector and the offset store, in that order, skipping any
     * that were never created. Each later component is still stopped when an earlier
     * {@code stop()} throws; the exception that finally propagates is the last one raised.
     */
    @Override
    public void close() {
        try {
            if (this.sourceTask != null) {
                this.sourceTask.stop();
                this.sourceTask = null;
            }
        } finally {
            try {
                if (this.connector != null) {
                    this.connector.stop();
                    this.connector = null;
                }
            } finally {
                if (this.offsetStore != null) {
                    this.offsetStore.stop();
                    this.offsetStore = null;
                }
            }
        }
    }

    /**
     * Converts a Kafka Connect record into the record type emitted by this source.
     *
     * @param var1 record returned by the task's {@code poll()}
     * @return the converted record; {@code null} or a record whose {@code isEmpty()} is
     *         {@code true} causes {@link #read()} to skip it
     */
    public abstract AbstractKafkaSourceRecord<T> processSourceRecord(SourceRecord var1);

    @Generated
    public SourceTask getSourceTask() {
        return this.sourceTask;
    }

    @Generated
    public OffsetStorageWriter getOffsetWriter() {
        return this.offsetWriter;
    }

    /**
     * Record handed to Pulsar for one Kafka Connect {@link SourceRecord}. The constructor fills
     * in {@code destinationTopic} and {@code partitionIndex}; the remaining fields are left for
     * subclasses to populate.
     *
     * @param <T> type of the record value
     */
    public abstract class AbstractKafkaSourceRecord<T>
    implements Record {
        Optional<String> key;
        T value;
        Optional<String> topicName;
        Optional<Long> eventTime;
        Optional<String> partitionId;
        Optional<String> destinationTopic;
        Optional<Integer> partitionIndex;
        KafkaSchemaWrappedSchema keySchema;
        KafkaSchemaWrappedSchema valueSchema;

        /**
         * @param srcRecord Kafka Connect record providing the topic and the (possibly
         *                  {@code null}) Kafka partition
         */
        AbstractKafkaSourceRecord(SourceRecord srcRecord) {
            this.destinationTopic =
                    Optional.of(AbstractKafkaConnectSource.this.resolveDestinationTopic(srcRecord.topic()));
            this.partitionIndex = Optional.ofNullable(srcRecord.kafkaPartition());
        }

        /** Always empty: this adaptor does not assign record sequences. */
        public Optional<Long> getRecordSequence() {
            return RECORD_SEQUENCE;
        }

        /** Always the shared empty map. */
        public Map<String, String> getProperties() {
            return PROPERTIES;
        }

        /** Returns {@code true} when the record carries no value; such records are skipped by {@code read()}. */
        public boolean isEmpty() {
            return this.value == null;
        }

        /**
         * Marks this record as delivered; when it was the last outstanding record of the batch,
         * an offset flush is triggered.
         */
        public void ack() {
            if (AbstractKafkaConnectSource.this.outstandingRecords.decrementAndGet() == 0) {
                AbstractKafkaConnectSource.this.triggerOffsetsFlushIfNeeded();
            }
        }

        /**
         * Fails the current batch: its flush future (if any) is completed exceptionally with
         * {@code "Sink Error"}, which makes {@code read()} throw once the batch is drained.
         */
        public void fail() {
            CompletableFuture<Void> snapshotFlushFuture = AbstractKafkaConnectSource.this.flushFutureRef.get();
            if (snapshotFlushFuture != null) {
                snapshotFlushFuture.completeExceptionally(new Exception("Sink Error"));
            }
        }

        @Generated
        public Optional<String> getKey() {
            return this.key;
        }

        @Generated
        public T getValue() {
            return this.value;
        }

        @Generated
        public Optional<String> getTopicName() {
            return this.topicName;
        }

        @Generated
        public Optional<Long> getEventTime() {
            return this.eventTime;
        }

        @Generated
        public Optional<String> getPartitionId() {
            return this.partitionId;
        }

        @Generated
        public Optional<String> getDestinationTopic() {
            return this.destinationTopic;
        }

        @Generated
        public Optional<Integer> getPartitionIndex() {
            return this.partitionIndex;
        }
    }
}

