]> juplo.de Git - demos/kafka/training/commitdiff
Parameter `partition` wiederbelebt sumup-gateway---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Wed, 17 Aug 2022 16:43:43 +0000 (18:43 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Aug 2022 16:43:45 +0000 (18:43 +0200)
* Wenn der Parameter gesetzt ist, schreibt das Gateway alle Nachrichten
  in die vorgegebene Partition.
* Wenn der Parameter null ist, wird die Default-Partitionierung
  (Hashing by Key) verwendet.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/RestGateway.java

index 33dabc93f8c1972662157b0f83a7b5827c969094..d9c7661eba0c2b510b43034c983b31e7b1e2fc96 100644 (file)
@@ -23,6 +23,7 @@ public class ApplicationConfiguration
         new RestGateway(
             properties.getClientId(),
             properties.getTopic(),
+            properties.getPartition(),
             kafkaProducer);
   }
 
index a18b20fcb6fbcfbf6749c130a714faec7faf0074..2bcbb7abd950b97137460dd06a72457ea5891891 100644 (file)
@@ -22,6 +22,7 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String topic;
+  private Integer partition;
   @NotNull
   @NotEmpty
   private String acks;
index 4549b8fb0195dd09beced3b87749c9e9f342872f..c50f465480beaf6af4e67244ae4a615a71a3560c 100644 (file)
@@ -17,6 +17,7 @@ public class RestGateway
 {
   private final String id;
   private final String topic;
+  private final Integer partition;
   private final KafkaProducer<String, Integer> producer;
 
   private long produced = 0;
@@ -33,6 +34,7 @@ public class RestGateway
 
     final ProducerRecord<String, Integer> record = new ProducerRecord<>(
         topic,  // Topic
+        partition, // Partition - Uses default-algorithm, if null
         key,    // Key
         value   // Value
     );