/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.DebeziumHeaderProducer;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.signal.channels.SourceSignalChannel;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.pipeline.txmetadata.DefaultTransactionInfo;
import io.debezium.pipeline.txmetadata.TransactionInfo;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.processors.PostProcessorRegistry;
import io.debezium.processors.spi.PostProcessor;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Loggings;
import java.time.Instant;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventDispatcher<P extends Partition, T extends DataCollectionId>
implements AutoCloseable {
    // Logger shared by the dispatcher and its nested receivers.
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
    // Either built in the constructor or supplied by the caller, depending on the constructor used.
    protected final TransactionMonitor transactionMonitor;
    private final TopicNamingStrategy<T> topicNamingStrategy;
    private final DatabaseSchema<T> schema;
    // Same object as {@code schema} when schema.isHistorized() is true, otherwise null.
    private final HistorizedDatabaseSchema<T> historizedSchema;
    // Destination of every record produced by this dispatcher.
    private final ChangeEventQueue<DataChangeEvent> queue;
    private final DataCollectionFilters.DataCollectionFilter<T> filter;
    private final ChangeEventCreator changeEventCreator;
    private final DebeziumHeaderProducer debeziumHeaderProducer;
    private final Heartbeat heartbeat;
    // Mutable: replaced through setEventListener(); starts as the no-op listener.
    private DataChangeEventListener<P> eventListener = DataChangeEventListener.NO_OP();
    private final boolean emitTombstonesOnDelete;
    // Falls back to errorOnMissingSchema when the constructor receives null.
    private final InconsistentSchemaHandler<P, T> inconsistentSchemaHandler;
    private final CommonConnectorConfig connectorConfig;
    private final EnumSet<Envelope.Operation> skippedOperations;
    // True when the connector reports supportsOperationFiltering() or no operations are configured to be skipped.
    private final boolean neverSkip;
    private final Schema schemaChangeKeySchema;
    private final Schema schemaChangeValueSchema;
    private final ConnectTableChangeSerializer tableChangesSerializer;
    // Null when no signal processor was supplied.
    private final SourceSignalChannel sourceSignalChannel;
    // Mutable: set through setIncrementalSnapshotChangeEventSource(); may be null.
    private IncrementalSnapshotChangeEventSource<P, T> incrementalSnapshotChangeEventSource;
    private final StreamingChangeRecordReceiver streamingReceiver;
    private final SignalProcessor<P, ?> signalProcessor;
    // Looked up with tryGetService(); doPostProcessing() treats null as "no post processors".
    private final PostProcessorRegistry postProcessorRegistry;

    /**
     * Creates a dispatcher without a custom inconsistent-schema handler, using a heartbeat
     * obtained from {@code connectorConfig.createHeartbeat(...)} and the given signal processor.
     */
    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy, DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter, ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster, SignalProcessor<P, ?> signalProcessor, DebeziumHeaderProducer debeziumHeaderProducer) {
        this(
                connectorConfig,
                topicNamingStrategy,
                schema,
                queue,
                filter,
                changeEventCreator,
                null, // no custom inconsistent-schema handler
                metadataProvider,
                connectorConfig.createHeartbeat(topicNamingStrategy, schemaNameAdjuster, null, null),
                schemaNameAdjuster,
                signalProcessor,
                debeziumHeaderProducer);
    }

    /**
     * Creates a dispatcher without a custom inconsistent-schema handler, using the supplied
     * heartbeat and signal processor.
     */
    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy, DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter, ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster, SignalProcessor<P, ?> signalProcessor, DebeziumHeaderProducer debeziumHeaderProducer) {
        this(
                connectorConfig,
                topicNamingStrategy,
                schema,
                queue,
                filter,
                changeEventCreator,
                null, // no custom inconsistent-schema handler
                metadataProvider,
                heartbeat,
                schemaNameAdjuster,
                signalProcessor,
                debeziumHeaderProducer);
    }

    /**
     * Creates a dispatcher with the supplied heartbeat, without a custom inconsistent-schema
     * handler and without a signal processor.
     */
    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy, DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter, ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster, DebeziumHeaderProducer debeziumHeaderProducer) {
        this(
                connectorConfig,
                topicNamingStrategy,
                schema,
                queue,
                filter,
                changeEventCreator,
                null, // no custom inconsistent-schema handler
                metadataProvider,
                heartbeat,
                schemaNameAdjuster,
                null, // no signal processor
                debeziumHeaderProducer);
    }

    /**
     * Creates a dispatcher with a heartbeat obtained from
     * {@code connectorConfig.createHeartbeat(...)}, without a custom inconsistent-schema handler
     * and without a signal processor.
     */
    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy, DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter, ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster, DebeziumHeaderProducer debeziumHeaderProducer) {
        this(
                connectorConfig,
                topicNamingStrategy,
                schema,
                queue,
                filter,
                changeEventCreator,
                null, // no custom inconsistent-schema handler
                metadataProvider,
                connectorConfig.createHeartbeat(topicNamingStrategy, schemaNameAdjuster, null, null),
                schemaNameAdjuster,
                null, // no signal processor
                debeziumHeaderProducer);
    }

    /**
     * Full constructor that builds its own {@link TransactionMonitor} from the given metadata
     * provider.
     *
     * @param inconsistentSchemaHandler handler used when no schema is registered for a streamed
     *        data collection; {@code null} selects {@link #errorOnMissingSchema}
     * @param signalProcessor may be {@code null}, in which case no source signal channel is set up
     */
    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy, DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter, ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<P, T> inconsistentSchemaHandler, EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster, SignalProcessor<P, ?> signalProcessor, DebeziumHeaderProducer debeziumHeaderProducer) {
        // Plain collaborators handed in by the caller.
        this.debeziumHeaderProducer = debeziumHeaderProducer;
        this.connectorConfig = connectorConfig;
        this.topicNamingStrategy = topicNamingStrategy;
        this.schema = schema;
        this.queue = queue;
        this.filter = filter;
        this.changeEventCreator = changeEventCreator;
        this.heartbeat = heartbeat;
        this.signalProcessor = signalProcessor;

        // Derived state; calls on collaborators are kept in their original relative order.
        this.tableChangesSerializer = new ConnectTableChangeSerializer(schemaNameAdjuster);
        if (schema.isHistorized()) {
            this.historizedSchema = (HistorizedDatabaseSchema<T>) schema;
        }
        else {
            this.historizedSchema = null;
        }
        this.streamingReceiver = new StreamingChangeRecordReceiver();
        this.emitTombstonesOnDelete = connectorConfig.isEmitTombstoneOnDelete();
        if (inconsistentSchemaHandler == null) {
            this.inconsistentSchemaHandler = this::errorOnMissingSchema;
        }
        else {
            this.inconsistentSchemaHandler = inconsistentSchemaHandler;
        }
        this.skippedOperations = connectorConfig.getSkippedOperations();
        this.neverSkip = connectorConfig.supportsOperationFiltering() || this.skippedOperations.isEmpty();
        this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, schemaNameAdjuster, this::enqueueTransactionMessage, topicNamingStrategy.transactionTopic());

        if (signalProcessor == null) {
            this.sourceSignalChannel = null;
        }
        else {
            this.sourceSignalChannel = signalProcessor.getSignalChannel(SourceSignalChannel.class);
            this.sourceSignalChannel.init(connectorConfig);
        }

        this.schemaChangeKeySchema = SchemaFactory.get().schemaHistoryConnectorKeySchema(schemaNameAdjuster, connectorConfig);
        this.schemaChangeValueSchema = SchemaFactory.get().schemaHistoryConnectorValueSchema(schemaNameAdjuster, connectorConfig, this.tableChangesSerializer);
        this.postProcessorRegistry = connectorConfig.getServiceRegistry().tryGetService(PostProcessorRegistry.class);
    }

    /**
     * Full constructor that uses a caller-supplied {@link TransactionMonitor}.
     *
     * @param inconsistentSchemaHandler handler used when no schema is registered for a streamed
     *        data collection; {@code null} selects {@link #errorOnMissingSchema}
     * @param signalProcessor may be {@code null}, in which case no source signal channel is set up
     */
    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy, DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter, ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<P, T> inconsistentSchemaHandler, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster, TransactionMonitor transactionMonitor, SignalProcessor<P, ?> signalProcessor, DebeziumHeaderProducer debeziumHeaderProducer) {
        // Plain collaborators handed in by the caller.
        this.connectorConfig = connectorConfig;
        this.topicNamingStrategy = topicNamingStrategy;
        this.schema = schema;
        this.queue = queue;
        this.filter = filter;
        this.changeEventCreator = changeEventCreator;
        this.debeziumHeaderProducer = debeziumHeaderProducer;
        this.transactionMonitor = transactionMonitor;
        this.heartbeat = heartbeat;
        this.signalProcessor = signalProcessor;

        // Derived state; calls on collaborators are kept in their original relative order.
        this.tableChangesSerializer = new ConnectTableChangeSerializer(schemaNameAdjuster);
        if (schema.isHistorized()) {
            this.historizedSchema = (HistorizedDatabaseSchema<T>) schema;
        }
        else {
            this.historizedSchema = null;
        }
        this.streamingReceiver = new StreamingChangeRecordReceiver();
        this.emitTombstonesOnDelete = connectorConfig.isEmitTombstoneOnDelete();
        if (inconsistentSchemaHandler == null) {
            this.inconsistentSchemaHandler = this::errorOnMissingSchema;
        }
        else {
            this.inconsistentSchemaHandler = inconsistentSchemaHandler;
        }
        this.skippedOperations = connectorConfig.getSkippedOperations();
        this.neverSkip = connectorConfig.supportsOperationFiltering() || this.skippedOperations.isEmpty();

        if (signalProcessor == null) {
            this.sourceSignalChannel = null;
        }
        else {
            this.sourceSignalChannel = signalProcessor.getSignalChannel(SourceSignalChannel.class);
            this.sourceSignalChannel.init(connectorConfig);
        }

        this.schemaChangeKeySchema = SchemaFactory.get().schemaHistoryConnectorKeySchema(schemaNameAdjuster, connectorConfig);
        this.schemaChangeValueSchema = SchemaFactory.get().schemaHistoryConnectorValueSchema(schemaNameAdjuster, connectorConfig, this.tableChangesSerializer);
        this.postProcessorRegistry = connectorConfig.getServiceRegistry().tryGetService(PostProcessorRegistry.class);
    }

    /**
     * Emits the change records of a snapshot row to the given snapshot receiver.
     *
     * <p>Each emitted record is reported to the registered event listener and then forwarded to
     * {@code receiver} with the headers returned by {@code getExtendedHeaders}.
     *
     * @param partition source partition the row belongs to
     * @param dataCollectionId identifier of the snapshotted data collection
     * @param changeRecordEmitter emitter producing the change records
     * @param receiver receiver that the resulting records are handed to
     * @throws InterruptedException if the thread is interrupted while emitting or enqueueing
     * @throws ConnectException for any other failure when the failure handling mode is FAIL
     */
    public void dispatchSnapshotEvent(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter, final SnapshotReceiver<P> receiver) throws InterruptedException {
        try {
            final DataCollectionSchema dataCollectionSchema = this.schema.schemaFor(dataCollectionId);
            if (dataCollectionSchema == null) {
                // errorOnMissingSchema notifies the listener and then throws.
                this.errorOnMissingSchema(partition, dataCollectionId, changeRecordEmitter);
            }
            changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new ChangeRecordEmitter.Receiver<P>() {

                @Override
                public void changeRecord(P partition, DataCollectionSchema schema, Envelope.Operation operation, Object key, Struct value, OffsetContext offset, ConnectHeaders headers) throws InterruptedException {
                    LOGGER.trace("Received change record {} for {} operation on key {} with context {}", Loggings.maybeRedactSensitiveData(value), operation, Loggings.maybeRedactSensitiveData(key), offset);
                    ConnectHeaders extendedHeaders = EventDispatcher.this.getExtendedHeaders(headers);
                    EventDispatcher.this.eventListener.onEvent(partition, dataCollectionSchema.id(), offset, key, value, operation);
                    receiver.changeRecord(partition, dataCollectionSchema, operation, key, value, offset, extendedHeaders);
                }
            });
        }
        catch (InterruptedException e) {
            // An interrupt is a shutdown request, not an event-processing failure: it must reach
            // the caller instead of being logged and dropped by the WARN/SKIP failure modes.
            throw e;
        }
        catch (Exception e) {
            this.handleEventProcessingFailure(e, changeRecordEmitter.getOffset());
        }
    }

    /**
     * Creates a buffering snapshot receiver; it runs in threaded mode when the configured
     * snapshot max threads value is greater than one.
     */
    public SnapshotReceiver<P> getSnapshotChangeEventReceiver() {
        final boolean multiThreaded = this.connectorConfig.getSnapshotMaxThreads() > 1;
        return new BufferingSnapshotChangeRecordReceiver(multiThreaded);
    }

    /**
     * Creates a receiver for incremental snapshot records that notifies {@code dataListener}
     * for every record it enqueues.
     */
    public SnapshotReceiver<P> getIncrementalSnapshotChangeEventReceiver(DataChangeEventListener<P> dataListener) {
        final SnapshotReceiver<P> incrementalReceiver = new IncrementalSnapshotChangeRecordReceiver(dataListener);
        return incrementalReceiver;
    }

    /**
     * Dispatches a streamed data change for the given data collection.
     *
     * <p>Filtered or ignored records are only reported to the listener and the incremental
     * snapshot source. Otherwise the emitter is invoked and each produced record is, in order:
     * offered to the source signal channel when it is a signal event, and, unless its operation
     * is skipped, reported to the transaction monitor, the listener and the incremental snapshot
     * source before being enqueued through the streaming receiver. A heartbeat is attempted
     * afterwards in both cases.
     *
     * @return {@code true} if the emitter was invoked; {@code false} if the event was filtered,
     *         the inconsistent-schema handler returned no schema, or a failure was handled
     *         without throwing
     * @throws InterruptedException if the thread is interrupted while dispatching
     * @throws ConnectException for any other failure when the failure handling mode is FAIL
     */
    public boolean dispatchDataChangeEvent(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter) throws InterruptedException {
        try {
            boolean handled = false;
            if (changeRecordEmitter.ignoreRecord() || !this.filter.isIncluded(dataCollectionId)) {
                LOGGER.trace("Filtered data change event for {}", dataCollectionId);
                this.eventListener.onFilteredEvent(partition, "source = " + dataCollectionId, changeRecordEmitter.getOperation());
                this.dispatchFilteredEvent(changeRecordEmitter.getPartition(), changeRecordEmitter.getOffset());
            }
            else {
                DataCollectionSchema dataCollectionSchema = this.schema.schemaFor(dataCollectionId);
                if (dataCollectionSchema == null) {
                    Optional<DataCollectionSchema> replacementSchema = this.inconsistentSchemaHandler.handle(partition, dataCollectionId, changeRecordEmitter);
                    if (!replacementSchema.isPresent()) {
                        return false;
                    }
                    dataCollectionSchema = replacementSchema.get();
                }
                // The anonymous receiver captures dataCollectionId directly (it is effectively
                // final), which keeps its static type T for the signal check below.
                changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new ChangeRecordEmitter.Receiver<P>() {

                    @Override
                    public void changeRecord(P partition, DataCollectionSchema schema, Envelope.Operation operation, Object key, Struct value, OffsetContext offset, ConnectHeaders headers) throws InterruptedException {
                        LOGGER.trace("Received change record {} for {} operation on key {} with context {}", Loggings.maybeRedactSensitiveData(value), operation, Loggings.maybeRedactSensitiveData(key), offset);
                        if (this.isASignalEventToProcess(dataCollectionId, operation) && EventDispatcher.this.sourceSignalChannel != null) {
                            EventDispatcher.this.sourceSignalChannel.process(value);
                            if (EventDispatcher.this.signalProcessor != null) {
                                // Signals are processed before the change event itself is emitted.
                                EventDispatcher.this.signalProcessor.processSourceSignal();
                            }
                        }
                        if (EventDispatcher.this.neverSkip || !EventDispatcher.this.skippedOperations.contains(operation)) {
                            EventDispatcher.this.transactionMonitor.dataEvent(partition, dataCollectionId, offset, key, value);
                            EventDispatcher.this.eventListener.onEvent(partition, dataCollectionId, offset, key, value, operation);
                            if (EventDispatcher.this.incrementalSnapshotChangeEventSource != null) {
                                EventDispatcher.this.incrementalSnapshotChangeEventSource.processMessage(partition, dataCollectionId, key, offset);
                            }
                            EventDispatcher.this.streamingReceiver.changeRecord(partition, schema, operation, key, value, offset, headers);
                        }
                    }

                    /**
                     * A record is a signal event when it targets the signal data collection and is
                     * a CREATE, or a DELETE while the INSERT_DELETE watermarking strategy is used.
                     */
                    private boolean isASignalEventToProcess(T id, Envelope.Operation operation) {
                        boolean signalOperation = operation == Envelope.Operation.CREATE
                                || (operation == Envelope.Operation.DELETE
                                        && EventDispatcher.this.connectorConfig.getIncrementalSnapshotWatermarkingStrategy() == CommonConnectorConfig.WatermarkStrategy.INSERT_DELETE);
                        return signalOperation && EventDispatcher.this.connectorConfig.isSignalDataCollection(id);
                    }
                });
                handled = true;
            }
            this.heartbeat.heartbeat(changeRecordEmitter.getPartition().getSourcePartition(), changeRecordEmitter.getOffset().getOffset(), this::enqueueHeartbeat);
            return handled;
        }
        catch (InterruptedException e) {
            // An interrupt is a shutdown request, not an event-processing failure: it must reach
            // the caller instead of being logged and dropped by the WARN/SKIP failure modes.
            throw e;
        }
        catch (Exception e) {
            this.handleEventProcessingFailure(e, changeRecordEmitter.getOffset());
            return false;
        }
    }

    /**
     * Applies the configured event-processing failure handling mode to a dispatch failure.
     *
     * <p>FAIL rethrows the failure wrapped in a {@link ConnectException}; WARN logs it at warn
     * level; SKIP and any unrecognised mode log it at debug level.
     *
     * @param e the failure raised while processing the event
     * @param offsetContext offset of the event being processed, used in the message
     * @throws ConnectException when the mode is FAIL
     */
    private void handleEventProcessingFailure(Exception e, OffsetContext offsetContext) {
        switch (this.connectorConfig.getEventProcessingFailureHandlingMode()) {
            case FAIL: {
                throw new ConnectException("Error while processing event at offset " + offsetContext.getOffset(), e);
            }
            case WARN: {
                // The trailing exception has no placeholder, so SLF4J logs it as the throwable.
                LOGGER.warn("Error while processing event at offset {}", offsetContext.getOffset(), e);
                break;
            }
            case SKIP: {
                LOGGER.debug("Error while processing event at offset {}", offsetContext.getOffset(), e);
                break;
            }
            default: {
                // Report the mode this switch is on; the converting-failure mode is a different setting.
                LOGGER.debug("Error while processing event with EventProcessingFailureHandlingMode not supported: {}", this.connectorConfig.getEventProcessingFailureHandlingMode(), e);
            }
        }
    }

    /**
     * Forwards a filtered event to the incremental snapshot source, if one is registered.
     */
    public void dispatchFilteredEvent(P partition, OffsetContext offset) throws InterruptedException {
        if (this.incrementalSnapshotChangeEventSource == null) {
            return;
        }
        this.incrementalSnapshotChangeEventSource.processFilteredEvent(partition, offset);
    }

    /**
     * Reports a transaction commit to the transaction monitor and then to the incremental
     * snapshot source, if one is registered.
     */
    public void dispatchTransactionCommittedEvent(P partition, OffsetContext offset, Instant timestamp) throws InterruptedException {
        this.transactionMonitor.transactionComittedEvent(partition, offset, timestamp);
        if (this.incrementalSnapshotChangeEventSource == null) {
            return;
        }
        this.incrementalSnapshotChangeEventSource.processTransactionCommittedEvent(partition, offset);
    }

    /**
     * Convenience overload that wraps the transaction id in a {@link DefaultTransactionInfo}.
     */
    public void dispatchTransactionStartedEvent(P partition, String transactionId, OffsetContext offset, Instant timestamp) throws InterruptedException {
        final TransactionInfo transactionInfo = new DefaultTransactionInfo(transactionId);
        this.dispatchTransactionStartedEvent(partition, transactionInfo, offset, timestamp);
    }

    /**
     * Reports a transaction start to the transaction monitor and then to the incremental
     * snapshot source, if one is registered.
     */
    public void dispatchTransactionStartedEvent(P partition, TransactionInfo transactionInfo, OffsetContext offset, Instant timestamp) throws InterruptedException {
        this.transactionMonitor.transactionStartedEvent(partition, transactionInfo, offset, timestamp);
        if (this.incrementalSnapshotChangeEventSource == null) {
            return;
        }
        this.incrementalSnapshotChangeEventSource.processTransactionStartedEvent(partition, offset);
    }

    /**
     * Passes a connector event to the registered event listener.
     */
    public void dispatchConnectorEvent(P partition, ConnectorEvent event) {
        eventListener.onConnectorEvent(partition, event);
    }

    /**
     * Inconsistent-schema handler that reports the event as erroneous to the listener and then
     * fails; it never returns normally.
     *
     * @throws IllegalArgumentException always
     */
    public Optional<DataCollectionSchema> errorOnMissingSchema(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter) {
        final String sourceDescription = "source = " + dataCollectionId;
        this.eventListener.onErroneousEvent(partition, sourceDescription, changeRecordEmitter.getOperation());
        throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);
    }

    /**
     * Handler variant that ignores a missing schema: always returns an empty {@link Optional}
     * and does not use its arguments.
     */
    public Optional<DataCollectionSchema> ignoreMissingSchema(T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter) {
        return Optional.empty();
    }

    /**
     * Emits a schema change for a single data collection.
     *
     * <p>The event is dropped when the collection is non-null and excluded by the filter, and
     * either there is no historized schema or it stores only captured tables. Otherwise the
     * change is emitted and the incremental snapshot source, if registered, is notified.
     */
    public void dispatchSchemaChangeEvent(P partition, OffsetContext offsetContext, T dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        final boolean excludedByFilter = dataCollectionId != null && !this.filter.isIncluded(dataCollectionId);
        if (excludedByFilter && (this.historizedSchema == null || this.historizedSchema.storeOnlyCapturedTables())) {
            LOGGER.trace("Filtering schema change event for {}", dataCollectionId);
            return;
        }
        schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
        if (this.incrementalSnapshotChangeEventSource == null) {
            return;
        }
        this.incrementalSnapshotChangeEventSource.processSchemaChange(partition, offsetContext, dataCollectionId);
    }

    /**
     * Emits a schema change that affects several data collections.
     *
     * <p>A {@code null} or empty collection counts as not filtered. Otherwise the event counts as
     * not filtered as soon as one collection passes the filter. A fully filtered event is dropped
     * when there is no historized schema or it stores only captured tables.
     *
     * @param dataCollectionIds affected data collections; may be {@code null} or empty
     * @param schemaChangeEventEmitter emitter producing the schema change event
     */
    public void dispatchSchemaChangeEvent(Collection<T> dataCollectionIds, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        boolean anyNonfilteredEvent = false;
        if (dataCollectionIds == null || dataCollectionIds.isEmpty()) {
            anyNonfilteredEvent = true;
        }
        else {
            // Iterate as T: the filter is a DataCollectionFilter<T> and does not accept a bare DataCollectionId.
            for (T dataCollectionId : dataCollectionIds) {
                if (this.filter.isIncluded(dataCollectionId)) {
                    anyNonfilteredEvent = true;
                    break;
                }
            }
        }
        if (!anyNonfilteredEvent && (this.historizedSchema == null || this.historizedSchema.storeOnlyCapturedTables())) {
            LOGGER.trace("Filtering schema change event for {}", dataCollectionIds);
            return;
        }
        schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
    }

    /**
     * Requests a forced heartbeat ({@code Heartbeat.forcedBeat}) for the given partition and
     * offset; the resulting record is enqueued.
     */
    public void alwaysDispatchHeartbeatEvent(P partition, OffsetContext offset) throws InterruptedException {
        final Map<String, ?> sourcePartition = partition.getSourcePartition();
        final Map<String, ?> sourceOffset = offset.getOffset();
        this.heartbeat.forcedBeat(sourcePartition, sourceOffset, this::enqueueHeartbeat);
    }

    /**
     * Offers the given partition and offset to the heartbeat; any record it produces is enqueued.
     */
    public void dispatchHeartbeatEvent(P partition, OffsetContext offset) throws InterruptedException {
        final Map<String, ?> sourcePartition = partition.getSourcePartition();
        final Map<String, ?> sourceOffset = offset.getOffset();
        this.heartbeat.heartbeat(sourcePartition, sourceOffset, this::enqueueHeartbeat);
    }

    /**
     * Offers the given partition and offset to the heartbeat and then also notifies the
     * incremental snapshot source, if one is registered.
     */
    public void dispatchHeartbeatEventAlsoToIncrementalSnapshot(P partition, OffsetContext offset) throws InterruptedException {
        final Map<String, ?> sourcePartition = partition.getSourcePartition();
        final Map<String, ?> sourceOffset = offset.getOffset();
        this.heartbeat.heartbeat(sourcePartition, sourceOffset, this::enqueueHeartbeat);
        if (this.incrementalSnapshotChangeEventSource == null) {
            return;
        }
        this.incrementalSnapshotChangeEventSource.processHeartbeat(partition, offset);
    }

    /**
     * @return whether the configured heartbeat reports itself as enabled
     */
    public boolean heartbeatsEnabled() {
        return heartbeat.isEnabled();
    }

    /** Wraps a heartbeat record in a {@link DataChangeEvent} and enqueues it. */
    private void enqueueHeartbeat(SourceRecord record) throws InterruptedException {
        final DataChangeEvent heartbeatEvent = new DataChangeEvent(record);
        this.queue.enqueue(heartbeatEvent);
    }

    /** Wraps a transaction metadata record in a {@link DataChangeEvent} and enqueues it. */
    private void enqueueTransactionMessage(SourceRecord record) throws InterruptedException {
        final DataChangeEvent transactionEvent = new DataChangeEvent(record);
        this.queue.enqueue(transactionEvent);
    }

    /** Wraps a schema change record in a {@link DataChangeEvent} and enqueues it. */
    private void enqueueSchemaChangeMessage(SourceRecord record) throws InterruptedException {
        final DataChangeEvent schemaChangeEvent = new DataChangeEvent(record);
        this.queue.enqueue(schemaChangeEvent);
    }

    /**
     * Notifies only the incremental snapshot source, if one is registered, of a heartbeat;
     * no heartbeat record is produced here.
     */
    public void dispatchServerHeartbeatEvent(P partition, OffsetContext offset) throws InterruptedException {
        if (this.incrementalSnapshotChangeEventSource == null) {
            return;
        }
        this.incrementalSnapshotChangeEventSource.processHeartbeat(partition, offset);
    }

    /**
     * Enqueues a notification record and, when the queue is buffered, flushes the buffer
     * with an identity mapping.
     */
    public void enqueueNotification(SourceRecord record) throws InterruptedException {
        final DataChangeEvent notification = new DataChangeEvent(record);
        this.queue.enqueue(notification);
        if (!this.queue.isBuffered()) {
            return;
        }
        this.queue.flushBuffer(Function.identity());
    }

    /**
     * Returns the headers to attach to an outgoing record.
     *
     * <p>With extended headers disabled the input is returned untouched. Otherwise a copy of the
     * input is returned with the header producer's context headers appended.
     */
    private ConnectHeaders getExtendedHeaders(ConnectHeaders headers) {
        if (this.connectorConfig.isExtendedHeadersEnabled()) {
            final ConnectHeaders merged = new ConnectHeaders(headers);
            StreamSupport.stream(this.debeziumHeaderProducer.contextHeaders().spliterator(), false)
                    .forEach(merged::add);
            return merged;
        }
        return headers;
    }

    /**
     * Replaces the listener that is notified of dispatched, filtered and erroneous events.
     * No null check is performed here.
     */
    public void setEventListener(DataChangeEventListener<P> eventListener) {
        this.eventListener = eventListener;
    }

    /**
     * Registers the incremental snapshot source, or clears it when the optional is empty.
     *
     * @param incrementalSnapshotChangeEventSource source to register; empty stores {@code null}
     */
    @SuppressWarnings("unchecked")
    public void setIncrementalSnapshotChangeEventSource(Optional<IncrementalSnapshotChangeEventSource<P, ? extends DataCollectionId>> incrementalSnapshotChangeEventSource) {
        // The parameter only promises "some DataCollectionId subtype" while the field is typed
        // with T, so an explicit unchecked cast is required for the assignment.
        this.incrementalSnapshotChangeEventSource = (IncrementalSnapshotChangeEventSource<P, T>) incrementalSnapshotChangeEventSource.orElse(null);
    }

    /**
     * @return the database schema this dispatcher was created with
     */
    public DatabaseSchema<T> getSchema() {
        return schema;
    }

    /**
     * @return the schema as a historized schema, or {@code null} when the schema is not historized
     */
    public HistorizedDatabaseSchema<T> getHistorizedSchema() {
        return historizedSchema;
    }

    /**
     * @return the registered incremental snapshot source, or {@code null} when none is set
     */
    public IncrementalSnapshotChangeEventSource<P, T> getIncrementalSnapshotChangeEventSource() {
        return incrementalSnapshotChangeEventSource;
    }

    /**
     * Closes the heartbeat, but only when heartbeats are enabled.
     */
    @Override
    public void close() {
        if (!this.heartbeatsEnabled()) {
            return;
        }
        this.heartbeat.close();
    }

    /**
     * Applies every registered post processor, in registry order, to the given key and value.
     * Does nothing when no post processor registry is available.
     */
    protected void doPostProcessing(Object key, Struct value) {
        if (this.postProcessorRegistry == null) {
            return;
        }
        for (PostProcessor postProcessor : this.postProcessorRegistry.getProcessors()) {
            postProcessor.apply(key, value);
        }
    }

    /**
     * Strategy invoked when no schema is registered for the data collection of a streamed event.
     */
    @FunctionalInterface
    public interface InconsistentSchemaHandler<P extends Partition, T extends DataCollectionId> {
        /**
         * @return the schema to process the event with, or an empty optional, in which case the
         *         dispatcher does not process the event
         */
        Optional<DataCollectionSchema> handle(P partition, T dataCollectionId, ChangeRecordEmitter changeRecordEmitter);
    }

    private final class StreamingChangeRecordReceiver
    implements ChangeRecordEmitter.Receiver<P> {
        private StreamingChangeRecordReceiver() {
        }

        @Override
        public void changeRecord(P partition, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object key, Struct value, OffsetContext offsetContext, ConnectHeaders headers) throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");
            LOGGER.trace("Received change record {} for {} operation on key {} with context {}", new Object[]{Loggings.maybeRedactSensitiveData(value), operation, Loggings.maybeRedactSensitiveData(key), offsetContext});
            Schema keySchema = key == null && operation == Envelope.Operation.TRUNCATE ? null : dataCollectionSchema.keySchema();
            String topicName = EventDispatcher.this.topicNamingStrategy.dataChangeTopic(dataCollectionSchema.id());
            EventDispatcher.this.doPostProcessing(key, value);
            ConnectHeaders extendedHeaders = EventDispatcher.this.getExtendedHeaders(headers);
            SourceRecord record = new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), topicName, null, keySchema, key, dataCollectionSchema.getEnvelopeSchema().schema(), (Object)value, null, (Iterable)extendedHeaders);
            EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(record));
            if (EventDispatcher.this.emitTombstonesOnDelete && operation == Envelope.Operation.DELETE) {
                SourceRecord tombStone = record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), null, null, record.timestamp(), (Iterable)record.headers());
                EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(tombStone));
            }
        }
    }

    /**
     * Change record receiver used during snapshots; adds a hook that is called to complete
     * the snapshot.
     */
    public interface SnapshotReceiver<P extends Partition>
    extends ChangeRecordEmitter.Receiver<P> {
        /**
         * Signals that no further snapshot records will be delivered to this receiver.
         */
        void completeSnapshot() throws InterruptedException;
    }

    /**
     * Snapshot receiver that always holds back the most recent record. Each new record pushes the
     * previously buffered one to the queue, and {@link #completeSnapshot()} marks the final
     * buffered record as the last snapshot record before enqueueing it.
     */
    private final class BufferingSnapshotChangeRecordReceiver
    implements SnapshotReceiver<P> {
        // Starts with the NULL sentinel, whose event is null.
        private final AtomicReference<BufferedDataChangeEvent> bufferedEventRef = new AtomicReference<>(BufferedDataChangeEvent.NULL);
        private final boolean threaded;

        BufferingSnapshotChangeRecordReceiver(boolean threaded) {
            this.threaded = threaded;
        }

        /**
         * Buffers the given record and enqueues the one buffered before it.
         *
         * @throws NullPointerException if {@code value} is {@code null}
         * @throws InterruptedException if interrupted while enqueueing
         */
        @Override
        @SuppressWarnings("unchecked")
        public void changeRecord(P partition, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object key, Struct value, OffsetContext offsetContext, ConnectHeaders headers) throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", operation, Loggings.maybeRedactSensitiveData(key));
            EventDispatcher.this.doPostProcessing(key, value);
            // id() is declared as DataCollectionId; the naming strategy is typed with T.
            SourceRecord record = new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), EventDispatcher.this.topicNamingStrategy.dataChangeTopic((T) dataCollectionSchema.id()), null, dataCollectionSchema.keySchema(), key, dataCollectionSchema.getEnvelopeSchema().schema(), value, null, headers);
            BufferedDataChangeEvent nextBufferedEvent = new BufferedDataChangeEvent();
            nextBufferedEvent.offsetContext = offsetContext;
            nextBufferedEvent.dataChangeEvent = new DataChangeEvent(record);
            if (this.threaded) {
                // Swap and enqueue under the queue's monitor so both happen as one step
                // relative to other threads using this receiver.
                synchronized (EventDispatcher.this.queue) {
                    this.enqueuePreviousAndBuffer(nextBufferedEvent);
                }
            }
            else {
                this.enqueuePreviousAndBuffer(nextBufferedEvent);
            }
        }

        /** Stores {@code next} as the buffered event and enqueues the event it replaces. */
        private void enqueuePreviousAndBuffer(BufferedDataChangeEvent next) throws InterruptedException {
            EventDispatcher.this.queue.enqueue(this.bufferedEventRef.getAndSet(next).dataChangeEvent);
        }

        /**
         * Flushes the buffered record, if any: sets its {@code source} struct (when the envelope
         * has one) to {@link SnapshotRecord#LAST} and replaces its source offset with the current
         * content of the buffered offset context.
         */
        @Override
        @SuppressWarnings("unchecked")
        public void completeSnapshot() throws InterruptedException {
            BufferedDataChangeEvent bufferedEvent = this.bufferedEventRef.getAndSet(BufferedDataChangeEvent.NULL);
            DataChangeEvent event = bufferedEvent.dataChangeEvent;
            if (event == null) {
                return;
            }
            SourceRecord record = event.getRecord();
            Struct envelope = (Struct) record.value();
            if (envelope.schema().field("source") != null) {
                Struct source = envelope.getStruct("source");
                SnapshotRecord.LAST.toSource(source);
            }
            // The record's offset map is updated in place; the cast makes putAll type-check.
            Map<String, Object> recordOffset = (Map<String, Object>) record.sourceOffset();
            Map<String, ?> latestOffset = bufferedEvent.offsetContext.getOffset();
            // If both are the same map instance, clear() followed by putAll() would erase the offset.
            if (recordOffset != latestOffset) {
                recordOffset.clear();
                recordOffset.putAll(latestOffset);
            }
            EventDispatcher.this.queue.enqueue(event);
        }
    }

    /**
     * Receiver for incremental snapshot records: post-processes each record, notifies the data
     * listener and enqueues the resulting {@link SourceRecord}.
     */
    private final class IncrementalSnapshotChangeRecordReceiver
    implements SnapshotReceiver<P> {
        private final DataChangeEventListener<P> dataListener;

        IncrementalSnapshotChangeRecordReceiver(DataChangeEventListener<P> dataListener) {
            this.dataListener = dataListener;
        }

        /**
         * @throws NullPointerException if {@code value} is {@code null}
         * @throws InterruptedException if interrupted while enqueueing
         */
        @Override
        @SuppressWarnings("unchecked")
        public void changeRecord(P partition, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object key, Struct value, OffsetContext offsetContext, ConnectHeaders headers) throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", operation, Loggings.maybeRedactSensitiveData(key));
            Schema keySchema = dataCollectionSchema.keySchema();
            // id() is declared as DataCollectionId; the naming strategy is typed with T.
            String topicName = EventDispatcher.this.topicNamingStrategy.dataChangeTopic((T) dataCollectionSchema.id());
            EventDispatcher.this.doPostProcessing(key, value);
            SourceRecord record = new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), topicName, null, keySchema, key, dataCollectionSchema.getEnvelopeSchema().schema(), value, null, headers);
            // Report the record key, as the other onEvent call sites in this class do,
            // not the key schema.
            this.dataListener.onEvent(partition, dataCollectionSchema.id(), offsetContext, key, value, operation);
            EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(record));
        }

        /** Nothing is buffered by this receiver, so there is nothing to flush. */
        @Override
        public void completeSnapshot() {
        }
    }

    /**
     * Receiver for schema change events: applies each event to the historized schema (when there
     * is one) and, when schema change history is enabled in the connector configuration,
     * enqueues a schema change record.
     */
    private final class SchemaChangeEventReceiver
    implements SchemaChangeEventEmitter.Receiver {
        private SchemaChangeEventReceiver() {
        }

        /** Builds the record key, which carries only the database name. */
        private Struct schemaChangeRecordKey(SchemaChangeEvent event) {
            final Struct keyStruct = new Struct(EventDispatcher.this.schemaChangeKeySchema);
            keyStruct.put("databaseName", event.getDatabase());
            return keyStruct;
        }

        /** Builds the record value from the event's source, timestamp, names, DDL and table changes. */
        private Struct schemaChangeRecordValue(SchemaChangeEvent event) {
            final Struct valueStruct = new Struct(EventDispatcher.this.schemaChangeValueSchema);
            valueStruct.put("source", event.getSource());
            valueStruct.put("ts_ms", event.getTimestamp().toEpochMilli());
            valueStruct.put("databaseName", event.getDatabase());
            valueStruct.put("schemaName", event.getSchema());
            valueStruct.put("ddl", event.getDdl());
            valueStruct.put("tableChanges", EventDispatcher.this.tableChangesSerializer.serialize(event.getTableChanges()));
            return valueStruct;
        }

        @Override
        public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
            if (EventDispatcher.this.historizedSchema != null) {
                EventDispatcher.this.historizedSchema.applySchemaChange(event);
            }
            if (!EventDispatcher.this.connectorConfig.isSchemaChangesHistoryEnabled()) {
                return;
            }
            final String topicName = EventDispatcher.this.topicNamingStrategy.schemaChangeTopic();
            // Schema change records are always written to Kafka partition 0.
            final Integer kafkaPartition = 0;
            final Struct key = this.schemaChangeRecordKey(event);
            final Struct value = this.schemaChangeRecordValue(event);
            final ConnectHeaders extendedHeaders = EventDispatcher.this.getExtendedHeaders(new ConnectHeaders());
            final SourceRecord record = new SourceRecord(event.getPartition(), event.getOffset(), topicName, kafkaPartition, EventDispatcher.this.schemaChangeKeySchema, key, EventDispatcher.this.schemaChangeValueSchema, value, null, extendedHeaders);
            EventDispatcher.this.enqueueSchemaChangeMessage(record);
        }
    }

    /**
     * Mutable holder pairing a buffered data change event with the offset context it was
     * created with.
     */
    private static final class BufferedDataChangeEvent {
        /** Sentinel meaning "nothing buffered"; both of its fields stay {@code null}. */
        private static final BufferedDataChangeEvent NULL = new BufferedDataChangeEvent();

        private OffsetContext offsetContext;
        private DataChangeEvent dataChangeEvent;

        private BufferedDataChangeEvent() {
            // Fields are filled in by the buffering receiver after construction.
        }
    }
}

