Der Adder verarbeitet zwei Typen von JSON-Nachrichten anstatt String
authorKai Moritz <kai@juplo.de>
Sat, 3 Sep 2022 12:24:07 +0000 (14:24 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 16:08:42 +0000 (18:08 +0200)
* Bisher waren alle Nachrichten vom Typ `String`.
* Jetzt verarbeitet der Adder zwei unterschiedliche Typen von Nachrichten.
* Die Nachrichten werden als JSON übertragen und mit Hilfe des
  `JsonDeserializer` von Spring Kafka in zwei unterschiedliche
  Spezialisierungen einer Basis-Klasse deserialisiert.
* Die für die Deserialisierung benötigte Typen-Information wird von dem
  Spring-Kafka-Tooling über den die `__TypeId__` transportiert.
* D.h., damit die Nachrichten korrekt deserialisiert werden können, ist es
  _nicht_ nötig, dass der Typ der Nachricht von Jackson aus der Nachricht
  selbst abgeleitet werden kann, sondern dass sich Sender und Empfänger
  darüber verständigen, welchen Hinweis sie in dem `__TypeId__`-Header
  hinterlegen.
* Die Verarbeitung der zwei Nachrichten-Typen wurde in Unter-Methoden
  ausgelagert, da dies die Vergleichbarkeit des Codes zur der Variante
  mit `@KafkaHandler` erhöht.

pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/Message.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/MessageAddNumber.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/MessageCalculateSum.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ApplicationTests.java
src/test/java/de/juplo/kafka/MessageTest.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 6699408..43a63c7 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -44,8 +44,8 @@
       <optional>true</optional>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
index 624a4ec..156b5a0 100644 (file)
@@ -5,6 +5,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
 
 import java.util.Optional;
 import java.util.Properties;
@@ -48,8 +49,8 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public EndlessConsumer<String, String> endlessConsumer(
-      KafkaConsumer<String, String> kafkaConsumer,
+  public EndlessConsumer<String, Message> endlessConsumer(
+      KafkaConsumer<String, Message> kafkaConsumer,
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
@@ -72,7 +73,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Message> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
 
@@ -84,7 +85,11 @@ public class ApplicationConfiguration
     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
+    props.put("value.deserializer", JsonDeserializer.class.getName());
+    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
+    props.put(JsonDeserializer.TYPE_MAPPINGS,
+      Message.Type.ADD + ":" + MessageAddNumber.class.getName() + "," +
+      Message.Type.CALC + ":" + MessageCalculateSum.class.getName());
 
     return new KafkaConsumer<>(props);
   }
index df4e653..03a14c8 100644 (file)
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, String> consumer;
+  private final EndlessConsumer<String, Message> consumer;
 
 
   @Override
index 51d524f..2829157 100644 (file)
@@ -12,7 +12,7 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler<String, Message>
 {
   private final AdderResults results;
   private final Optional<Duration> throttle;
@@ -21,22 +21,40 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
+  public void addNumber(
+      Integer partition,
+      String user,
+      MessageAddNumber message)
+  {
+    state.get(partition).addToSum(user, message.getNext());
+  }
+
+  public void calculateSum(
+      Integer partition,
+      String user,
+      MessageCalculateSum message)
+  {
+    AdderResult result = state.get(partition).calculate(user);
+    log.info("{} - New result for {}: {}", id, user, result);
+    results.addResults(partition, user, result);
+  }
+
   @Override
-  public void accept(ConsumerRecord<String, String> record)
+  public void accept(ConsumerRecord<String, Message> record)
   {
     Integer partition = record.partition();
     String user = record.key();
-    String message = record.value();
+    Message message = record.value();
 
-    if (message.equals("CALCULATE"))
-    {
-      AdderResult result = state.get(partition).calculate(user);
-      log.info("{} - New result for {}: {}", id, user, result);
-      results.addResults(partition, user, result);
-    }
-    else
+    switch(message.getType())
     {
-      state.get(partition).addToSum(user, Integer.parseInt(message));
+      case ADD:
+        addNumber(partition, user, (MessageAddNumber) message);
+        break;
+
+      case CALC:
+        calculateSum(partition, user, (MessageCalculateSum) message);
+        break;
     }
 
     if (throttle.isPresent())
diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java
new file mode 100644 (file)
index 0000000..e4999b7
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+
+public abstract class Message
+{
+  public enum Type {ADD, CALC}
+
+  public abstract Type getType();
+}
diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java
new file mode 100644 (file)
index 0000000..c024b65
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MessageAddNumber extends Message
+{
+  private Integer next;
+
+
+  @Override
+  public Type getType()
+  {
+    return Type.ADD;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java
new file mode 100644 (file)
index 0000000..afc5a39
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MessageCalculateSum extends Message
+{
+  @Override
+  public Type getType()
+  {
+    return Type.CALC;
+  }
+}
index 6a037eb..bd9f449 100644 (file)
@@ -15,7 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 
 @Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, String>
+public class ApplicationTests extends GenericApplicationTests<String, Message>
 {
   @Autowired
   StateRepository stateRepository;
@@ -39,7 +39,7 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
             .mapToObj(i -> "seeräuber-" + i)
             .toArray(i -> new String[i]);
     final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
 
     int counter = 0;
 
@@ -72,7 +72,13 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
 
           if (message[i] > number[i])
           {
-            send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
+            send(
+              key,
+              calculateMessage,
+              Message.Type.CALC,
+              poisonPill(poisonPills, pass, counter),
+              logicError(logicErrors, pass, counter),
+              messageSender);
             state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
             // Pick next number to calculate
             number[i] = numbers[next++%numbers.length];
@@ -80,15 +86,25 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
             log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
           }
 
-          Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
-          send(key, value, fail(logicErrors, pass, counter), messageSender);
+          send(
+            key,
+            new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
+            Message.Type.ADD,
+            poisonPill(poisonPills, pass, counter),
+            logicError(logicErrors, pass, counter),
+            messageSender);
         }
       }
 
       return counter;
     }
 
-    boolean fail (boolean logicErrors, int pass, int counter)
+    boolean poisonPill (boolean poisonPills, int pass, int counter)
+    {
+      return poisonPills && pass > 300 && counter%99 == 0;
+    }
+
+    boolean logicError(boolean logicErrors, int pass, int counter)
     {
       return logicErrors && pass > 300 && counter%77 == 0;
     }
@@ -96,23 +112,25 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
     void send(
         Bytes key,
         Bytes value,
-        boolean fail,
+        Message.Type type,
+        boolean poisonPill,
+        boolean logicError,
         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
     {
       counter++;
 
-      if (fail)
+      if (logicError)
       {
-        value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
+      }
+      if (poisonPill)
+      {
+        value = new Bytes("BOOM!".getBytes());
       }
 
-      messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
-    }
-
-    @Override
-    public boolean canGeneratePoisonPill()
-    {
-      return false;
+      ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
+      record.headers().add("__TypeId__", type.toString().getBytes());
+      messageSender.accept(record);
     }
 
     @Override
diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java
new file mode 100644 (file)
index 0000000..52794ba
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.*;
+
+
+public class MessageTest
+{
+  ObjectMapper mapper = new ObjectMapper();
+
+  @Test
+  @DisplayName("Deserialize a MessageAddNumber message")
+  public void testDeserializeMessageAddNumber()
+  {
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class));
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class));
+  }
+
+  @Test
+  @DisplayName("Deserialize a MessageCalculateSum message")
+  public void testDeserializeMessageCalculateSum() throws JsonProcessingException
+  {
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class));
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class));
+  }
+}