From ad9035307e20476bab603b44ee39646b7086cbf7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 25 Mar 2025 18:25:47 +0100 Subject: [PATCH] =?utf8?q?Separate=20`README.sh`=20f=C3=BCr=20Maven=20und?= =?utf8?q?=20Gradle?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README-gradle.sh | 29 ++++ README-maven.sh | 29 ++++ README.sh | 42 ------ docker/docker-compose.yml | 20 +-- pom.xml | 6 +- .../juplo/kafka/ApplicationConfiguration.java | 34 ----- .../de/juplo/kafka/ApplicationProperties.java | 39 ----- .../java/de/juplo/kafka/ExampleProducer.java | 135 ------------------ .../java/de/juplo/kafka/ApplicationTests.java | 94 ------------ 9 files changed, 62 insertions(+), 366 deletions(-) create mode 100755 README-gradle.sh create mode 100755 README-maven.sh delete mode 100755 README.sh delete mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java delete mode 100644 src/main/java/de/juplo/kafka/ExampleProducer.java delete mode 100644 src/test/java/de/juplo/kafka/ApplicationTests.java diff --git a/README-gradle.sh b/README-gradle.sh new file mode 100755 index 00000000..bc566150 --- /dev/null +++ b/README-gradle.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +IMAGE=juplo/technick-check:1.0-SNAPSHOT + +if [ "$1" = "cleanup" ] +then + docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans + mvn clean + exit +fi + +docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 +docker compose -f docker/docker-compose.yml rm -svf technick-check + +if [[ + $(docker image ls -q $IMAGE) == "" || + "$1" = "build" +]] +then + mvn clean install || exit +else + echo "Using image existing images:" + docker image ls $IMAGE +fi + +docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 + + +docker compose -f docker/docker-compose.yml up -d technick-check diff --git a/README-maven.sh b/README-maven.sh new file mode 100755 index 00000000..bc566150 --- /dev/null +++ b/README-maven.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +IMAGE=juplo/technick-check:1.0-SNAPSHOT + +if [ "$1" = "cleanup" ] +then + docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans + mvn clean + exit +fi + +docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 +docker compose -f docker/docker-compose.yml rm -svf technick-check + +if [[ + $(docker image ls -q $IMAGE) == "" || + "$1" = "build" +]] +then + mvn clean install || exit +else + echo "Using image existing images:" + docker image ls $IMAGE +fi + +docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 + + +docker compose -f docker/docker-compose.yml up -d technick-check diff --git a/README.sh b/README.sh deleted file mode 100755 index 501349a9..00000000 --- a/README.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/spring-producer:1.0-kafkatemplate-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf producer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d peter ute -sleep 15 - -docker compose -f docker/docker-compose.yml stop producer - -echo -echo "Von peter empfangen:" -docker compose -f docker/docker-compose.yml logs peter | grep '\ test\/.' -echo -echo "Von ute empfangen:" -docker compose -f docker/docker-compose.yml logs ute | grep '\ test\/.' - -docker compose -f docker/docker-compose.yml stop peter ute diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2bb942bf..8b6cab33 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -135,24 +135,8 @@ services: - kafka-2 - kafka-3 - producer: - image: juplo/spring-producer:1.0-kafkatemplate-SNAPSHOT - environment: - spring.kafka.bootstrap-servers: kafka:9092 - spring.kafka.client-id: producer - juplo.producer.topic: test - - consumer: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer - - peter: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group peter - - ute: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group ute + technick-check: + image: juplo/technick-check:1.0-SNAPSHOT volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index 27951434..20a8c2a5 100644 --- a/pom.xml +++ b/pom.xml @@ -12,10 +12,8 @@ de.juplo.kafka - spring-producer - Spring Producer - A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka - 1.0-kafkatemplate-SNAPSHOT + technick-check + 1.0-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index 4a5c8da2..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,34 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.KafkaTemplate; - -import java.time.Duration; - - -@Configuration -@EnableConfigurationProperties(ApplicationProperties.class) -public class ApplicationConfiguration -{ - @Bean - public ExampleProducer exampleProducer( - ApplicationProperties properties, - KafkaProperties kafkaProperties, - KafkaTemplate kafkaTemplate, - ConfigurableApplicationContext applicationContext) - { - return - new ExampleProducer( - kafkaProperties.getClientId(), - properties.getProducerProperties().getTopic(), - properties.getProducerProperties().getThrottle() == null - ? Duration.ofMillis(500) - : properties.getProducerProperties().getThrottle(), - kafkaTemplate, - () -> applicationContext.close()); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index 908072cb..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,39 +0,0 @@ -package de.juplo.kafka; - -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; - -import java.time.Duration; - - -@ConfigurationProperties(prefix = "juplo") -@Validated -@Getter -@Setter -public class ApplicationProperties -{ - @NotNull - private ProducerProperties producer; - - - public ProducerProperties getProducerProperties() - { - return producer; - } - - - @Validated - @Getter - @Setter - static class ProducerProperties - { - @NotNull - @NotEmpty - private String topic; - private Duration throttle; - } -} diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java deleted file mode 100644 index 09eaa113..00000000 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ /dev/null @@ -1,135 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; - -import java.time.Duration; -import java.util.concurrent.CompletableFuture; - - -@Slf4j -public class ExampleProducer implements Runnable -{ - private final String id; - private final String topic; - private final Duration throttle; - private final KafkaTemplate kafkaTemplate; - private final Thread workerThread; - private final Runnable closeCallback; - - private volatile boolean running = true; - private long produced = 0; - - - public ExampleProducer( - String id, - String topic, - Duration throttle, - KafkaTemplate kafkaTemplate, - Runnable closeCallback) - { - this.id = id; - this.topic = topic; - this.throttle = throttle; - this.kafkaTemplate = kafkaTemplate; - - workerThread = new Thread(this, "ExampleProducer Worker-Thread"); - workerThread.start(); - - this.closeCallback = closeCallback; - } - - - @Override - public void run() - { - long i = 0; - - try - { - for (; running; i++) - { - send(Long.toString(i%10), Long.toString(i)); - - if (throttle.isPositive()) - { - try - { - Thread.sleep(throttle); - } - catch (InterruptedException e) - { - log.warn("{} - Interrupted while throttling!", e); - } - } - } - } - catch (Exception e) - { - log.error("{} - Unexpected error!", id, e); - log.info("{} - Triggering exit of application!", id); - new Thread(closeCallback).start(); - } - finally - { - log.info("{}: Produced {} messages in total, exiting!", id, produced); - } - } - - void send(String key, String value) - { - final long time = System.currentTimeMillis(); - - kafkaTemplate.send(topic, key, value).whenComplete((result, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - RecordMetadata metadata = result.getRecordMetadata(); - log.debug( - "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", - id, - key, - value, - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, latency={}ms: {}", - id, - key, - value, - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - produced++; - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - now - time - ); - } - - - public void shutdown() throws InterruptedException - { - log.info("{} joining the worker-thread...", id); - running = false; - workerThread.join(); - } -} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java deleted file mode 100644 index 7687e9ca..00000000 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ /dev/null @@ -1,94 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.web.servlet.MockMvc; - -import java.time.Duration; -import java.util.LinkedList; -import java.util.List; - -import static de.juplo.kafka.ApplicationTests.PARTITIONS; -import static de.juplo.kafka.ApplicationTests.TOPIC; -import static org.awaitility.Awaitility.await; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - - -@SpringBootTest( - properties = { - "spring.kafka.consumer.auto-offset-reset=earliest", - "juplo.producer.topic=" + TOPIC}) -@AutoConfigureMockMvc -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -@Slf4j -public class ApplicationTests -{ - static final String TOPIC = "FOO"; - static final int PARTITIONS = 10; - - @Autowired - MockMvc mockMvc; - @Autowired - Consumer consumer; - - - @BeforeEach - public void clear() - { - consumer.received.clear(); - } - - - @Test - public void testApplicationStartup() - { - await("Application is healthy") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> mockMvc - .perform(get("/actuator/health")) - .andExpect(status().isOk()) - .andExpect(jsonPath("status").value("UP"))); - } - - @Test - public void testSendMessage() throws Exception - { - await("Some messages were send") - .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() >= 1); - } - - - static class Consumer - { - final List> received = new LinkedList<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC) - public void receive(ConsumerRecord record) - { - log.debug("Received message: {}", record); - received.add(record); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - } -} -- 2.20.1