From: Kai Moritz
Date: Mon, 28 Oct 2024 13:28:57 +0000 (+0100)
Subject: `ExampleProducer` converted into a Spring Boot app (without Spring Kafka)
X-Git-Tag: producer/spring-producer--2025-03-18--19-42
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=7ba5509faa3faffc88daac831d7f2f03221ffb16;p=demos%2Fkafka%2Ftraining

`ExampleProducer` converted into a Spring Boot app (without Spring Kafka)

* Validation of the properties enabled
* Control and inspection enabled via Spring Boot's Actuator REST API
* Config namespace changed from `producer` to `juplo.producer`
* In the configuration, the `KafkaProducer` is passed on through its interface
* Producer-specific properties are managed in a dedicated nested class
** This keeps the code clearer once later implementations act _both_ as consumer _and_ as producer!
* The throttling can be controlled via `juplo.producer.throttle`
* `delivery.timeout.ms` and `request.timeout.ms` lowered even further
* `max.block.ms` set to 5s
** Otherwise, various errors quickly create a misleading impression
** One might otherwise get the idea that the producer waits for them indefinitely!
* An exception in the producer triggers the shutdown of the app
* Fix: `close()` still has to be called by the `ExampleProducer` itself
** The call to `close()` triggers the sending of pending messages.
** If the method were only called by Spring, messages might still be consumed after the `ExampleProducer` has already reported how many messages it processed in total.
* `delivery.timeout.ms` made configurable
* `max.block.ms` made configurable
* `buffer.memory` made configurable
---
diff --git a/.dockerignore b/.dockerignore index 9127d15f..6de8137a 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,4 +1,3 @@ * !Dockerfile !target/*.jar -!target/libs/*.jar diff --git a/.maven-dockerinclude b/.maven-dockerinclude index a00c65ff..fd6cecd2 100644 --- a/.maven-dockerinclude +++ b/.maven-dockerinclude @@ -1,2 +1 @@ target/*.jar -target/libs/*.jar diff --git a/Dockerfile b/Dockerfile index 74e66edf..9e196ff0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,5 @@ FROM eclipse-temurin:21-jre VOLUME /tmp COPY target/*.jar /opt/app.jar -COPY target/libs /opt/libs ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [ "kafka:9092", "test", "DCKR" ] +CMD [] diff --git a/README.sh b/README.sh index 3d98ace7..c8a0b221 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/simple-producer:1.0-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -27,10 +27,16 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 docker compose -f docker/docker-compose.yml up -d producer -sleep 5 - -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' +docker compose -f docker/docker-compose.yml up -d peter ute sleep 15 docker compose -f docker/docker-compose.yml stop producer -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -docker compose -f docker/docker-compose.yml logs producer + +echo +echo "Von peter empfangen:" +docker compose -f docker/docker-compose.yml logs peter | grep '\ test\/.' +echo +echo "Von ute empfangen:" +docker compose -f docker/docker-compose.yml logs ute | grep '\ test\/.' 
+ +docker compose -f docker/docker-compose.yml stop peter ute diff --git a/build.gradle b/build.gradle index be0bc470..1429c4dd 100644 --- a/build.gradle +++ b/build.gradle @@ -28,9 +28,23 @@ repositories { dependencies { implementation 'org.apache.kafka:kafka-clients' - implementation 'ch.qos.logback:logback-classic' + implementation 'org.springframework.boot:spring-boot-starter-actuator' + implementation 'org.springframework.boot:spring-boot-starter-validation' + implementation 'org.springframework.boot:spring-boot-starter-web' compileOnly 'org.projectlombok:lombok' + developmentOnly 'org.springframework.boot:spring-boot-devtools' + annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' annotationProcessor 'org.projectlombok:lombok' + testImplementation 'org.springframework.boot:spring-boot-starter-test' + testImplementation 'org.springframework.kafka:spring-kafka' + testImplementation 'org.springframework.kafka:spring-kafka-test' + testCompileOnly 'org.projectlombok:lombok' + testAnnotationProcessor 'org.projectlombok:lombok' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' +} + +tasks.named('test') { + useJUnitPlatform() } docker { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 5b19de74..00f68fcc 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,8 +136,23 @@ services: - kafka-3 producer: - image: juplo/simple-producer:1.0-SNAPSHOT - command: kafka:9092 test producer + image: juplo/spring-producer:1.0-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: producer + juplo.producer.topic: test + + consumer: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group consumer + + peter: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group peter + + ute: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group ute volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index 3da4d59e..f64266b4 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - simple-producer - Super Simple Producer - A Simple Producer, programmed with pure Java, that sends messages via Kafka + spring-producer + Spring Producer + A Simple Producer, based on Spring Boot, that sends messages via Kafka 1.0-SNAPSHOT @@ -22,6 +22,23 @@ + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-configuration-processor + true + + + org.springframework.boot + spring-boot-starter-validation + org.apache.kafka kafka-clients @@ -32,42 +49,40 @@ compile - ch.qos.logback - logback-classic + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.awaitility + awaitility + test - org.apache.maven.plugins - maven-dependency-plugin + org.springframework.boot + spring-boot-maven-plugin - copy-dependencies - package - copy-dependencies + build-info - - ${project.build.directory}/libs - - - org.apache.maven.plugins - maven-jar-plugin - - - - true - libs/ - de.juplo.kafka.ExampleProducer - - - - pl.project13.maven git-commit-id-plugin diff --git a/settings.gradle b/settings.gradle index 5f64b227..5f580c98 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -rootProject.name = 'simple-producer' +rootProject.name = 'spring-producer' diff --git a/src/main/java/de/juplo/kafka/Application.java 
b/src/main/java/de/juplo/kafka/Application.java new file mode 100644 index 00000000..0069257f --- /dev/null +++ b/src/main/java/de/juplo/kafka/Application.java @@ -0,0 +1,14 @@ +package de.juplo.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class Application +{ + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 00000000..0090ceea --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,56 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.time.Duration; +import java.util.Properties; + + +@Configuration +@EnableConfigurationProperties(ApplicationProperties.class) +public class ApplicationConfiguration +{ + @Bean + public ExampleProducer exampleProducer( + ApplicationProperties properties, + Producer kafkaProducer, + ConfigurableApplicationContext applicationContext) + { + return + new ExampleProducer( + properties.getClientId(), + properties.getProducerProperties().getTopic(), + properties.getProducerProperties().getThrottle() == null + ? Duration.ofMillis(500) + : properties.getProducerProperties().getThrottle(), + kafkaProducer, + () -> applicationContext.close()); + } + + @Bean(destroyMethod = "") + public KafkaProducer kafkaProducer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", properties.getClientId()); + props.put("acks", properties.getProducerProperties().getAcks()); + props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis()); + props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis()); + props.put("buffer.memory", properties.getProducerProperties().getBufferMemory()); + props.put("batch.size", properties.getProducerProperties().getBatchSize()); + props.put("metadata.max.age.ms", 5000); // 5 Sekunden + props.put("request.timeout.ms", 5000); // 5 Sekunden + props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); + props.put("compression.type", properties.getProducerProperties().getCompressionType()); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + return new KafkaProducer<>(props); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java new file mode 100644 index 00000000..43232628 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -0,0 +1,62 @@ +package de.juplo.kafka; + +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Getter; +import lombok.Setter; +import 
org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import java.time.Duration; + + +@ConfigurationProperties(prefix = "juplo") +@Validated +@Getter +@Setter +public class ApplicationProperties +{ + @NotNull + @NotEmpty + private String bootstrapServer; + @NotNull + @NotEmpty + private String clientId; + + @NotNull + private ProducerProperties producer; + + + public ProducerProperties getProducerProperties() + { + return producer; + } + + + @Validated + @Getter + @Setter + static class ProducerProperties + { + @NotNull + @NotEmpty + private String topic; + @NotNull + @NotEmpty + private String acks; + @NotNull + private Duration deliveryTimeout; + @NotNull + private Duration maxBlock; + @NotNull + private Long bufferMemory; + @NotNull + private Integer batchSize; + @NotNull + private Duration linger; + @NotNull + @NotEmpty + private String compressionType; + private Duration throttle; + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index b7e9bc78..c5a5a80d 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -2,40 +2,45 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import java.util.Properties; +import java.time.Duration; @Slf4j -public class ExampleProducer +public class ExampleProducer implements Runnable { private final String id; private final String topic; + private final Duration throttle; private final Producer producer; + private final Thread workerThread; + private final Runnable closeCallback; private volatile boolean running = true; - private volatile boolean done = false; private long produced = 0; + public ExampleProducer( - String broker, + String id, String topic, - String clientId) + Duration throttle, + Producer producer, + Runnable closeCallback) { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - this.id = clientId; + this.id = id; this.topic = topic; - producer = new KafkaProducer<>(props); + this.throttle = throttle; + this.producer = producer; + + workerThread = new Thread(this, "ExampleProducer Worker-Thread"); + workerThread.start(); + + this.closeCallback = closeCallback; } + + @Override public void run() { long i = 0; @@ -45,19 +50,31 @@ public class ExampleProducer for (; running; i++) { send(Long.toString(i%10), Long.toString(i)); - Thread.sleep(500); + + if (throttle.isPositive()) + { + try + { + Thread.sleep(throttle); + } + catch (InterruptedException e) + { + log.warn("{} - Interrupted while throttling!", e); + } + } } } catch (Exception e) { log.error("{} - Unexpected error!", id, e); + log.info("{} - Triggering exit of application!", id); + new Thread(closeCallback).start(); } finally { log.info("{}: Closing the KafkaProducer", id); producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); - done = true; } } @@ -114,44 +131,10 @@ public class ExampleProducer } - public static void main(String[] args) throws Exception + public void shutdown() 
throws InterruptedException { - String broker = ":9092"; - String topic = "test"; - String clientId = "DEV"; - - switch (args.length) - { - case 3: - clientId = args[2]; - case 2: - topic = args[1]; - case 1: - broker = args[0]; - } - - ExampleProducer instance = new ExampleProducer(broker, topic, clientId); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.running = false; - while (!instance.done) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - log.info( - "Running ExampleProducer: broker={}, topic={}, client-id={}", - broker, - topic, - clientId); - instance.run(); + log.info("{} joining the worker-thread...", id); + running = false; + workerThread.join(); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 00000000..98ea1284 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,46 @@ +juplo: + bootstrap-server: :9092 + client-id: DEV + producer: + topic: test + acks: -1 + delivery-timeout: 10s + max-block: 5s + buffer-memory: 33554432 + batch-size: 16384 + linger: 0 + compression-type: gzip + throttle: 500 +management: + endpoint: + shutdown: + enabled: true + endpoints: + web: + exposure: + include: "*" + info: + env: + enabled: true + java: + enabled: true +info: + kafka: + bootstrap-server: ${juplo.bootstrap-server} + client-id: ${juplo.client-id} + producer: + topic: ${juplo.producer.topic} + acks: ${juplo.producer.acks} + delivery-timeout: ${juplo.producer.delivery-timeout} + max-block: ${juplo.producer.max-block} + buffer-memory: ${juplo.producer.buffer-memory} + batch-size: ${juplo.producer.batch-size} + linger: ${juplo.producer.linger} + compression-type: ${juplo.producer.compression-type} + throttle: ${juplo.producer.throttle} +logging: + level: + root: INFO + de.juplo: TRACE +server: + port: 8880 diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 7a25e76f..9c7af767 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -7,8 +7,6 @@ - - diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java new file mode 100644 index 00000000..29ca80a7 --- /dev/null +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -0,0 +1,96 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.servlet.MockMvc; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import static de.juplo.kafka.ApplicationTests.PARTITIONS; +import static de.juplo.kafka.ApplicationTests.TOPIC; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import 
static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.consumer.auto-offset-reset=earliest", + "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.producer.topic=" + TOPIC}) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@Slf4j +public class ApplicationTests +{ + static final String TOPIC = "FOO"; + static final int PARTITIONS = 10; + + @Autowired + MockMvc mockMvc; + @Autowired + Consumer consumer; + + + @BeforeEach + public void clear() + { + consumer.received.clear(); + } + + + @Test + public void testApplicationStartup() + { + await("Application is healthy") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> mockMvc + .perform(get("/actuator/health")) + .andExpect(status().isOk()) + .andExpect(jsonPath("status").value("UP"))); + } + + @Test + public void testSendMessage() throws Exception + { + await("Some messages were send") + .atMost(Duration.ofSeconds(5)) + .until(() -> consumer.received.size() >= 1); + } + + + static class Consumer + { + final List> received = new LinkedList<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC) + public void receive(ConsumerRecord record) + { + log.debug("Received message: {}", record); + received.add(record); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } +}
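Side note on the interface-based wiring from the commit message: because `ExampleProducer` now receives the producer through the `Producer` interface instead of creating a `KafkaProducer` itself, a `MockProducer` from `kafka-clients` can be plugged in without any broker. The following is a minimal sketch of that idea, not part of the commit; it assumes the type parameters are `<String, String>` (the gitweb rendering strips the generics), and the class name, throttle, and runtime are arbitrary.

package de.juplo.kafka;

import java.time.Duration;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExampleProducerSketch
{
  public static void main(String[] args) throws Exception
  {
    // Records are captured in memory instead of being sent to a broker
    MockProducer<String, String> mockProducer =
        new MockProducer<>(true, new StringSerializer(), new StringSerializer());

    // The constructor starts the worker thread, exactly as in the commit
    ExampleProducer exampleProducer = new ExampleProducer(
        "TEST",                  // client-id
        "test",                  // topic
        Duration.ofMillis(100),  // throttle
        mockProducer,
        () -> {});               // close-callback (no-op in this sketch)

    Thread.sleep(1000);          // let the worker thread produce a few records
    exampleProducer.shutdown();  // stops the loop and joins the worker thread,
                                 // which then closes the producer in its finally block

    System.out.println("Records captured: " + mockProducer.history().size());
  }
}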
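The Actuator-based control mentioned in the commit message can be exercised over HTTP once the app is running; `management.endpoint.shutdown.enabled: true` and `server.port: 8880` come from the new `application.yml`. A hedged sketch, assuming the app runs on localhost and the class name is hypothetical:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ActuatorShutdownSketch
{
  public static void main(String[] args) throws Exception
  {
    HttpClient client = HttpClient.newHttpClient();

    // POST to the shutdown endpoint enabled in application.yml (port 8880)
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8880/actuator/shutdown"))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();

    HttpResponse<String> response =
        client.send(request, HttpResponse.BodyHandlers.ofString());

    System.out.println(response.statusCode() + ": " + response.body());
  }
}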