From: Kai Moritz
Date: Sun, 29 Sep 2024 20:14:48 +0000 (+0200)
Subject: Extended `spring-producer` into a `rest-producer`
X-Git-Tag: producer/rest-producer--2024-11-13--si
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fproducer%2Frest-producer;p=demos%2Fkafka%2Ftraining

Extended `spring-producer` into a `rest-producer`
---

diff --git a/README.sh b/README.sh
index 499780a..1e66cf0 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf producer
+docker compose -f docker/docker-compose.yml rm -svf producer producer-0 producer-1
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -27,16 +27,93 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 
 docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer-1 consumer-2
-sleep 15
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
 
-docker compose -f docker/docker-compose.yml stop producer
+# tag::hashed[]
+echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus
+# end::hashed[]
+echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus
+# tag::hashed[]
+echo -n Nachricht 1 an peter über producer | http -v :8080/peter
+# end::hashed[]
+echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 2 an peter über producer | http -v :8080/peter
+echo -n Nachricht 3 an peter über producer | http -v :8080/peter
+
+echo Nachrichten in Partition 0:
+kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
 echo
-echo "Von consumer-1 empfangen:"
-docker compose -f docker/docker-compose.yml logs consumer-1 | grep '\ test\/.'
+echo Nachrichten in Partition 1:
+kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
 echo
-echo "Von consumer-2 empfangen:"
-docker compose -f docker/docker-compose.yml logs consumer-2 | grep '\ test\/.'
 
-docker compose -f docker/docker-compose.yml stop consumer-1 consumer-2
+docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 2 to 3..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+# tag::repartitioning[]
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
+# end::repartitioning[]
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker compose -f docker/docker-compose.yml restart producer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+
+echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 5 an peter über producer | http -v :8080/peter
+echo -n Nachricht 4 an peter über producer | http -v :8080/peter
+echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 6 an peter über producer | http -v :8080/peter
+
+echo Nachrichten in Partition 0:
+# tag::kafkacat[]
+kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+# end::kafkacat[]
+echo
+echo Nachrichten in Partition 1:
+# tag::kafkacat[]
+kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+# end::kafkacat[]
+echo
+echo Nachrichten in Partition 2:
+kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+
+
+docker compose -f docker/docker-compose.yml restart setup
+sleep 1
+docker compose -f docker/docker-compose.yml up -d producer-0 producer-1
+while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
+while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
+
+# tag::fixed[]
+echo -n Nachricht 1 über producer-0 | http -v :8000/klaus
+echo -n Nachricht 1 über producer-1 | http -v :8001/klaus
+echo -n Nachricht 2 über producer-0 | http -v :8000/peter
+echo -n Nachricht 2 über producer-1 | http -v :8001/peter
+# end::fixed[]
+
+docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 2 to 3..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker compose -f docker/docker-compose.yml restart producer-0 producer-1
+while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
+while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
+
+echo -n Nachricht 3 über producer-0 | http -v :8000/klaus
+echo -n Nachricht 3 über producer-1 | http -v :8001/klaus
+echo -n Nachricht 4 über producer-0 | http -v :8000/peter
+echo -n Nachricht 4 über producer-1 | http -v :8001/peter
+
+echo Nachrichten in Partition 0:
+kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+echo
+echo Nachrichten in Partition 1:
+kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+echo
+echo Nachrichten in Partition 2:
+kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index c417a7f..b8d5596 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -190,11 +190,36 @@ services:
       - kafka-3
 
   producer:
-    image: juplo/spring-producer:1.0-SNAPSHOT
+    image: juplo/rest-producer:1.0-SNAPSHOT
+    ports:
+      - 8080:8080
     environment:
-      juplo.bootstrap-server: kafka:9092
-      juplo.client-id: producer
-      juplo.producer.topic: test
+      server.port: 8080
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer
+      producer.topic: test
+
+  producer-0:
+    image: juplo/rest-producer:1.0-SNAPSHOT
+    ports:
+      - 8000:8080
+    environment:
+      server.port: 8080
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer-0
+      producer.topic: test
+      producer.partition: 0
+
+  producer-1:
+    image: juplo/rest-producer:1.0-SNAPSHOT
+    ports:
+      - 8001:8080
+    environment:
+      server.port: 8080
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer-1
+      producer.topic: test
+      producer.partition: 1
 
   consumer-1:
     image: juplo/simple-consumer:1.0-SNAPSHOT
@@ -204,6 +229,10 @@ services:
     image: juplo/simple-consumer:1.0-SNAPSHOT
     command: kafka:9092 test my-group consumer-2
 
+  consumer-3:
+    image: juplo/simple-consumer:1.0-SNAPSHOT
+    command: kafka:9092 test my-group consumer-3
+
 volumes:
   zookeeper-data:
   zookeeper-log:
diff --git a/pom.xml b/pom.xml
index 841299b..999c66b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>spring-producer</artifactId>
-  <name>Spring Producer</name>
-  <description>A Simple Spring-Boot-Producer, that takes messages via POST and confirms successs</description>
+  <artifactId>rest-producer</artifactId>
+  <name>REST Producer</name>
+  <description>A Simple Producer that takes messages via POST and confirms success</description>
   <version>1.0-SNAPSHOT</version>
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index 0069257..ba6aeee 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -2,9 +2,12 @@ package de.juplo.kafka;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.web.bind.annotation.RestController;
 
 
 @SpringBootApplication
+@ComponentScan(excludeFilters = @ComponentScan.Filter(RestController.class))
 public class Application
 {
   public static void main(String[] args)
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 7540dd3..c3422af 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -4,7 +4,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -19,18 +18,14 @@ public class ApplicationConfiguration
   @Bean
   public ExampleProducer exampleProducer(
     ApplicationProperties properties,
-    Producer<String, String> kafkaProducer,
-    ConfigurableApplicationContext applicationContext)
+    Producer<String, String> kafkaProducer)
   {
     return new ExampleProducer(
       properties.getClientId(),
       properties.getProducerProperties().getTopic(),
-      properties.getProducerProperties().getThrottle() == null
-        ? Duration.ofMillis(500)
-        : properties.getProducerProperties().getThrottle(),
-      kafkaProducer,
-      () -> applicationContext.close());
+      properties.getProducer().getPartition(),
+      kafkaProducer);
   }
 
   @Bean(destroyMethod = "")
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index 4323262..00feb23 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -41,6 +41,7 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String topic;
+    private Integer partition;
     @NotNull
     @NotEmpty
     private String acks;
@@ -57,6 +58,5 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String compressionType;
-    private Duration throttle;
   }
 }
diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java
new file mode 100644
index 0000000..5ca206d
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ErrorResponse.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class ErrorResponse
+{
+  private final String error;
+  private final Integer status;
+}
diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java
index bc5cf89..d34f189 100644
--- a/src/main/java/de/juplo/kafka/ExampleProducer.java
+++ b/src/main/java/de/juplo/kafka/ExampleProducer.java
@@ -1,93 +1,53 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.math.BigInteger;
 import java.time.Duration;
 
 
 @Slf4j
-public class ExampleProducer implements Runnable
+@RestController
+@RequiredArgsConstructor
+public class ExampleProducer
 {
   private final String id;
   private final String topic;
-  private final Duration throttle;
+  private final Integer partition;
   private final Producer<String, String> producer;
-  private final Thread workerThread;
-  private final Runnable closeCallback;
 
-  private volatile boolean running = true;
   private long produced = 0;
 
-
-  public ExampleProducer(
-    String id,
-    String topic,
-    Duration throttle,
-    Producer<String, String> producer,
-    Runnable closeCallback)
+  @PostMapping(path = "{key}")
+  public DeferredResult<ProduceResult> send(
+    @PathVariable String key,
+    @RequestHeader(name = "X-id", required = false) Long correlationId,
+    @RequestBody String value)
   {
-    this.id = id;
-    this.topic = topic;
-    this.throttle = throttle;
-    this.producer = producer;
+    DeferredResult<ProduceResult> result = new DeferredResult<>();
 
-    workerThread = new Thread(this, "ExampleProducer Worker-Thread");
-    workerThread.start();
-
-    this.closeCallback = closeCallback;
-  }
-
-
-  @Override
-  public void run()
-  {
-    long i = 0;
-
-    try
-    {
-      for (; running; i++)
-      {
-        send(Long.toString(i%10), Long.toString(i));
-
-        if (throttle.isPositive())
-        {
-          try
-          {
-            Thread.sleep(throttle);
-          }
-          catch (InterruptedException e)
-          {
-            log.warn("{} - Interrupted while throttling!", e);
-          }
-        }
-      }
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected error!", id, e);
-      log.info("{} - Triggering exit of application!", id);
-      new Thread(closeCallback).start();
-    }
-    finally
-    {
-      log.info("{}: Closing the KafkaProducer", id);
-      producer.close();
-      log.info("{}: Produced {} messages in total, exiting!", id, produced);
-    }
-  }
-
-  void send(String key, String value)
-  {
     final long time = System.currentTimeMillis();
     final ProducerRecord<String, String> record = new ProducerRecord<>(
       topic,      // Topic
+      partition,  // Partition
      key,        // Key
      value       // Value
     );
 
+    record.headers().add("source", id.getBytes());
+    if (correlationId != null)
+    {
+      record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
+    }
+
     producer.send(record, (metadata, e) ->
     {
       long now = System.currentTimeMillis();
@@ -95,6 +55,7 @@ public class ExampleProducer implements Runnable
       {
         // HANDLE SUCCESS
         produced++;
+        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
           "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
           id,
@@ -109,6 +70,7 @@ public class ExampleProducer implements Runnable
       else
       {
         // HANDLE ERROR
+        result.setErrorResult(new ProduceFailure(e));
         log.error(
           "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
           id,
@@ -129,13 +91,14 @@ public class ExampleProducer implements Runnable
         record.value(),
         now - time
       );
-  }
+
+    return result;
+  }
 
-  public void shutdown() throws InterruptedException
+  @ExceptionHandler
+  @ResponseStatus(HttpStatus.BAD_REQUEST)
+  public ErrorResponse illegalStateException(IllegalStateException e)
   {
-    log.info("{} joining the worker-thread...", id);
-    running = false;
-    workerThread.join();
+    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
   }
 }
diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java
new file mode 100644
index 0000000..873a67b
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ProduceFailure.java
@@ -0,0 +1,21 @@
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceFailure implements ProduceResult
+{
+  private final String error;
+  private final String exception;
+  private final Integer status;
+
+
+  public ProduceFailure(Exception e)
+  {
+    status = 500;
+    exception = e.getClass().getSimpleName();
+    error = e.getMessage();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ProduceResult.java b/src/main/java/de/juplo/kafka/ProduceResult.java
new file mode 100644
index 0000000..ceff329
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ProduceResult.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+
+
+@JsonInclude(NON_NULL)
+public interface ProduceResult
+{
+}
diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java
new file mode 100644
index 0000000..9c79e8b
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ProduceSuccess.java
@@ -0,0 +1,12 @@
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceSuccess implements ProduceResult
+{
+  Integer partition;
+  Long offset;
+}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index fe8609e..8b16f65 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -21,6 +21,7 @@ import static de.juplo.kafka.ApplicationTests.PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -66,9 +67,12 @@ public class ApplicationTests
   @Test
   public void testSendMessage() throws Exception
   {
-    await("Some messages were send")
+    mockMvc
+      .perform(post("/peter").content("Hallo Welt!"))
+      .andExpect(status().isOk());
+    await("Message was sent")
       .atMost(Duration.ofSeconds(5))
-      .until(() -> consumer.received.size() >= 1);
+      .until(() -> consumer.received.size() == 1);
   }
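
Once the compose stack from README.sh is up, the new endpoint can also be exercised by hand with the same httpie calls the script uses. The sketch below assumes the services and ports defined in docker/docker-compose.yml above (the unpinned producer on :8080, producer-1 pinned to partition 1 on :8001); the JSON body shown is what the ProduceSuccess DTO should serialize to with the default Jackson setup, so the concrete partition and offset values are illustrative only.

# The path variable becomes the record key, the request body the value;
# the unpinned producer on :8080 leaves the partition choice to the default partitioner.
echo -n 'Hello world' | http -v :8080/klaus
# expected response body: {"partition":<p>,"offset":<o>}

# An optional X-id header is copied into the Kafka record header "id"
# (encoded via BigInteger.toByteArray()); the client-id is always added as header "source".
echo -n 'Hello again' | http -v :8080/klaus X-id:42

# producer-1 sets producer.partition=1, so its records always land in partition 1,
# regardless of the key:
echo -n 'Pinned record' | http -v :8001/peter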