From 9af7379cd551706dcff3dce70f313ea6b24f5d3d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 29 Oct 2024 16:48:54 +0100 Subject: [PATCH] Der Producer schreibt gezielt in eine fixe (konfigurierbare) Partition --- README.sh | 2 +- docker/docker-compose.yml | 3 ++- pom.xml | 2 +- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 1 + src/main/java/de/juplo/kafka/ApplicationProperties.java | 2 ++ src/main/java/de/juplo/kafka/ExampleProducer.java | 4 ++++ src/main/resources/application.yml | 2 ++ 7 files changed, 13 insertions(+), 3 deletions(-) diff --git a/README.sh b/README.sh index 499780a..668e756 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-fixedsharding-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c417a7f..d482e48 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -190,11 +190,12 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-fixedsharding-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: producer juplo.producer.topic: test + juplo.producer.partition: 0 consumer-1: image: juplo/simple-consumer:1.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 841299b..c3f745f 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Spring-Boot-Producer, that takes messages via POST and confirms successs - 1.0-SNAPSHOT + 1.0-fixedsharding-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 7540dd3..4d1afe8 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -26,6 +26,7 @@ public class ApplicationConfiguration new ExampleProducer( properties.getClientId(), properties.getProducerProperties().getTopic(), + properties.getProducerProperties().getPartition(), properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 4323262..65b71f3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -42,6 +42,8 @@ public class ApplicationProperties @NotEmpty private String topic; @NotNull + private Integer partition; + @NotNull @NotEmpty private String acks; @NotNull diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index bc5cf89..e2811f0 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -12,6 +12,7 @@ public class ExampleProducer implements Runnable { private final String id; private final String topic; + private final int partition; private final Duration throttle; private final Producer producer; private final Thread workerThread; @@ -24,12 +25,14 @@ public class ExampleProducer implements Runnable public ExampleProducer( String id, String topic, + int partition, Duration throttle, Producer producer, Runnable closeCallback) { this.id = id; this.topic = topic; + this.partition = partition; this.throttle = throttle; this.producer = producer; @@ -84,6 +87,7 @@ public class ExampleProducer implements Runnable final ProducerRecord record = new ProducerRecord<>( topic, // Topic + partition, // Partition key, // Key value // Value ); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 98ea128..0f070dd 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,6 +3,7 @@ juplo: client-id: DEV producer: topic: test + partition: 0 acks: -1 delivery-timeout: 10s max-block: 5s @@ -30,6 +31,7 @@ info: client-id: ${juplo.client-id} producer: topic: ${juplo.producer.topic} + partition: ${juplo.producer.partition} acks: ${juplo.producer.acks} delivery-timeout: ${juplo.producer.delivery-timeout} max-block: ${juplo.producer.max-block} -- 2.20.1