From: Kai Moritz Date: Tue, 26 Jul 2022 14:11:45 +0000 (+0200) Subject: Verbesserungen des Testfalls gemerged (Branch 'deserialization') X-Git-Tag: sumup-adder---lvm-2-tage~9^2~7^2~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=c032639acf2861b9039dc08e98bb7d9d1f59b086;hp=be1b513f8bd7646f9ceb3a7ba90952641e3af125;p=demos%2Fkafka%2Ftraining Verbesserungen des Testfalls gemerged (Branch 'deserialization') --- diff --git a/pom.xml b/pom.xml index 1f5caab..0889d23 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,10 @@ org.apache.kafka kafka-clients + + org.springframework.kafka + spring-kafka + org.projectlombok lombok diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 766740b..9fc0c70 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,11 +2,11 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; import java.util.Properties; import java.util.concurrent.ExecutorService; @@ -19,7 +19,7 @@ import java.util.function.Consumer; public class ApplicationConfiguration { @Bean - public Consumer> consumer() + public Consumer> consumer() { return (record) -> { @@ -28,10 +28,10 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( - KafkaConsumer kafkaConsumer, + public EndlessConsumer endlessConsumer( + KafkaConsumer kafkaConsumer, ExecutorService executor, - Consumer> handler, + Consumer> handler, ApplicationProperties properties) { return @@ -50,7 +50,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); @@ -61,7 +61,11 @@ public class ApplicationConfiguration props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", LongDeserializer.class.getName()); + props.put("value.deserializer", JsonDeserializer.class.getName()); + props.put(JsonDeserializer.TYPE_MAPPINGS, + "message:" + ClientMessage.class.getName() + "," + + "greeting:" + Greeting.class.getName()); + props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka"); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ClientMessage.java b/src/main/java/de/juplo/kafka/ClientMessage.java new file mode 100644 index 0000000..a158907 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ClientMessage.java @@ -0,0 +1,22 @@ +package de.juplo.kafka; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class ClientMessage extends ValidMessage +{ + String client; + String message; + + ClientMessage() + { + super(Type.CLIENT_MESSAGE); + } +} diff --git a/src/main/java/de/juplo/kafka/Greeting.java b/src/main/java/de/juplo/kafka/Greeting.java new file mode 100644 index 0000000..4421a50 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Greeting.java @@ -0,0 +1,21 @@ +package de.juplo.kafka; + +import lombok.*; + +import java.time.LocalDateTime; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class Greeting extends ValidMessage +{ + String name; + LocalDateTime when; + + public Greeting() + { + super(Type.GREETING); + } +} diff --git a/src/main/java/de/juplo/kafka/ValidMessage.java b/src/main/java/de/juplo/kafka/ValidMessage.java new file mode 100644 index 0000000..217d8f3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ValidMessage.java @@ -0,0 +1,14 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public abstract class ValidMessage +{ + enum Type { CLIENT_MESSAGE, GREETING } + + @Getter + private final Type type; +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 3bac537..24d3a9e 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -15,11 +15,13 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; +import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -56,7 +58,7 @@ class ApplicationTests @Autowired KafkaProducer kafkaProducer; @Autowired - KafkaConsumer kafkaConsumer; + KafkaConsumer kafkaConsumer; @Autowired KafkaConsumer offsetConsumer; @Autowired @@ -64,11 +66,11 @@ class ApplicationTests @Autowired ExecutorService executor; - Consumer> testHandler; - EndlessConsumer endlessConsumer; + Consumer> testHandler; + EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; - Set> receivedRecords; + Set> receivedRecords; /** Tests methods */ @@ -78,8 +80,20 @@ class ApplicationTests { send100Messages((partition, key, counter) -> { - Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter)); - return new ProducerRecord<>(TOPIC, partition, key, value); + Bytes value; + String type; + + if (counter%3 != 0) + { + value = serializeClientMessage(key, counter); + type = "message"; + } + else { + value = serializeGreeting(key, counter); + type = "greeting"; + } + + return toRecord(partition, key, value, type); }); await("100 records received") @@ -106,10 +120,28 @@ class ApplicationTests { send100Messages((partition, key, counter) -> { - Bytes value = counter == 77 - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(valueSerializer.serialize(TOPIC, counter)); - return new ProducerRecord<>(TOPIC, partition, key, value); + Bytes value; + String type; + + if (counter == 77) + { + value = serializeFooMessage(key, counter); + type = "foo"; + } + else + { + if (counter%3 != 0) + { + value = serializeClientMessage(key, counter); + type = "message"; + } + else { + value = serializeGreeting(key, counter); + type = "greeting"; + } + } + + return toRecord(partition, key, value, type); }); await("Consumer failed") @@ -249,6 +281,31 @@ class ApplicationTests } } + ProducerRecord toRecord(int partition, String key, Bytes value, String type) + { + ProducerRecord record = + new ProducerRecord<>(TOPIC, partition, key, value); + record.headers().add("__TypeId__", type.getBytes()); + return record; + } + + Bytes serializeClientMessage(String key, Long value) + { + TestClientMessage message = new TestClientMessage(key, value.toString()); + return new Bytes(valueSerializer.serialize(TOPIC, message)); + } + + Bytes serializeGreeting(String key, Long value) + { + TestGreeting message = new TestGreeting(key, LocalDateTime.now()); + return new Bytes(valueSerializer.serialize(TOPIC, message)); + } + + Bytes serializeFooMessage(String key, Long value) + { + TestFooMessage message = new TestFooMessage(key, value); + return new Bytes(valueSerializer.serialize(TOPIC, message)); + } @BeforeEach public void init() @@ -267,7 +324,7 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = + Consumer> captureOffsetAndExecuteTestHandler = record -> { newOffsets.put( @@ -307,9 +364,9 @@ class ApplicationTests public static class Configuration { @Bean - Serializer serializer() + Serializer serializer() { - return new LongSerializer(); + return new JsonSerializer<>(); } @Bean diff --git a/src/test/java/de/juplo/kafka/TestClientMessage.java b/src/test/java/de/juplo/kafka/TestClientMessage.java new file mode 100644 index 0000000..0072121 --- /dev/null +++ b/src/test/java/de/juplo/kafka/TestClientMessage.java @@ -0,0 +1,12 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class TestClientMessage +{ + private final String client; + private final String message; +} diff --git a/src/test/java/de/juplo/kafka/TestFooMessage.java b/src/test/java/de/juplo/kafka/TestFooMessage.java new file mode 100644 index 0000000..d8f4c65 --- /dev/null +++ b/src/test/java/de/juplo/kafka/TestFooMessage.java @@ -0,0 +1,12 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class TestFooMessage +{ + private final String client; + private final Long timestamp; +} diff --git a/src/test/java/de/juplo/kafka/TestGreeting.java b/src/test/java/de/juplo/kafka/TestGreeting.java new file mode 100644 index 0000000..446e877 --- /dev/null +++ b/src/test/java/de/juplo/kafka/TestGreeting.java @@ -0,0 +1,13 @@ +package de.juplo.kafka; + +import lombok.Value; + +import java.time.LocalDateTime; + + +@Value +public class TestGreeting +{ + private final String name; + private final LocalDateTime when; +}