From: Kai Moritz Date: Tue, 26 Jul 2022 09:38:54 +0000 (+0200) Subject: Deserialisierung von Nachrichten unterschiedlichen Typs X-Git-Tag: sumup-adder---lvm-2-tage~9^2~7^2~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a8aed63e92d58731176dde8b7cec4f5a022ac813;p=demos%2Fkafka%2Ftraining Deserialisierung von Nachrichten unterschiedlichen Typs --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 431041c..ddeb9d5 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -20,7 +19,7 @@ import java.util.function.Consumer; public class ApplicationConfiguration { @Bean - public Consumer> consumer() + public Consumer> consumer() { return (record) -> { @@ -29,10 +28,10 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( - KafkaConsumer kafkaConsumer, + public EndlessConsumer endlessConsumer( + KafkaConsumer kafkaConsumer, ExecutorService executor, - Consumer> handler, + Consumer> handler, ApplicationProperties properties) { return @@ -51,7 +50,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); @@ -62,7 +61,9 @@ public class ApplicationConfiguration props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", JsonDeserializer.class.getName()); - props.put(JsonDeserializer.TYPE_MAPPINGS, "message:" + ClientMessage.class.getName()); + props.put(JsonDeserializer.TYPE_MAPPINGS, + "message:" + ClientMessage.class.getName() + "," + + "greeting:" + Greeting.class.getName()); props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka"); return new KafkaConsumer<>(props); diff --git a/src/main/java/de/juplo/kafka/ClientMessage.java b/src/main/java/de/juplo/kafka/ClientMessage.java index d18800b..a158907 100644 --- a/src/main/java/de/juplo/kafka/ClientMessage.java +++ b/src/main/java/de/juplo/kafka/ClientMessage.java @@ -1,11 +1,22 @@ package de.juplo.kafka; -import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; -@Data -public class ClientMessage +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class ClientMessage extends ValidMessage { String client; String message; + + ClientMessage() + { + super(Type.CLIENT_MESSAGE); + } } diff --git a/src/main/java/de/juplo/kafka/Greeting.java b/src/main/java/de/juplo/kafka/Greeting.java new file mode 100644 index 0000000..4421a50 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Greeting.java @@ -0,0 +1,21 @@ +package de.juplo.kafka; + +import lombok.*; + +import java.time.LocalDateTime; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class Greeting extends ValidMessage +{ + String name; + LocalDateTime when; + + public Greeting() + { + super(Type.GREETING); + } +} diff --git a/src/main/java/de/juplo/kafka/ValidMessage.java b/src/main/java/de/juplo/kafka/ValidMessage.java new file mode 100644 index 0000000..217d8f3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ValidMessage.java @@ -0,0 +1,14 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public abstract class ValidMessage +{ + enum Type { CLIENT_MESSAGE, GREETING } + + @Getter + private final Type type; +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 62906b3..9169de0 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -21,11 +21,11 @@ import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; +import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -57,7 +57,7 @@ class ApplicationTests @Autowired KafkaProducer kafkaProducer; @Autowired - KafkaConsumer kafkaConsumer; + KafkaConsumer kafkaConsumer; @Autowired KafkaConsumer offsetConsumer; @Autowired @@ -65,11 +65,11 @@ class ApplicationTests @Autowired ExecutorService executor; - Consumer> testHandler; - EndlessConsumer endlessConsumer; + Consumer> testHandler; + EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; - Set> receivedRecords; + Set> receivedRecords; /** Tests methods */ @@ -80,8 +80,20 @@ class ApplicationTests { send100Messages((partition, key, counter) -> { - Bytes value = serialize(key, counter); - return new ProducerRecord<>(TOPIC, partition, key, value); + Bytes value; + String type; + + if (counter%3 != 0) + { + value = serializeClientMessage(key, counter); + type = "message"; + } + else { + value = serializeGreeting(key, counter); + type = "greeting"; + } + + return toRecord(partition, key, value, type); }); await("100 records received") @@ -107,10 +119,28 @@ class ApplicationTests { send100Messages((partition, key, counter) -> { - Bytes value = counter == 77 - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : serialize(key, counter); - return new ProducerRecord<>(TOPIC, partition, key, value); + Bytes value; + String type; + + if (counter == 77) + { + value = serializeFooMessage(key, counter); + type = "foo"; + } + else + { + if (counter%3 != 0) + { + value = serializeClientMessage(key, counter); + type = "message"; + } + else { + value = serializeGreeting(key, counter); + type = "greeting"; + } + } + + return toRecord(partition, key, value, type); }); await("Consumer failed") @@ -209,7 +239,6 @@ class ApplicationTests ProducerRecord record = recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i); - record.headers().add("__TypeId__", "message".getBytes()); kafkaProducer.send(record, (metadata, e) -> { if (metadata != null) @@ -234,14 +263,31 @@ class ApplicationTests } } - Bytes serialize(String key, Long value) + ProducerRecord toRecord(int partition, String key, Bytes value, String type) + { + ProducerRecord record = + new ProducerRecord<>(TOPIC, partition, key, value); + record.headers().add("__TypeId__", type.getBytes()); + return record; + } + + Bytes serializeClientMessage(String key, Long value) { - ClientMessage message = new ClientMessage(); - message.setClient(key); - message.setMessage(value.toString()); + TestClientMessage message = new TestClientMessage(key, value.toString()); return new Bytes(valueSerializer.serialize(TOPIC, message)); } + Bytes serializeGreeting(String key, Long value) + { + TestGreeting message = new TestGreeting(key, LocalDateTime.now()); + return new Bytes(valueSerializer.serialize(TOPIC, message)); + } + + Bytes serializeFooMessage(String key, Long value) + { + TestFooMessage message = new TestFooMessage(key, value); + return new Bytes(valueSerializer.serialize(TOPIC, message)); + } @BeforeEach public void init() @@ -258,7 +304,7 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = + Consumer> captureOffsetAndExecuteTestHandler = record -> { newOffsets.put( @@ -298,7 +344,7 @@ class ApplicationTests public static class Configuration { @Bean - Serializer serializer() + Serializer serializer() { return new JsonSerializer<>(); } diff --git a/src/test/java/de/juplo/kafka/TestClientMessage.java b/src/test/java/de/juplo/kafka/TestClientMessage.java new file mode 100644 index 0000000..0072121 --- /dev/null +++ b/src/test/java/de/juplo/kafka/TestClientMessage.java @@ -0,0 +1,12 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class TestClientMessage +{ + private final String client; + private final String message; +} diff --git a/src/test/java/de/juplo/kafka/TestFooMessage.java b/src/test/java/de/juplo/kafka/TestFooMessage.java new file mode 100644 index 0000000..d8f4c65 --- /dev/null +++ b/src/test/java/de/juplo/kafka/TestFooMessage.java @@ -0,0 +1,12 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class TestFooMessage +{ + private final String client; + private final Long timestamp; +} diff --git a/src/test/java/de/juplo/kafka/TestGreeting.java b/src/test/java/de/juplo/kafka/TestGreeting.java new file mode 100644 index 0000000..446e877 --- /dev/null +++ b/src/test/java/de/juplo/kafka/TestGreeting.java @@ -0,0 +1,13 @@ +package de.juplo.kafka; + +import lombok.Value; + +import java.time.LocalDateTime; + + +@Value +public class TestGreeting +{ + private final String name; + private final LocalDateTime when; +}