package io.debezium.transforms.outbox;

import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.time.Timestamp;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.fest.assertions.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:io/debezium/transforms/outbox/EventRouterTest.class */
public class EventRouterTest {

    @Rule
    public ExpectedException exceptionRule = ExpectedException.none();

    @Test
    public void canSkipTombstone() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        Assertions.assertThat(eventRouter.apply(new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, "123123", (Schema) null, (Object) null))).isNull();
    }

    @Test
    public void canSkipDeletion() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        Schema build = SchemaBuilder.struct().field("id", SchemaBuilder.string()).build();
        Envelope build2 = Envelope.defineSchema().withName("dummy.Envelope").withRecord(build).withSource(SchemaBuilder.struct().build()).build();
        Struct struct = new Struct(build);
        struct.put("id", "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5");
        Assertions.assertThat(eventRouter.apply(new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", build2.schema(), build2.delete(struct, (Struct) null, Instant.now())))).isNull();
    }

    @Test
    @FixFor({"DBZ-1383"})
    public void canSkipMessagesWithoutDebeziumCdcEnvelopeDueToMissingSchemaName() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        Schema build = SchemaBuilder.struct().field("ts_ms", Schema.INT64_SCHEMA).build();
        Struct struct = new Struct(build);
        struct.put("ts_ms", Long.valueOf(Instant.now().toEpochMilli()));
        SourceRecord sourceRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", build, struct);
        Assertions.assertThat(eventRouter.apply(sourceRecord)).isSameAs(sourceRecord);
    }

    @Test
    public void shouldFailWhenTheSchemaLooksValidButDoesNotHaveTheCorrectFields() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        Schema build = SchemaBuilder.struct().name("io.debezium.connector.common.Heartbeat.Envelope").field("ts_ms", Schema.INT64_SCHEMA).build();
        Struct struct = new Struct(build);
        struct.put("ts_ms", Long.valueOf(Instant.now().toEpochMilli()));
        SourceRecord sourceRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", build, struct);
        this.exceptionRule.expect(DataException.class);
        this.exceptionRule.expectMessage("op is not a valid field name");
        eventRouter.apply(sourceRecord);
    }

    @Test
    @FixFor({"DBZ-1383"})
    public void canSkipMessagesWithoutDebeziumCdcEnvelopeDueToMissingSchemaNameSuffix() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        Schema build = SchemaBuilder.struct().name("io.debezium.connector.common.Heartbeat").field("ts_ms", Schema.INT64_SCHEMA).build();
        Struct struct = new Struct(build);
        struct.put("ts_ms", Long.valueOf(Instant.now().toEpochMilli()));
        SourceRecord sourceRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", build, struct);
        Assertions.assertThat(eventRouter.apply(sourceRecord)).isSameAs(sourceRecord);
    }

    @Test
    @FixFor({"DBZ-1383"})
    public void canSkipMessagesWithoutDebeziumCdcEnvelopeDueToMissingValueSchema() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        Struct struct = new Struct(SchemaBuilder.struct().name("io.debezium.connector.common.Heartbeat").field("ts_ms", Schema.INT64_SCHEMA).build());
        struct.put("ts_ms", Long.valueOf(Instant.now().toEpochMilli()));
        SourceRecord sourceRecord = new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", (Schema) null, struct);
        Assertions.assertThat(eventRouter.apply(sourceRecord)).isSameAs(sourceRecord);
    }

    @Test
    public void canSkipUpdates() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        Schema build = SchemaBuilder.struct().field("id", SchemaBuilder.string()).build();
        Envelope build2 = Envelope.defineSchema().withName("dummy.Envelope").withRecord(build).withSource(SchemaBuilder.struct().build()).build();
        Struct struct = new Struct(build);
        struct.put("id", "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5");
        Assertions.assertThat(eventRouter.apply(new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", build2.schema(), build2.update(struct, struct, (Struct) null, Instant.now())))).isNull();
    }

    @Test(expected = IllegalStateException.class)
    public void canFailOnUpdates() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name(), EventRouterConfigDefinition.InvalidOperationBehavior.FATAL.getValue());
        eventRouter.configure(hashMap);
        Schema build = SchemaBuilder.struct().field("id", SchemaBuilder.string()).build();
        Envelope build2 = Envelope.defineSchema().withName("dummy.Envelope").withRecord(build).withSource(SchemaBuilder.struct().build()).build();
        Struct struct = new Struct(build);
        struct.put("id", "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5");
        eventRouter.apply(new SourceRecord(new HashMap(), new HashMap(), "db.outbox", SchemaBuilder.STRING_SCHEMA, "772590bf-ef2d-4814-b4bf-ddc6f5f8b9c5", build2.schema(), build2.update(struct, struct, (Struct) null, Instant.now())));
    }

    @Test
    public void canExtractTableFields() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        SourceRecord apply = eventRouter.apply(createEventRecord());
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.value()).isEqualTo("{}");
        Assertions.assertThat(apply.valueSchema().version()).isNull();
    }

    @Test
    public void canSetDefaultMessageKey() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        SourceRecord apply = eventRouter.apply(createEventRecord());
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.keySchema().type()).isEqualTo(Schema.Type.STRING);
        Assertions.assertThat(apply.key()).isEqualTo("10711fa5");
    }

    @Test
    public void canSetMessageKey() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "type");
        eventRouter.configure(hashMap);
        SourceRecord apply = eventRouter.apply(createEventRecord());
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.keySchema().type()).isEqualTo(Schema.Type.STRING);
        Assertions.assertThat(apply.key()).isEqualTo("UserCreated");
    }

    @Test(expected = DataException.class)
    public void failsOnInvalidSetMessageKey() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "fakefield");
        eventRouter.configure(hashMap);
        eventRouter.apply(createEventRecord());
    }

    @Test
    public void canSetTimestampFromDebeziumEnvelopeByDefault() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        SourceRecord createEventRecord = createEventRecord();
        SourceRecord apply = eventRouter.apply(createEventRecord);
        Long int64 = Requirements.requireStruct(createEventRecord.value(), "Test timestamp").getInt64("ts_ms");
        Assertions.assertThat(createEventRecord.timestamp()).isNull();
        Assertions.assertThat(apply.timestamp()).isEqualTo(int64);
    }

    @Test
    public void canSetTimestampByUserDefinedConfiguration() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "event_timestamp");
        eventRouter.configure(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("event_timestamp", Timestamp.schema());
        HashMap hashMap3 = new HashMap();
        hashMap3.put("event_timestamp", 14222264625338L);
        SourceRecord createEventRecord = createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", hashMap2, hashMap3);
        SourceRecord apply = eventRouter.apply(createEventRecord);
        Assertions.assertThat(createEventRecord.timestamp()).isNull();
        Assertions.assertThat(apply.timestamp()).isEqualTo(14222264625338L);
    }

    @Test
    public void canRouteBasedOnField() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "aggregatetype");
        eventRouter.configure(hashMap);
        SourceRecord apply = eventRouter.apply(createEventRecord());
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.topic()).isEqualTo("outbox.event.User");
        SourceRecord apply2 = eventRouter.apply(createEventRecord("ab720dd3-176d-40a6-96f3-6cf961d7df6a", "UserUpdate", "10711fa5", "User", "{}"));
        Assertions.assertThat(apply2).isNotNull();
        Assertions.assertThat(apply2.topic()).isEqualTo("outbox.event.User");
        SourceRecord apply3 = eventRouter.apply(createEventRecord("ab720dd3-176d-40a6-96f3-6cf961d7df6a", "AddressCreated", "10711fa5", "Address", "{}"));
        Assertions.assertThat(apply3).isNotNull();
        Assertions.assertThat(apply3.topic()).isEqualTo("outbox.event.Address");
    }

    @Test
    public void canConfigureEveryTableField() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id");
        hashMap.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id");
        hashMap.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type");
        hashMap.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body");
        hashMap.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "payload_id");
        eventRouter.configure(hashMap);
        Schema build = SchemaBuilder.struct().field("event_id", SchemaBuilder.string()).field("payload_id", SchemaBuilder.string()).field("event_type", SchemaBuilder.string()).field("payload_body", SchemaBuilder.string()).build();
        Envelope build2 = Envelope.defineSchema().withName("event.Envelope").withRecord(build).withSource(SchemaBuilder.struct().build()).build();
        Struct struct = new Struct(build);
        struct.put("event_id", "da8d6de6-3b77-45ff-8f44-57db55a7a06c");
        struct.put("payload_id", "10711fa5");
        struct.put("event_type", "UserCreated");
        struct.put("payload_body", "{}");
        SourceRecord apply = eventRouter.apply(new SourceRecord(new HashMap(), new HashMap(), "db.outbox", build2.schema(), build2.create(struct, (Struct) null, Instant.now())));
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.value()).isEqualTo("{}");
        Headers headers = apply.headers();
        Assertions.assertThat(headers.size()).isEqualTo(1);
        Header header = (Header) headers.iterator().next();
        Assertions.assertThat(header.key()).isEqualTo("id");
        Assertions.assertThat(header.value()).isEqualTo("da8d6de6-3b77-45ff-8f44-57db55a7a06c");
    }

    @Test
    public void canInfluenceTableColumnTypes() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id");
        hashMap.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id");
        hashMap.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type");
        hashMap.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body");
        hashMap.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "my_route_field");
        hashMap.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "some_boolean:envelope:bool");
        eventRouter.configure(hashMap);
        Schema build = SchemaBuilder.struct().field("event_id", SchemaBuilder.int32()).field("payload_id", SchemaBuilder.int32()).field("my_route_field", SchemaBuilder.string()).field("event_type", SchemaBuilder.bytes()).field("payload_body", SchemaBuilder.bytes()).field("some_boolean", SchemaBuilder.bool()).build();
        Envelope build2 = Envelope.defineSchema().withName("event.Envelope").withRecord(build).withSource(SchemaBuilder.struct().build()).build();
        Struct struct = new Struct(build);
        struct.put("event_id", 2);
        struct.put("payload_id", 1232);
        struct.put("event_type", "CoolSchemaCreated".getBytes());
        struct.put("my_route_field", "routename");
        struct.put("payload_body", "{}".getBytes());
        struct.put("some_boolean", true);
        SourceRecord apply = eventRouter.apply(new SourceRecord(new HashMap(), new HashMap(), "db.outbox", build2.schema(), build2.create(struct, (Struct) null, Instant.now())));
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.topic()).isEqualTo("outbox.event.routename");
        Schema valueSchema = apply.valueSchema();
        Assertions.assertThat(valueSchema.field("payload").schema().type()).isEqualTo(SchemaBuilder.bytes().type());
        Assertions.assertThat(valueSchema.field("bool").schema().type()).isEqualTo(SchemaBuilder.bool().type());
        Assertions.assertThat(((Struct) apply.value()).get("payload")).isEqualTo("{}".getBytes());
        Assertions.assertThat(apply.key()).isEqualTo(1232);
        Headers headers = apply.headers();
        Assertions.assertThat(headers.size()).isEqualTo(1);
        Header header = (Header) headers.iterator().next();
        Assertions.assertThat(header.key()).isEqualTo("id");
        Assertions.assertThat(header.value()).isEqualTo(2);
    }

    @Test
    public void canSetSchemaVersionWhenMoreThanPayloadIsInEnvelope() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "version");
        hashMap.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:eventType");
        eventRouter.configure(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("version", Schema.INT32_SCHEMA);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("version", 1);
        SourceRecord apply = eventRouter.apply(createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", hashMap2, hashMap3));
        Assertions.assertThat(apply.valueSchema().version()).isEqualTo(1);
        HashMap hashMap4 = new HashMap();
        hashMap4.put("version", 3);
        Assertions.assertThat(eventRouter.apply(createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", hashMap2, hashMap4)).valueSchema().version()).isEqualTo(3);
        SourceRecord apply2 = eventRouter.apply(createEventRecord("18f94a39-b931-41b7-837c-6fc23b013597", "UserCreated", "1b10b70b", "User", "{}", hashMap2, hashMap3));
        Assertions.assertThat(apply2.valueSchema().version()).isEqualTo(1);
        Assertions.assertThat(apply.valueSchema()).isSameAs(apply2.valueSchema());
    }

    @Test
    public void shouldNotSetSchemaVersionByDefault() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELD_SCHEMA_VERSION.name(), "version");
        eventRouter.configure(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("version", Schema.INT32_SCHEMA);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("version", 1);
        Assertions.assertThat(eventRouter.apply(createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", hashMap2, hashMap3)).valueSchema().version()).isNull();
        HashMap hashMap4 = new HashMap();
        hashMap4.put("version", 3);
        Assertions.assertThat(eventRouter.apply(createEventRecord("166080d9-3b0e-4a04-81fe-2058a7386f1f", "UserCreated", "420b186d", "User", "{}", hashMap2, hashMap4)).valueSchema().version()).isNull();
        Assertions.assertThat(eventRouter.apply(createEventRecord("18f94a39-b931-41b7-837c-6fc23b013597", "UserCreated", "1b10b70b", "User", "{}", hashMap2, hashMap3)).valueSchema().version()).isNull();
    }

    @Test
    public void canSetPayloadTypeIntoTheEnvelope() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope");
        eventRouter.configure(hashMap);
        Assertions.assertThat(((Struct) eventRouter.apply(createEventRecord()).value()).get("type")).isEqualTo("UserCreated");
    }

    @Test
    public void canSetPayloadTypeIntoTheEnvelopeWithAlias() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:aggregateType");
        eventRouter.configure(hashMap);
        Assertions.assertThat(((Struct) eventRouter.apply(createEventRecord()).value()).get("aggregateType")).isEqualTo("UserCreated");
    }

    @Test
    public void canSetMultipleFieldsIntoTheEnvelope() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:payloadType,aggregateid:envelope:payloadId,type:header:payloadType");
        eventRouter.configure(hashMap);
        SourceRecord apply = eventRouter.apply(createEventRecord());
        Struct struct = (Struct) apply.value();
        Assertions.assertThat(struct.get("payloadType")).isEqualTo("UserCreated");
        Assertions.assertThat(struct.get("payloadId")).isEqualTo("10711fa5");
        Assertions.assertThat(apply.headers().lastWithName("payloadType").value()).isEqualTo("UserCreated");
    }

    @Test(expected = ConnectException.class)
    public void shouldFailOnInvalidConfigurationForTopicRegex() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX.name(), " [[a-z]");
        eventRouter.configure(hashMap);
    }

    @Test(expected = ConnectException.class)
    public void shouldFailOnInvalidConfigurationForAdditionalFields() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type");
        eventRouter.configure(hashMap);
    }

    @Test(expected = ConnectException.class)
    public void shouldFailOnInvalidConfigurationForAdditionalFieldsEmpty() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "");
        eventRouter.configure(hashMap);
    }

    @Test(expected = ConnectException.class)
    public void shouldFailOnInvalidConfigurationForOperationBehavior() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name(), "invalidOption");
        eventRouter.configure(hashMap);
    }

    @Test
    @FixFor({"DBZ-2152"})
    public void canPassStringKey() {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        SourceRecord apply = eventRouter.apply(createEventRecord());
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.keySchema().type()).isEqualTo(Schema.Type.STRING);
    }

    @Test
    @FixFor({"DBZ-2152"})
    public void canSetBinaryMessageKey() {
        byte[] bytes = "a UserCreated".getBytes(StandardCharsets.UTF_8);
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELD_EVENT_KEY.name(), "type");
        eventRouter.configure(hashMap);
        SourceRecord apply = eventRouter.apply(createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", SchemaBuilder.bytes(), bytes, SchemaBuilder.string(), "Some other payload id", "User", SchemaBuilder.string(), "{}", new HashMap(), new HashMap()));
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.keySchema().type()).isEqualTo(Schema.Type.BYTES);
        Assertions.assertThat(apply.key()).isEqualTo(bytes);
    }

    @Test
    @FixFor({"DBZ-2152"})
    public void canPassBinaryKey() {
        canPassKeyByType(SchemaBuilder.bytes(), "a binary key".getBytes(StandardCharsets.UTF_8));
    }

    @Test
    @FixFor({"DBZ-2152"})
    public void canPassIntKey() {
        canPassKeyByType(SchemaBuilder.int32(), 54321);
    }

    private void canPassKeyByType(SchemaBuilder schemaBuilder, Object obj) {
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        SourceRecord apply = eventRouter.apply(createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", SchemaBuilder.string(), "UserCreated", schemaBuilder, obj, "User", SchemaBuilder.string(), "{}", new HashMap(), new HashMap()));
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.keySchema().type()).isEqualTo(schemaBuilder.type());
        Assertions.assertThat(apply.key()).isEqualTo(obj);
    }

    @Test
    @FixFor({"DBZ-2152"})
    public void canPassBinaryMessage() {
        byte[] bytes = "a binary message".getBytes(StandardCharsets.UTF_8);
        EventRouter eventRouter = new EventRouter();
        eventRouter.configure(new HashMap());
        SourceRecord apply = eventRouter.apply(createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", SchemaBuilder.string(), "UserCreated", SchemaBuilder.string(), "a key", "User", SchemaBuilder.bytes(), bytes, new HashMap(), new HashMap()));
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.keySchema().type()).isEqualTo(Schema.Type.STRING);
        Assertions.assertThat(apply.key()).isEqualTo("a key");
        Assertions.assertThat(apply.valueSchema().type()).isEqualTo(Schema.Type.BYTES);
        Assertions.assertThat(apply.value()).isEqualTo(bytes);
    }

    @Test
    public void canMarkAnEventAsDeleted() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "is_deleted:envelope:deleted");
        hashMap.put(EventRouterConfigDefinition.ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD.name(), "true");
        eventRouter.configure(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("deleted", Schema.OPTIONAL_BOOLEAN_SCHEMA);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("is_deleted", true);
        Struct struct = (Struct) eventRouter.apply(createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{}", hashMap2, hashMap3)).value();
        Assertions.assertThat(struct).isNotNull();
        Assertions.assertThat(struct.get("deleted")).isEqualTo(true);
        SourceRecord apply = eventRouter.apply(createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "", hashMap2, hashMap3));
        Assertions.assertThat((Struct) apply.value()).isNull();
        VerifyRecord.isValidTombstone(apply);
    }

    @Test
    public void noTombstoneIfNotConfigured() {
        EventRouter eventRouter = new EventRouter();
        HashMap hashMap = new HashMap();
        hashMap.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "is_deleted:envelope:deleted");
        eventRouter.configure(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("deleted", Schema.OPTIONAL_BOOLEAN_SCHEMA);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("is_deleted", true);
        Struct struct = (Struct) eventRouter.apply(createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{}", hashMap2, hashMap3)).value();
        Assertions.assertThat(struct).isNotNull();
        Assertions.assertThat(struct.get("deleted")).isEqualTo(true);
        SourceRecord apply = eventRouter.apply(createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "", hashMap2, hashMap3));
        Struct struct2 = (Struct) apply.value();
        Assertions.assertThat(apply.key()).isNotNull();
        Assertions.assertThat(apply.keySchema()).isNotNull();
        Assertions.assertThat(struct2).isNotNull();
        Assertions.assertThat(struct2.get("deleted")).isEqualTo(true);
        Assertions.assertThat(apply.valueSchema()).isNotNull();
    }

    private SourceRecord createEventRecord() {
        return createEventRecord("da8d6de6-3b77-45ff-8f44-57db55a7a06c", "UserCreated", "10711fa5", "User", "{}");
    }

    private SourceRecord createEventRecord(String str, String str2, String str3, String str4, String str5) {
        return createEventRecord(str, str2, str3, str4, str5, new HashMap(), new HashMap());
    }

    private SourceRecord createEventRecord(String str, String str2, String str3, String str4, String str5, Map<String, Schema> map, Map<String, Object> map2) {
        return createEventRecord(str, SchemaBuilder.string(), str2, SchemaBuilder.string(), str3, str4, SchemaBuilder.string(), str5, map, map2);
    }

    private SourceRecord createEventRecord(String str, SchemaBuilder schemaBuilder, Object obj, SchemaBuilder schemaBuilder2, Object obj2, String str2, SchemaBuilder schemaBuilder3, Object obj3, Map<String, Schema> map, Map<String, Object> map2) {
        SchemaBuilder field = SchemaBuilder.struct().field("id", SchemaBuilder.string()).field("aggregatetype", SchemaBuilder.string()).field("aggregateid", schemaBuilder2).field("type", schemaBuilder).field("payload", schemaBuilder3).field("is_deleted", SchemaBuilder.bool().optional());
        field.getClass();
        map.forEach(field::field);
        Schema build = field.build();
        Envelope build2 = Envelope.defineSchema().withName("event.Envelope").withRecord(build).withSource(SchemaBuilder.struct().build()).build();
        Struct struct = new Struct(build);
        struct.put("id", str);
        struct.put("aggregatetype", str2);
        struct.put("aggregateid", obj2);
        struct.put("type", obj);
        struct.put("payload", obj3);
        struct.getClass();
        map2.forEach(struct::put);
        return new SourceRecord(new HashMap(), new HashMap(), "db.outbox", build2.schema(), build2.create(struct, (Struct) null, Instant.now()));
    }
}
