package io.debezium.transforms;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/transforms/ByLogicalTableRouter.class */
public class ByLogicalTableRouter<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Field TOPIC_REGEX = Field.create("topic.regex").withDisplayName("Topic regex").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withValidation(Field::isRequired, Field::isRegex).withDescription("The regex used for extracting the name of the logical table from the original topic name.");
    private static final Field TOPIC_REPLACEMENT = Field.create("topic.replacement").withDisplayName("Topic replacement").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withValidation(Field::isRequired).withDescription("The replacement string used in conjunction with " + TOPIC_REGEX.name() + ". This will be used to create the new topic name.");
    private static final Field KEY_FIELD_REGEX = Field.create("key.field.regex").withDisplayName("Key field regex").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withValidation(Field::isRegex).withDescription("The regex used for extracting the physical table identifier from the original topic name. Now that multiple physical tables can share a topic, the event's key may need to be augmented to include fields other than just those for the record's primary/unique key, since these are not guaranteed to be unique across tables. We need some identifier added to the key that distinguishes the different physical tables.");
    private static final Field KEY_FIELD_NAME = Field.create("key.field.name").withDisplayName("Key field name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withDefault("__dbz__physicalTableIdentifier").withDescription("Each record's key schema will be augmented with this field name. The purpose of this field is to distinguish the different physical tables that can now share a single topic. Make sure not to configure a field name that is at risk of conflict with existing key schema field names.");
    private static final Field KEY_FIELD_REPLACEMENT = Field.create("key.field.replacement").withDisplayName("Key field replacement").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withValidation(ByLogicalTableRouter::validateKeyFieldReplacement).withDescription("The replacement string used in conjunction with " + KEY_FIELD_REGEX.name() + ". This will be used to create the physical table identifier in the record's key.");
    private static final Logger logger = LoggerFactory.getLogger(ByLogicalTableRouter.class);
    private Pattern topicRegex;
    private String topicReplacement;
    private Pattern keyFieldRegex;
    private String keyFieldReplacement;
    private String keyFieldName;
    private final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(logger);
    private final Cache<Schema, Schema> keySchemaUpdateCache = new SynchronizedCache(new LRUCache(16));
    private final Cache<Schema, Schema> envelopeSchemaUpdateCache = new SynchronizedCache(new LRUCache(16));
    private final Cache<String, String> keyRegexReplaceCache = new SynchronizedCache(new LRUCache(16));
    private final Cache<String, String> topicRegexReplaceCache = new SynchronizedCache(new LRUCache(16));

    private static int validateKeyFieldReplacement(Configuration configuration, Field field, Field.ValidationOutput validationOutput) {
        String string = configuration.getString(KEY_FIELD_REGEX);
        if (string != null) {
            string = string.trim();
        }
        String string2 = configuration.getString(KEY_FIELD_REPLACEMENT);
        if (string2 != null) {
            string2 = string2.trim();
        }
        if (Strings.isNullOrEmpty(string) || !Strings.isNullOrEmpty(string2)) {
            return 0;
        }
        validationOutput.accept(KEY_FIELD_REPLACEMENT, null, String.format("%s must be non-empty if %s is set.", KEY_FIELD_REPLACEMENT.name(), KEY_FIELD_REGEX.name()));
        return 1;
    }

    public void configure(Map<String, ?> map) {
        Configuration from = Configuration.from(map);
        Field.Set of = Field.setOf(TOPIC_REGEX, TOPIC_REPLACEMENT, KEY_FIELD_REGEX, KEY_FIELD_REPLACEMENT);
        Logger logger2 = logger;
        logger2.getClass();
        if (!from.validateAndRecord(of, logger2::error)) {
            throw new ConnectException("Unable to validate config.");
        }
        this.topicRegex = Pattern.compile(from.getString(TOPIC_REGEX));
        this.topicReplacement = from.getString(TOPIC_REPLACEMENT);
        String string = from.getString(KEY_FIELD_REGEX);
        if (string != null) {
            string = string.trim();
        }
        if (string != null && !string.isEmpty()) {
            this.keyFieldRegex = Pattern.compile(from.getString(KEY_FIELD_REGEX));
            this.keyFieldReplacement = from.getString(KEY_FIELD_REPLACEMENT);
        }
        this.keyFieldName = from.getString(KEY_FIELD_NAME);
    }

    public R apply(R r) {
        String str = r.topic();
        String determineNewTopic = determineNewTopic(str);
        if (determineNewTopic == null) {
            return r;
        }
        logger.debug("Applying topic name transformation from {} to {}", str, determineNewTopic);
        Schema schema = null;
        Struct struct = null;
        if (r.key() != null) {
            Struct requireStruct = Requirements.requireStruct(r.key(), "Updating schema");
            schema = updateKeySchema(requireStruct.schema(), determineNewTopic);
            struct = updateKey(schema, requireStruct, str);
        }
        if (r.value() == null) {
            return (R) r.newRecord(determineNewTopic, r.kafkaPartition(), schema, struct, r.valueSchema(), r.value(), r.timestamp());
        }
        Struct requireStruct2 = Requirements.requireStruct(r.value(), "Updating schema");
        Schema updateEnvelopeSchema = updateEnvelopeSchema(requireStruct2.schema(), determineNewTopic);
        return (R) r.newRecord(determineNewTopic, r.kafkaPartition(), schema, struct, updateEnvelopeSchema, updateEnvelope(updateEnvelopeSchema, requireStruct2), r.timestamp());
    }

    public void close() {
    }

    public ConfigDef config() {
        ConfigDef configDef = new ConfigDef();
        Field.group(configDef, null, TOPIC_REGEX, TOPIC_REPLACEMENT, KEY_FIELD_REGEX, KEY_FIELD_REPLACEMENT);
        return configDef;
    }

    private String determineNewTopic(String str) {
        String str2 = (String) this.topicRegexReplaceCache.get(str);
        if (str2 != null) {
            return str2;
        }
        Matcher matcher = this.topicRegex.matcher(str);
        if (!matcher.matches()) {
            return null;
        }
        String replaceFirst = matcher.replaceFirst(this.topicReplacement);
        this.topicRegexReplaceCache.put(str, replaceFirst);
        return replaceFirst;
    }

    private Schema updateKeySchema(Schema schema, String str) {
        Schema schema2 = (Schema) this.keySchemaUpdateCache.get(schema);
        if (schema2 != null) {
            return schema2;
        }
        SchemaBuilder copySchemaExcludingName = copySchemaExcludingName(schema, SchemaBuilder.struct());
        copySchemaExcludingName.name(this.schemaNameAdjuster.adjust(str + ".Key"));
        copySchemaExcludingName.field(this.keyFieldName, Schema.STRING_SCHEMA);
        Schema build = copySchemaExcludingName.build();
        this.keySchemaUpdateCache.put(schema, build);
        return build;
    }

    private Struct updateKey(Schema schema, Struct struct, String str) {
        Struct struct2 = new Struct(schema);
        for (org.apache.kafka.connect.data.Field field : struct.schema().fields()) {
            struct2.put(field.name(), struct.get(field));
        }
        String str2 = str;
        if (this.keyFieldRegex != null) {
            str2 = (String) this.keyRegexReplaceCache.get(str);
            if (str2 == null) {
                Matcher matcher = this.keyFieldRegex.matcher(str);
                if (matcher.matches()) {
                    str2 = matcher.replaceFirst(this.keyFieldReplacement);
                    this.keyRegexReplaceCache.put(str, str2);
                } else {
                    str2 = str;
                }
            }
        }
        struct2.put(this.keyFieldName, str2);
        return struct2;
    }

    private Schema updateEnvelopeSchema(Schema schema, String str) {
        Schema schema2 = (Schema) this.envelopeSchemaUpdateCache.get(schema);
        if (schema2 != null) {
            return schema2;
        }
        SchemaBuilder copySchemaExcludingName = copySchemaExcludingName(schema.field(Envelope.FieldName.BEFORE).schema(), SchemaBuilder.struct());
        copySchemaExcludingName.name(this.schemaNameAdjuster.adjust(str + ".Value"));
        Schema build = copySchemaExcludingName.build();
        SchemaBuilder copySchemaExcludingName2 = copySchemaExcludingName(schema, SchemaBuilder.struct(), false);
        for (org.apache.kafka.connect.data.Field field : schema.fields()) {
            String name = field.name();
            Schema schema3 = field.schema();
            if (Objects.equals(name, Envelope.FieldName.BEFORE) || Objects.equals(name, Envelope.FieldName.AFTER)) {
                schema3 = build;
            }
            copySchemaExcludingName2.field(name, schema3);
        }
        copySchemaExcludingName2.name(this.schemaNameAdjuster.adjust(str + ".Envelope"));
        Schema build2 = copySchemaExcludingName2.build();
        this.envelopeSchemaUpdateCache.put(schema, build2);
        return build2;
    }

    private Struct updateEnvelope(Schema schema, Struct struct) {
        Struct struct2 = new Struct(schema);
        Schema schema2 = schema.field(Envelope.FieldName.BEFORE).schema();
        for (org.apache.kafka.connect.data.Field field : struct.schema().fields()) {
            String name = field.name();
            Object obj = struct.get(field);
            if ((Objects.equals(name, Envelope.FieldName.BEFORE) || Objects.equals(name, Envelope.FieldName.AFTER)) && obj != null) {
                obj = updateValue(schema2, Requirements.requireStruct(obj, "Updating schema"));
            }
            struct2.put(name, obj);
        }
        return struct2;
    }

    private Struct updateValue(Schema schema, Struct struct) {
        Struct struct2 = new Struct(schema);
        for (org.apache.kafka.connect.data.Field field : struct.schema().fields()) {
            struct2.put(field.name(), struct.get(field));
        }
        return struct2;
    }

    private SchemaBuilder copySchemaExcludingName(Schema schema, SchemaBuilder schemaBuilder) {
        return copySchemaExcludingName(schema, schemaBuilder, true);
    }

    private SchemaBuilder copySchemaExcludingName(Schema schema, SchemaBuilder schemaBuilder, boolean z) {
        schemaBuilder.version(schema.version());
        schemaBuilder.doc(schema.doc());
        Map parameters = schema.parameters();
        if (parameters != null) {
            schemaBuilder.parameters(parameters);
        }
        if (schema.isOptional()) {
            schemaBuilder.optional();
        } else {
            schemaBuilder.required();
        }
        if (z) {
            for (org.apache.kafka.connect.data.Field field : schema.fields()) {
                schemaBuilder.field(field.name(), field.schema());
            }
        }
        return schemaBuilder;
    }
}
