From 7e0114aa23f42743a953742a2c2c810f9efef915 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 12 Nov 2024 12:36:21 +0100 Subject: [PATCH] =?utf8?q?Umbau=20zu=20`supersimple-producer`=20aus=20alte?= =?utf8?q?m=20Branch=20=C3=BCbernommen?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 2 +- build.gradle | 7 +- docker/docker-compose.yml | 8 +- pom.xml | 30 +--- settings.gradle | 2 +- src/main/java/de/juplo/kafka/Application.java | 14 -- .../juplo/kafka/ApplicationConfiguration.java | 56 ------ .../de/juplo/kafka/ApplicationProperties.java | 62 ------- .../java/de/juplo/kafka/ExampleProducer.java | 161 +++++------------- src/main/resources/application.yml | 49 +----- .../java/de/juplo/kafka/ApplicationTests.java | 80 +-------- 11 files changed, 61 insertions(+), 410 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/Application.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java diff --git a/README.sh b/README.sh index c8a0b221..8f77839e 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-SNAPSHOT +IMAGE=juplo/supersimple-producer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 1429c4dd..7bbe8704 100644 --- a/build.gradle +++ b/build.gradle @@ -27,16 +27,13 @@ repositories { } dependencies { - implementation 'org.apache.kafka:kafka-clients' - implementation 'org.springframework.boot:spring-boot-starter-actuator' - implementation 'org.springframework.boot:spring-boot-starter-validation' - implementation 'org.springframework.boot:spring-boot-starter-web' + implementation 'org.springframework.boot:spring-boot-starter' + implementation 'org.springframework.kafka:spring-kafka' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' - testImplementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test' testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 00f68fcc..a3c9b57a 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,11 +136,11 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/supersimple-producer:1.0-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: producer - juplo.producer.topic: test + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: producer + spring.kafka.template.default-topic: test consumer: image: juplo/simple-consumer:1.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index f64266b4..1f08dfee 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - spring-producer - Spring Producer - A Simple Producer, based on Spring Boot, that sends messages via Kafka + supersimple-producer + Super Simple Producer + Most minimal Kafka Producer ever! 1.0-SNAPSHOT @@ -24,11 +24,7 @@ org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-actuator + spring-boot-starter org.springframework.boot @@ -36,12 +32,8 @@ true - org.springframework.boot - spring-boot-starter-validation - - - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -53,21 +45,11 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test test - - org.awaitility - awaitility - test - diff --git a/settings.gradle b/settings.gradle index 5f580c98..3d49d64b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -rootProject.name = 'spring-producer' +rootProject.name = 'supersimple-producer' diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java deleted file mode 100644 index 0069257f..00000000 --- a/src/main/java/de/juplo/kafka/Application.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class Application -{ - public static void main(String[] args) - { - SpringApplication.run(Application.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index 0090ceea..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,56 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.time.Duration; -import java.util.Properties; - - -@Configuration -@EnableConfigurationProperties(ApplicationProperties.class) -public class ApplicationConfiguration -{ - @Bean - public ExampleProducer exampleProducer( - ApplicationProperties properties, - Producer kafkaProducer, - ConfigurableApplicationContext applicationContext) - { - return - new ExampleProducer( - properties.getClientId(), - properties.getProducerProperties().getTopic(), - properties.getProducerProperties().getThrottle() == null - ? Duration.ofMillis(500) - : properties.getProducerProperties().getThrottle(), - kafkaProducer, - () -> applicationContext.close()); - } - - @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getProducerProperties().getAcks()); - props.put("delivery.timeout.ms", (int)properties.getProducerProperties().getDeliveryTimeout().toMillis()); - props.put("max.block.ms", (int)properties.getProducerProperties().getMaxBlock().toMillis()); - props.put("buffer.memory", properties.getProducerProperties().getBufferMemory()); - props.put("batch.size", properties.getProducerProperties().getBatchSize()); - props.put("metadata.max.age.ms", 5000); // 5 Sekunden - props.put("request.timeout.ms", 5000); // 5 Sekunden - props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); - props.put("compression.type", properties.getProducerProperties().getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - return new KafkaProducer<>(props); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index 43232628..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,62 +0,0 @@ -package de.juplo.kafka; - -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; - -import java.time.Duration; - - -@ConfigurationProperties(prefix = "juplo") -@Validated -@Getter -@Setter -public class ApplicationProperties -{ - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - - @NotNull - private ProducerProperties producer; - - - public ProducerProperties getProducerProperties() - { - return producer; - } - - - @Validated - @Getter - @Setter - static class ProducerProperties - { - @NotNull - @NotEmpty - private String topic; - @NotNull - @NotEmpty - private String acks; - @NotNull - private Duration deliveryTimeout; - @NotNull - private Duration maxBlock; - @NotNull - private Long bufferMemory; - @NotNull - private Integer batchSize; - @NotNull - private Duration linger; - @NotNull - @NotEmpty - private String compressionType; - private Duration throttle; - } -} diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 25e885d9..21b30b06 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,140 +1,57 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; -import java.time.Duration; +import java.util.concurrent.CompletableFuture; @Slf4j -public class ExampleProducer implements Runnable +// tag::supersimple[] +@SpringBootApplication +public class ExampleProducer implements ApplicationRunner { - private final String id; - private final String topic; - private final Duration throttle; - private final Producer producer; - private final Thread workerThread; - private final Runnable closeCallback; - - private volatile boolean running = true; - private long produced = 0; - - - public ExampleProducer( - String id, - String topic, - Duration throttle, - Producer producer, - Runnable closeCallback) - { - this.id = id; - this.topic = topic; - this.throttle = throttle; - this.producer = producer; - - workerThread = new Thread(this, "ExampleProducer Worker-Thread"); - workerThread.start(); - - this.closeCallback = closeCallback; - } - + @Autowired + KafkaTemplate kafkaTemplate; @Override - public void run() + public void run(ApplicationArguments args) { - long i = 0; - - try - { - for (; running; i++) - { - send(Long.toString(i%10), Long.toString(i)); - - if (throttle.isPositive()) - { - try - { - Thread.sleep(throttle); - } - catch (InterruptedException e) - { - log.warn("{} - Interrupted while throttling!", e); - } - } - } - } - catch (Exception e) - { - log.error("{} - Unexpected error!", id, e); - log.info("{} - Triggering exit of application!", id); - new Thread(closeCallback).start(); - } - finally + for (int i = 0; true; i++) { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); - log.info("{}: Produced {} messages in total, exiting!", id, produced); + // end::supersimple[] + // tag::callback[] + CompletableFuture> completableFuture = + // tag::supersimple[] + kafkaTemplate.sendDefault(Long.toString(i%10), Long.toString(i)); + // end::supersimple[] + + completableFuture.thenAccept(result -> + log.info( + "Sent {}={} to partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset())); + + completableFuture.exceptionally(e -> { + log.error("ERROR sending message", e); + return null; + }); + // end::callback[] + // tag::supersimple[] } } - void send(String key, String value) - { - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - log.debug( - "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", - id, - key, - value, - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, latency={}ms: {}", - id, - key, - value, - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - now - time - ); - } - - - public void shutdown() throws InterruptedException + public static void main(String[] args) { - log.info("{} joining the worker-thread...", id); - running = false; - workerThread.join(); + SpringApplication.run(ExampleProducer.class, args); } } +// end::supersimple[] diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 98ea1284..51cc46c7 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,46 +1,5 @@ -juplo: - bootstrap-server: :9092 - client-id: DEV - producer: - topic: test - acks: -1 - delivery-timeout: 10s - max-block: 5s - buffer-memory: 33554432 - batch-size: 16384 - linger: 0 - compression-type: gzip - throttle: 500 -management: - endpoint: - shutdown: - enabled: true - endpoints: - web: - exposure: - include: "*" - info: - env: - enabled: true - java: - enabled: true -info: +spring: kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} - producer: - topic: ${juplo.producer.topic} - acks: ${juplo.producer.acks} - delivery-timeout: ${juplo.producer.delivery-timeout} - max-block: ${juplo.producer.max-block} - buffer-memory: ${juplo.producer.buffer-memory} - batch-size: ${juplo.producer.batch-size} - linger: ${juplo.producer.linger} - compression-type: ${juplo.producer.compression-type} - throttle: ${juplo.producer.throttle} -logging: - level: - root: INFO - de.juplo: TRACE -server: - port: 8880 + bootstrap-servers: :9092 + template: + default-topic: test diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 29ca80a7..0c77d14a 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,96 +1,24 @@ package de.juplo.kafka; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.web.servlet.MockMvc; -import java.time.Duration; -import java.util.LinkedList; -import java.util.List; - -import static de.juplo.kafka.ApplicationTests.PARTITIONS; import static de.juplo.kafka.ApplicationTests.TOPIC; -import static org.awaitility.Awaitility.await; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @SpringBootTest( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "spring.kafka.consumer.auto-offset-reset=earliest", - "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.producer.topic=" + TOPIC}) -@AutoConfigureMockMvc -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -@Slf4j + "spring.kafka.template.default-topic=" + TOPIC + }) +@EmbeddedKafka(topics = TOPIC) public class ApplicationTests { - static final String TOPIC = "FOO"; - static final int PARTITIONS = 10; - - @Autowired - MockMvc mockMvc; - @Autowired - Consumer consumer; - - - @BeforeEach - public void clear() - { - consumer.received.clear(); - } - + public final static String TOPIC = "out"; @Test public void testApplicationStartup() { - await("Application is healthy") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> mockMvc - .perform(get("/actuator/health")) - .andExpect(status().isOk()) - .andExpect(jsonPath("status").value("UP"))); - } - - @Test - public void testSendMessage() throws Exception - { - await("Some messages were send") - .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() >= 1); - } - - - static class Consumer - { - final List> received = new LinkedList<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC) - public void receive(ConsumerRecord record) - { - log.debug("Received message: {}", record); - received.add(record); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } } } -- 2.20.1