From 1262341404f6248477fe2c7c422996f3ecd3e11e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 25 Sep 2024 23:31:07 +0200 Subject: [PATCH] =?utf8?q?`rest-producer`=20in=20einen=20`spring-producer`?= =?utf8?q?=20zur=C3=BCckgebaut?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 98 ++----------------- docker/docker-compose.yml | 39 +------- pom.xml | 10 +- src/main/java/de/juplo/kafka/Application.java | 57 ++++++++++- .../juplo/kafka/ApplicationConfiguration.java | 1 - .../de/juplo/kafka/ApplicationProperties.java | 1 - .../java/de/juplo/kafka/ErrorResponse.java | 11 --- .../java/de/juplo/kafka/ExampleProducer.java | 60 ++++++------ .../java/de/juplo/kafka/ProduceFailure.java | 21 ---- .../java/de/juplo/kafka/ProduceResult.java | 11 --- .../java/de/juplo/kafka/ProduceSuccess.java | 12 --- .../java/de/juplo/kafka/ApplicationTests.java | 10 +- 12 files changed, 98 insertions(+), 233 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/ErrorResponse.java delete mode 100644 src/main/java/de/juplo/kafka/ProduceFailure.java delete mode 100644 src/main/java/de/juplo/kafka/ProduceResult.java delete mode 100644 src/main/java/de/juplo/kafka/ProduceSuccess.java diff --git a/README.sh b/README.sh index d23ed67..9148486 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/rest-producer:1.0-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf producer producer-0 producer-1 +docker compose -f docker/docker-compose.yml rm -svf producer if [[ $(docker image ls -q $IMAGE) == "" || @@ -29,94 +29,12 @@ docker compose -f docker/docker-compose.yml up -t0 -d cli sleep 1 docker compose -f docker/docker-compose.yml logs setup +docker compose -f docker/docker-compose.yml ps docker compose -f docker/docker-compose.yml up -d producer -while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done +sleep 5 -# tag::hashed[] -echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus -# end::hashed[] -echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus -# tag::hashed[] -echo -n Nachricht 1 an peter über producer | http -v :8080/peter -# end::hashed[] -echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus -echo -n Nachricht 2 an peter über producer | http -v :8080/peter -echo -n Nachricht 3 an peter über producer | http -v :8080/peter +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -echo Nachrichten in Partition 0: -kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -echo -echo Nachrichten in Partition 1: -kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -echo - -docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF' -echo "Altering number of partitions from 2 to 3..." -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -# tag::repartitioning[] -kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3 -# end::repartitioning[] -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -EOF - -docker compose -f docker/docker-compose.yml restart producer -while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done - -echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus -echo -n Nachricht 5 an peter über producer | http -v :8080/peter -echo -n Nachricht 4 an peter über producer | http -v :8080/peter -echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus -echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus -echo -n Nachricht 6 an peter über producer | http -v :8080/peter - -echo Nachrichten in Partition 0: -# tag::kafkacat[] -kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -# end::kafkacat[] -echo -echo Nachrichten in Partition 1: -# tag::kafkacat[] -kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -# end::kafkacat[] -echo -echo Nachrichten in Partition 2: -kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe - - -docker compose -f docker/docker-compose.yml restart setup -sleep 1 -docker compose -f docker/docker-compose.yml up -d producer-0 producer-1 -while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done -while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done - -# tag::fixed[] -echo -n Nachricht 1 über producer-0 | http -v :8000/klaus -echo -n Nachricht 1 über producer-1 | http -v :8001/klaus -echo -n Nachricht 2 über producer-0 | http -v :8000/peter -echo -n Nachricht 2 über producer-1 | http -v :8001/peter -# end::fixed[] - -docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF' -echo "Altering number of partitions from 2 to 3..." -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3 -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -EOF - -docker compose -f docker/docker-compose.yml restart producer-0 producer-1 -while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done -while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done - -echo -n Nachricht 3 über producer-0 | http -v :8000/klaus -echo -n Nachricht 3 über producer-1 | http -v :8001/klaus -echo -n Nachricht 4 über producer-0 | http -v :8000/peter -echo -n Nachricht 4 über producer-1 | http -v :8001/peter - -echo Nachrichten in Partition 0: -kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -echo -echo Nachrichten in Partition 1: -kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -echo -echo Nachrichten in Partition 2: -kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe +docker compose -f docker/docker-compose.yml stop producer +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' +docker compose -f docker/docker-compose.yml logs producer diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2eaa6b6..d03918d 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -188,49 +188,12 @@ services: - kafka-3 producer: - image: juplo/rest-producer:1.0-SNAPSHOT - ports: - - 8080:8080 + image: juplo/simple-producer:1.0-SNAPSHOT environment: - server.port: 8080 producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer-0: - image: juplo/rest-producer:1.0-SNAPSHOT - ports: - - 8000:8080 - environment: - server.port: 8080 - producer.bootstrap-server: kafka:9092 - producer.client-id: producer-0 - producer.topic: test - producer.partition: 0 - - producer-1: - image: juplo/rest-producer:1.0-SNAPSHOT - ports: - - 8001:8080 - environment: - server.port: 8080 - producer.bootstrap-server: kafka:9092 - producer.client-id: producer-1 - producer.topic: test - producer.partition: 1 - - consumer-1: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer-1 - - consumer-2: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer-2 - - consumer-3: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer-3 - volumes: zookeeper-data: zookeeper-log: diff --git a/pom.xml b/pom.xml index 999c66b..fb24c40 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - rest-producer - REST Producer - A Simple Producer that takes messages via POST and confirms successs + spring-producer + Spring Producer + A Simple Spring-Boot-Producer, that takes messages via POST and confirms successs 1.0-SNAPSHOT @@ -22,10 +22,6 @@ - - org.springframework.boot - spring-boot-starter-web - org.springframework.boot spring-boot-starter-actuator diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index ba6aeee..d269945 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,15 +1,64 @@ package de.juplo.kafka; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Producer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.concurrent.ListenableFuture; + +import java.util.concurrent.ExecutionException; @SpringBootApplication -@ComponentScan(excludeFilters = @ComponentScan.Filter(RestController.class)) -public class Application +@Slf4j +public class Application implements ApplicationRunner { + @Autowired + ThreadPoolTaskExecutor taskExecutor; + @Autowired + Producer kafkaProducer; + @Autowired + ExampleProducer exampleProducer; + @Autowired + ConfigurableApplicationContext context; + + ListenableFuture consumerJob; + + @Override + public void run(ApplicationArguments args) throws Exception + { + log.info("Starting SimpleConsumer"); + consumerJob = taskExecutor.submitListenable(exampleProducer); + consumerJob.addCallback( + exitStatus -> + { + log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus); + SpringApplication.exit(context, () -> exitStatus); + }, + t -> + { + log.error("SimpleConsumer exited abnormally!", t); + SpringApplication.exit(context, () -> 2); + }); + } + + @PreDestroy + public void shutdown() throws ExecutionException, InterruptedException + { + log.info("Signaling ExampleProducer to quit its work"); + exampleProducer.shutdown(); + log.info("Waiting for ExampleProducer to finish its work"); + consumerJob.get(); + log.info("ExampleProducer finished its work"); + } + + public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 132413c..6bd5cd5 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -22,7 +22,6 @@ public class ApplicationConfiguration new ExampleProducer( properties.getClientId(), properties.getTopic(), - properties.getPartition(), kafkaProducer); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 8efacd4..4bf66a8 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -22,7 +22,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - private Integer partition; @NotNull @NotEmpty private String acks; diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java deleted file mode 100644 index 5ca206d..0000000 --- a/src/main/java/de/juplo/kafka/ErrorResponse.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import lombok.Value; - - -@Value -public class ErrorResponse -{ - private final String error; - private final Integer status; -} diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 94608f9..d1f1bf9 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -4,48 +4,55 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.*; -import org.springframework.web.context.request.async.DeferredResult; -import java.math.BigInteger; +import java.util.concurrent.Callable; @Slf4j -@RestController @RequiredArgsConstructor -public class ExampleProducer +public class ExampleProducer implements Callable { private final String id; private final String topic; - private final Integer partition; private final Producer producer; + private volatile boolean running = true; private long produced = 0; - @PostMapping(path = "{key}") - public DeferredResult send( - @PathVariable String key, - @RequestHeader(name = "X-id", required = false) Long correlationId, - @RequestBody String value) + + @Override + public Integer call() { - DeferredResult result = new DeferredResult<>(); + long i = 0; + + try + { + for (; running; i++) + { + send(Long.toString(i%10), Long.toString(i)); + Thread.sleep(500); + } + } + catch (Exception e) + { + log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced); + return 1; + } + log.info("{}: Produced {} messages in total, exiting!", id, produced); + return 0; + } + + void send(String key, String value) + { final long time = System.currentTimeMillis(); final ProducerRecord record = new ProducerRecord<>( topic, // Topic - partition, // Partition key, // Key value // Value ); - record.headers().add("source", id.getBytes()); - if (correlationId != null) - { - record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray()); - } - producer.send(record, (metadata, e) -> { long now = System.currentTimeMillis(); @@ -53,7 +60,6 @@ public class ExampleProducer { // HANDLE SUCCESS produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); log.debug( "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", id, @@ -68,7 +74,6 @@ public class ExampleProducer else { // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); log.error( "{} - ERROR key={} timestamp={} latency={}ms: {}", id, @@ -82,19 +87,16 @@ public class ExampleProducer long now = System.currentTimeMillis(); log.trace( - "{} - Queued message with key={} latency={}ms", + "{} - Queued message for key {}, latency={}ms", id, record.key(), now - time ); - - return result; } - @ExceptionHandler - @ResponseStatus(HttpStatus.BAD_REQUEST) - public ErrorResponse illegalStateException(IllegalStateException e) + + public void shutdown() { - return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value()); + running = false; } } diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java deleted file mode 100644 index 873a67b..0000000 --- a/src/main/java/de/juplo/kafka/ProduceFailure.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka; - - -import lombok.Value; - - -@Value -public class ProduceFailure implements ProduceResult -{ - private final String error; - private final String exception; - private final Integer status; - - - public ProduceFailure(Exception e) - { - status = 500; - exception = e.getClass().getSimpleName(); - error = e.getMessage(); - } -} diff --git a/src/main/java/de/juplo/kafka/ProduceResult.java b/src/main/java/de/juplo/kafka/ProduceResult.java deleted file mode 100644 index ceff329..0000000 --- a/src/main/java/de/juplo/kafka/ProduceResult.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.annotation.JsonInclude; - -import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; - - -@JsonInclude(NON_NULL) -public interface ProduceResult -{ -} diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java deleted file mode 100644 index 9c79e8b..0000000 --- a/src/main/java/de/juplo/kafka/ProduceSuccess.java +++ /dev/null @@ -1,12 +0,0 @@ -package de.juplo.kafka; - - -import lombok.Value; - - -@Value -public class ProduceSuccess implements ProduceResult -{ - Integer partition; - Long offset; -} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 50bc20b..8d579e9 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,7 +27,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. properties = { "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}", "producer.bootstrap-server=${spring.embedded.kafka.brokers}", - "spring.kafka.consumer.auto-offset-reset=earliest", "producer.topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @@ -37,8 +36,6 @@ public class ApplicationTests static final String TOPIC = "FOO"; static final int PARTITIONS = 10; - @Autowired - MockMvc mockMvc; @Autowired Consumer consumer; @@ -53,12 +50,9 @@ public class ApplicationTests @Test void testSendMessage() throws Exception { - mockMvc - .perform(post("/peter").content("Hallo Welt!")) - .andExpect(status().isOk()); - await("Message was send") + await("Some messages were send") .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() == 1); + .until(() -> consumer.received.size() >= 1); } -- 2.20.1