From: Kai Moritz Date: Sat, 30 Apr 2022 09:38:26 +0000 (+0200) Subject: Merge branch 'deserialization' into springified-consumer--serialization X-Git-Tag: sumup-adder---lvm-2-tage~9^2~7^2~5 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=d2f482e5eb951e725f4193f97d979330e91c930e;hp=c2ed5c937b94f61b40ee9e329bf80b0a62fb4fbc;p=demos%2Fkafka%2Ftraining Merge branch 'deserialization' into springified-consumer--serialization --- diff --git a/pom.xml b/pom.xml index f218085..21466ec 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,10 @@ org.apache.kafka kafka-clients + + org.springframework.kafka + spring-kafka + org.projectlombok lombok diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 4054e93..431041c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,6 +7,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -19,7 +20,7 @@ import java.util.function.Consumer; public class ApplicationConfiguration { @Bean - public Consumer> consumer() + public Consumer> consumer() { return (record) -> { @@ -28,10 +29,10 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( - KafkaConsumer kafkaConsumer, + public EndlessConsumer endlessConsumer( + KafkaConsumer kafkaConsumer, ExecutorService executor, - Consumer> handler, + Consumer> handler, ApplicationProperties properties) { return @@ -50,7 +51,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); @@ -60,7 +61,9 @@ public class ApplicationConfiguration props.put("auto.offset.reset", properties.getAutoOffsetReset()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", LongDeserializer.class.getName()); + props.put("value.deserializer", JsonDeserializer.class.getName()); + props.put(JsonDeserializer.TYPE_MAPPINGS, "message:" + ClientMessage.class.getName()); + props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka"); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ClientMessage.java b/src/main/java/de/juplo/kafka/ClientMessage.java new file mode 100644 index 0000000..d18800b --- /dev/null +++ b/src/main/java/de/juplo/kafka/ClientMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Data; + + +@Data +public class ClientMessage +{ + String client; + String message; +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 40dc149..6c25bcd 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -15,6 +15,7 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -24,8 +25,8 @@ import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -56,7 +57,7 @@ class ApplicationTests @Autowired KafkaProducer kafkaProducer; @Autowired - KafkaConsumer kafkaConsumer; + KafkaConsumer kafkaConsumer; @Autowired KafkaConsumer offsetConsumer; @Autowired @@ -64,11 +65,11 @@ class ApplicationTests @Autowired ExecutorService executor; - Consumer> testHandler; - EndlessConsumer endlessConsumer; + Consumer> testHandler; + EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; - Set> receivedRecords; + Set> receivedRecords; /** Tests methods */ @@ -77,7 +78,7 @@ class ApplicationTests @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { - send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i))); + send100Messages((key, counter) -> serialize(key, counter)); await("100 records received") .atMost(Duration.ofSeconds(30)) @@ -100,10 +101,10 @@ class ApplicationTests @Order(2) void commitsOffsetOfErrorForReprocessingOnError() { - send100Messages(counter -> + send100Messages((key, counter) -> counter == 77 ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(valueSerializer.serialize(TOPIC, counter))); + : serialize(key, counter)); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -185,7 +186,7 @@ class ApplicationTests } - void send100Messages(Function messageGenerator) + void send100Messages(BiFunction messageGenerator) { long i = 0; @@ -193,7 +194,7 @@ class ApplicationTests { for (int key = 0; key < 10; key++) { - Bytes value = messageGenerator.apply(++i); + Bytes value = messageGenerator.apply(key, ++i); ProducerRecord record = new ProducerRecord<>( @@ -202,6 +203,7 @@ class ApplicationTests Integer.toString(key%2), value); + record.headers().add("__TypeId__", "message".getBytes()); kafkaProducer.send(record, (metadata, e) -> { if (metadata != null) @@ -226,6 +228,14 @@ class ApplicationTests } } + Bytes serialize(Integer key, Long value) + { + ClientMessage message = new ClientMessage(); + message.setClient(key.toString()); + message.setMessage(value.toString()); + return new Bytes(valueSerializer.serialize(TOPIC, message)); + } + @BeforeEach public void init() @@ -242,7 +252,7 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = + Consumer> captureOffsetAndExecuteTestHandler = record -> { newOffsets.put( @@ -282,9 +292,9 @@ class ApplicationTests public static class Configuration { @Bean - Serializer serializer() + Serializer serializer() { - return new LongSerializer(); + return new JsonSerializer<>(); } @Bean