From bbc1b7a41d1a8b1751264610fd358debef2ab7c2 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 29 Oct 2024 16:48:54 +0100 Subject: [PATCH] Der Producer schreibt gezielt in eine fixe (konfigurierbare) Partition --- README.sh | 2 +- docker/docker-compose.yml | 3 ++- pom.xml | 2 +- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 1 + src/main/java/de/juplo/kafka/ApplicationProperties.java | 2 ++ src/main/java/de/juplo/kafka/ExampleProducer.java | 4 ++++ src/main/resources/application.yml | 2 ++ 7 files changed, 13 insertions(+), 3 deletions(-) diff --git a/README.sh b/README.sh index 499780a..668e756 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-fixedsharding-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c417a7f..d482e48 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -190,11 +190,12 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-fixedsharding-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: producer juplo.producer.topic: test + juplo.producer.partition: 0 consumer-1: image: juplo/simple-consumer:1.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 841299b..c3f745f 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Spring-Boot-Producer, that takes messages via POST and confirms successs - 1.0-SNAPSHOT + 1.0-fixedsharding-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 2be61da..4d45c04 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -23,6 +23,7 @@ public class ApplicationConfiguration new ExampleProducer( properties.getClientId(), properties.getProducerProperties().getTopic(), + properties.getProducerProperties().getPartition(), properties.getProducerProperties().getThrottleMs() == null ? 500 : properties.getProducerProperties().getThrottleMs(), diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 5cb9aa0..8eebd09 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -40,6 +40,8 @@ public class ApplicationProperties @NotEmpty private String topic; @NotNull + private Integer partition; + @NotNull @NotEmpty private String acks; @NotNull diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 6f7c093..2d043c2 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -10,6 +10,7 @@ public class ExampleProducer implements Runnable { private final String id; private final String topic; + private final int partition; private final int throttleMs; private final Producer producer; private final Thread workerThread; @@ -21,11 +22,13 @@ public class ExampleProducer implements Runnable public ExampleProducer( String id, String topic, + int partition, int throttleMs, Producer producer) { this.id = id; this.topic = topic; + this.partition = partition; this.throttleMs = throttleMs; this.producer = producer; @@ -74,6 +77,7 @@ public class ExampleProducer implements Runnable final ProducerRecord record = new ProducerRecord<>( topic, // Topic + partition, // Partition key, // Key value // Value ); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 85aee9d..79c5c98 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,6 +3,7 @@ juplo: client-id: DEV producer: topic: test + partition: 0 acks: -1 batch-size: 16384 linger-ms: 0 @@ -27,6 +28,7 @@ info: client-id: ${juplo.client-id} producer: topic: ${juplo.producer.topic} + partition: ${juplo.producer.partition} acks: ${juplo.producer.acks} batch-size: ${juplo.producer.batch-size} linger-ms: ${juplo.producer.linger-ms} -- 2.20.1