/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.debezium;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.io.debezium.SerDeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class PulsarSchemaHistory
extends AbstractSchemaHistory {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PulsarSchemaHistory.class);
    public static final Field TOPIC = Field.create((String)"schema.history.internal.pulsar.topic").withDisplayName("Schema history topic name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the topic for the database schema history").withValidation(new Field.Validator[]{Field::isRequired});
    public static final Field SERVICE_URL = Field.create((String)"schema.history.internal.pulsar.service.url").withDisplayName("Pulsar service url").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("Pulsar service url").withValidation(new Field.Validator[]{Field::isOptional});
    public static final Field CLIENT_BUILDER = Field.create((String)"schema.history.internal.pulsar.client.builder").withDisplayName("Pulsar client builder").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("Pulsar client builder").withValidation(new Field.Validator[]{Field::isOptional});
    public static final Field READER_CONFIG = Field.create((String)"schema.history.internal.pulsar.reader.config").withDisplayName("Extra configs of the reader").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The configs of the reader for the database schema history topic, in the form of a JSON string with key-value pairs").withDefault((String)null).withValidation(new Field.Validator[]{Field::isOptional});
    public static final Field.Set ALL_FIELDS = Field.setOf((Field[])new Field[]{TOPIC, SERVICE_URL, CLIENT_BUILDER, SchemaHistory.NAME, READER_CONFIG});
    private final ObjectMapper mapper = new ObjectMapper();
    private final DocumentReader reader = DocumentReader.defaultReader();
    private String topicName;
    private Map<String, Object> readerConfigMap = new HashMap<String, Object>();
    private String dbHistoryName;
    private ClientBuilder clientBuilder;
    private volatile PulsarClient pulsarClient;
    private volatile Producer<String> producer;

    public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
        super.configure(config, comparator, listener, useCatalogBeforeSchema);
        if (!config.validateAndRecord((Iterable)ALL_FIELDS, arg_0 -> ((Logger)this.logger).error(arg_0))) {
            throw new IllegalArgumentException("Error configuring an instance of " + ((Object)((Object)this)).getClass().getSimpleName() + "; check the logs for details");
        }
        this.topicName = config.getString(TOPIC);
        try {
            String configString = config.getString(READER_CONFIG);
            this.readerConfigMap = configString == null ? Collections.emptyMap() : (Map)this.mapper.readValue(configString, Map.class);
        }
        catch (JsonProcessingException exception) {
            log.warn("The provided reader configs are invalid, will not passing any extra config to the reader builder.", (Throwable)exception);
        }
        String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
        if (StringUtils.isBlank((CharSequence)clientBuilderBase64Encoded) && StringUtils.isBlank((CharSequence)config.getString(SERVICE_URL))) {
            throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
        }
        this.clientBuilder = PulsarClient.builder();
        if (!StringUtils.isBlank((CharSequence)clientBuilderBase64Encoded)) {
            this.clientBuilder = (ClientBuilder)SerDeUtils.deserialize(clientBuilderBase64Encoded, this.clientBuilder.getClass().getClassLoader());
        } else {
            this.clientBuilder.serviceUrl(config.getString(SERVICE_URL));
        }
        this.dbHistoryName = config.getString(SchemaHistory.NAME, UUID.randomUUID().toString());
        log.info("Configure to store the debezium schema history {} to pulsar topic {}", (Object)this.dbHistoryName, (Object)this.topicName);
    }

    public void initializeStorage() {
        super.initializeStorage();
        try (Producer p = this.pulsarClient.newProducer(Schema.STRING).topic(this.topicName).create();){
            p.send((Object)"");
        }
        catch (PulsarClientException pce) {
            log.error("Failed to initialize storage", (Throwable)pce);
            throw new SchemaHistoryException("Failed to initialize storage", (Throwable)pce);
        }
    }

    void setupClientIfNeeded() {
        if (null == this.pulsarClient) {
            try {
                this.pulsarClient = this.clientBuilder.build();
            }
            catch (PulsarClientException e) {
                throw new SchemaHistoryException("Failed to create pulsar client to pulsar cluster", (Throwable)e);
            }
        }
    }

    void setupProducerIfNeeded() {
        this.setupClientIfNeeded();
        if (null == this.producer) {
            try {
                this.producer = this.pulsarClient.newProducer(Schema.STRING).topic(this.topicName).producerName(this.dbHistoryName).blockIfQueueFull(true).create();
            }
            catch (PulsarClientException e) {
                log.error("Failed to create pulsar producer to topic '{}'", (Object)this.topicName);
                throw new RuntimeException("Failed to create pulsar producer to topic '" + this.topicName, e);
            }
        }
    }

    public void start() {
        super.start();
        this.setupProducerIfNeeded();
    }

    protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
        if (this.producer == null) {
            throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing schema history records.");
        }
        if (log.isTraceEnabled()) {
            log.trace("Storing record into schema history: {}", (Object)record);
        }
        try {
            this.producer.send((Object)record.toString());
        }
        catch (PulsarClientException e) {
            throw new SchemaHistoryException((Throwable)e);
        }
    }

    public void stop() {
        try {
            if (this.producer != null) {
                try {
                    this.producer.flush();
                }
                catch (PulsarClientException pulsarClientException) {
                }
                finally {
                    this.producer.close();
                }
                this.producer = null;
            }
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
                this.pulsarClient = null;
            }
        }
        catch (PulsarClientException pe) {
            log.warn("Failed to closing pulsar client", (Throwable)pe);
        }
    }

    protected void recoverRecords(Consumer<HistoryRecord> records) {
        this.setupClientIfNeeded();
        try (Reader<String> historyReader = this.createHistoryReader();){
            log.info("Scanning the schema history topic '{}'", (Object)this.topicName);
            MessageId lastProcessedMessageId = null;
            while (historyReader.hasMessageAvailable()) {
                Message msg = historyReader.readNext();
                try {
                    if (null != lastProcessedMessageId && lastProcessedMessageId.compareTo((Object)msg.getMessageId()) >= 0) continue;
                    if (!StringUtils.isBlank((CharSequence)((CharSequence)msg.getValue()))) {
                        HistoryRecord recordObj = new HistoryRecord(this.reader.read((String)msg.getValue()));
                        if (log.isTraceEnabled()) {
                            log.trace("Recovering schema history: {}", (Object)recordObj);
                        }
                        if (!recordObj.isValid()) {
                            log.warn("Skipping invalid schema history record '{}'. This is often not an issue, but if it happens repeatedly please check the '{}' topic.", (Object)recordObj, (Object)this.topicName);
                        } else {
                            records.accept(recordObj);
                            log.trace("Recovered schema history: {}", (Object)recordObj);
                        }
                    }
                    lastProcessedMessageId = msg.getMessageId();
                }
                catch (IOException ioe) {
                    log.error("Error while deserializing history record '{}'", msg.getValue(), (Object)ioe);
                }
                catch (Exception e) {
                    throw e;
                }
            }
            log.info("Successfully completed scanning the schema history topic '{}'", (Object)this.topicName);
        }
        catch (IOException ioe) {
            log.error("Encountered issues on recovering history records", (Throwable)ioe);
            throw new RuntimeException("Encountered issues on recovering history records", ioe);
        }
    }

    public boolean exists() {
        boolean bl;
        block8: {
            this.setupClientIfNeeded();
            Reader<String> historyReader = this.createHistoryReader();
            try {
                bl = historyReader.hasMessageAvailable();
                if (historyReader == null) break block8;
            }
            catch (Throwable throwable) {
                try {
                    if (historyReader != null) {
                        try {
                            historyReader.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (IOException e) {
                    log.error("Encountered issues on checking existence of schema history", (Throwable)e);
                    throw new RuntimeException("Encountered issues on checking existence of schema history", e);
                }
            }
            historyReader.close();
        }
        return bl;
    }

    public boolean storageExists() {
        return true;
    }

    public String toString() {
        if (this.topicName != null) {
            return "Pulsar topic (" + this.topicName + ")";
        }
        return "Pulsar topic";
    }

    @VisibleForTesting
    Reader<String> createHistoryReader() throws PulsarClientException {
        return this.pulsarClient.newReader(Schema.STRING).topic(this.topicName).startMessageId(MessageId.earliest).loadConf(this.readerConfigMap).create();
    }
}

