From: Kai Moritz Date: Sat, 3 Sep 2022 12:24:07 +0000 (+0200) Subject: Der Adder verarbeitet zwei Typen von JSON-Nachrichten anstatt String X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4e07958ca471036693e944b82b10567705ac55ed;p=demos%2Fkafka%2Ftraining Der Adder verarbeitet zwei Typen von JSON-Nachrichten anstatt String * Bisher waren alle Nachrichten vom Typ `String`. * Jetzt verarbeitet der Adder zwei unterschiedliche Typen von Nachrichten. * Die Nachrichten werden als JSON übertragen und mit Hilfe des `JsonDeserializer` von Spring Kafka in zwei unterschiedliche Spezialisierungen einer Basis-Klasse deserialisiert. * Die für die Deserialisierung benötigte Typen-Information wird von dem Spring-Kafka-Tooling über den die `__TypeId__` transportiert. * D.h., damit die Nachrichten korrekt deserialisiert werden können, ist es _nicht_ nötig, dass der Typ der Nachricht von Jackson aus der Nachricht selbst abgeleitet werden kann, sondern dass sich Sender und Empfänger darüber verständigen, welchen Hinweis sie in dem `__TypeId__`-Header hinterlegen. --- diff --git a/pom.xml b/pom.xml index 6699408..43a63c7 100644 --- a/pom.xml +++ b/pom.xml @@ -44,8 +44,8 @@ true - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index e4ac1ab..596be26 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -6,6 +6,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; import java.time.Clock; import java.util.Optional; @@ -40,7 +41,7 @@ public class ApplicationConfiguration ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, - Consumer consumer, + Consumer consumer, ApplicationProperties properties) { return new ApplicationRebalanceListener( @@ -55,8 +56,8 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( - KafkaConsumer kafkaConsumer, + public EndlessConsumer endlessConsumer( + KafkaConsumer kafkaConsumer, ExecutorService executor, ApplicationRebalanceListener rebalanceListener, ApplicationRecordHandler recordHandler, @@ -79,7 +80,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); @@ -91,7 +92,11 @@ public class ApplicationConfiguration props.put("auto.offset.reset", properties.getAutoOffsetReset()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", JsonDeserializer.class.getName()); + props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka"); + props.put(JsonDeserializer.TYPE_MAPPINGS, + Message.Type.ADD + ":" + MessageAddNumber.class.getName() + "," + + Message.Type.CALC + ":" + MessageCalculateSum.class.getName()); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index df4e653..03a14c8 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer consumer; + private final EndlessConsumer consumer; @Override diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 51d524f..829ab0e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -12,7 +12,7 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class ApplicationRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; private final Optional throttle; @@ -22,21 +22,24 @@ public class ApplicationRecordHandler implements RecordHandler @Override - public void accept(ConsumerRecord record) + public void accept(ConsumerRecord record) { Integer partition = record.partition(); String user = record.key(); - String message = record.value(); + Message message = record.value(); - if (message.equals("CALCULATE")) + switch(message.getType()) { - AdderResult result = state.get(partition).calculate(user); - log.info("{} - New result for {}: {}", id, user, result); - results.addResults(partition, user, result); - } - else - { - state.get(partition).addToSum(user, Integer.parseInt(message)); + case ADD: + MessageAddNumber addNumber = (MessageAddNumber)message; + state.get(partition).addToSum(user, addNumber.getNext()); + break; + + case CALC: + AdderResult result = state.get(partition).calculate(user); + log.info("{} - New result for {}: {}", id, user, result); + results.addResults(partition, user, result); + break; } if (throttle.isPresent()) diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java new file mode 100644 index 0000000..e4999b7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Message.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + + +public abstract class Message +{ + public enum Type {ADD, CALC} + + public abstract Type getType(); +} diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java new file mode 100644 index 0000000..c024b65 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageAddNumber.java @@ -0,0 +1,19 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageAddNumber extends Message +{ + private Integer next; + + + @Override + public Type getType() + { + return Type.ADD; + } +} diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java new file mode 100644 index 0000000..afc5a39 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageCalculateSum.java @@ -0,0 +1,16 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageCalculateSum extends Message +{ + @Override + public Type getType() + { + return Type.CALC; + } +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 6a037eb..bd9f449 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -15,7 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat; @Slf4j -public class ApplicationTests extends GenericApplicationTests +public class ApplicationTests extends GenericApplicationTests { @Autowired StateRepository stateRepository; @@ -39,7 +39,7 @@ public class ApplicationTests extends GenericApplicationTests .mapToObj(i -> "seeräuber-" + i) .toArray(i -> new String[i]); final StringSerializer stringSerializer = new StringSerializer(); - final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE")); + final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}")); int counter = 0; @@ -72,7 +72,13 @@ public class ApplicationTests extends GenericApplicationTests if (message[i] > number[i]) { - send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender); + send( + key, + calculateMessage, + Message.Type.CALC, + poisonPill(poisonPills, pass, counter), + logicError(logicErrors, pass, counter), + messageSender); state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2)); // Pick next number to calculate number[i] = numbers[next++%numbers.length]; @@ -80,15 +86,25 @@ public class ApplicationTests extends GenericApplicationTests log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]); } - Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++))); - send(key, value, fail(logicErrors, pass, counter), messageSender); + send( + key, + new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")), + Message.Type.ADD, + poisonPill(poisonPills, pass, counter), + logicError(logicErrors, pass, counter), + messageSender); } } return counter; } - boolean fail (boolean logicErrors, int pass, int counter) + boolean poisonPill (boolean poisonPills, int pass, int counter) + { + return poisonPills && pass > 300 && counter%99 == 0; + } + + boolean logicError(boolean logicErrors, int pass, int counter) { return logicErrors && pass > 300 && counter%77 == 0; } @@ -96,23 +112,25 @@ public class ApplicationTests extends GenericApplicationTests void send( Bytes key, Bytes value, - boolean fail, + Message.Type type, + boolean poisonPill, + boolean logicError, Consumer> messageSender) { counter++; - if (fail) + if (logicError) { - value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1))); + value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}")); + } + if (poisonPill) + { + value = new Bytes("BOOM!".getBytes()); } - messageSender.accept(new ProducerRecord<>(TOPIC, key, value)); - } - - @Override - public boolean canGeneratePoisonPill() - { - return false; + ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); + record.headers().add("__TypeId__", type.toString().getBytes()); + messageSender.accept(record); } @Override diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java new file mode 100644 index 0000000..52794ba --- /dev/null +++ b/src/test/java/de/juplo/kafka/MessageTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.*; + + +public class MessageTest +{ + ObjectMapper mapper = new ObjectMapper(); + + @Test + @DisplayName("Deserialize a MessageAddNumber message") + public void testDeserializeMessageAddNumber() + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class)); + } + + @Test + @DisplayName("Deserialize a MessageCalculateSum message") + public void testDeserializeMessageCalculateSum() throws JsonProcessingException + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class)); + } +}