From c4cc9637d8d1d42bff9588dabb5b1278037d60cf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 29 Oct 2024 16:48:54 +0100 Subject: [PATCH] Der Producer schreibt gezielt in eine fixe (konfigurierbare) Partition --- README.sh | 2 +- build.gradle | 2 +- docker/docker-compose.yml | 3 ++- pom.xml | 2 +- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 1 + src/main/java/de/juplo/kafka/ApplicationProperties.java | 2 ++ src/main/java/de/juplo/kafka/ExampleProducer.java | 4 ++++ src/main/resources/application.yml | 2 ++ 8 files changed, 14 insertions(+), 4 deletions(-) diff --git a/README.sh b/README.sh index c8a0b221..ab112868 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-fixedsharding-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 1429c4dd..d60423c5 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.0-SNAPSHOT' +version = '1.0-fixedsharding-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 00f68fcc..c5b1b5ad 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,11 +136,12 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-fixedsharding-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: producer juplo.producer.topic: test + juplo.producer.partition: 0 consumer: image: juplo/simple-consumer:1.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index f64266b4..7a47d40f 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Producer, based on Spring Boot, that sends messages via Kafka - 1.0-SNAPSHOT + 1.0-fixedsharding-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0090ceea..ab489256 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -26,6 +26,7 @@ public class ApplicationConfiguration new ExampleProducer( properties.getClientId(), properties.getProducerProperties().getTopic(), + properties.getProducerProperties().getPartition(), properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 43232628..65b71f3a 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -42,6 +42,8 @@ public class ApplicationProperties @NotEmpty private String topic; @NotNull + private Integer partition; + @NotNull @NotEmpty private String acks; @NotNull diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 25e885d9..dabee31a 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -12,6 +12,7 @@ public class ExampleProducer implements Runnable { private final String id; private final String topic; + private final int partition; private final Duration throttle; private final Producer producer; private final Thread workerThread; @@ -24,12 +25,14 @@ public class ExampleProducer implements Runnable public ExampleProducer( String id, String topic, + int partition, Duration throttle, Producer producer, Runnable closeCallback) { this.id = id; this.topic = topic; + this.partition = partition; this.throttle = throttle; this.producer = producer; @@ -84,6 +87,7 @@ public class ExampleProducer implements Runnable final ProducerRecord record = new ProducerRecord<>( topic, // Topic + partition, // Partition key, // Key value // Value ); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 98ea1284..0f070dd9 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,6 +3,7 @@ juplo: client-id: DEV producer: topic: test + partition: 0 acks: -1 delivery-timeout: 10s max-block: 5s @@ -30,6 +31,7 @@ info: client-id: ${juplo.client-id} producer: topic: ${juplo.producer.topic} + partition: ${juplo.producer.partition} acks: ${juplo.producer.acks} delivery-timeout: ${juplo.producer.delivery-timeout} max-block: ${juplo.producer.max-block} -- 2.20.1