#!/bin/bash
-IMAGE=juplo/rest-producer:1.0-SNAPSHOT
+IMAGE=juplo/spring-producer:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
fi
docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf producer producer-0 producer-1
+docker compose -f docker/docker-compose.yml rm -svf producer
if [[
$(docker image ls -q $IMAGE) == "" ||
sleep 1
docker compose -f docker/docker-compose.yml logs setup
+docker compose -f docker/docker-compose.yml ps
docker compose -f docker/docker-compose.yml up -d producer
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+sleep 5
-# tag::hashed[]
-echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus
-# end::hashed[]
-echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus
-# tag::hashed[]
-echo -n Nachricht 1 an peter über producer | http -v :8080/peter
-# end::hashed[]
-echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 2 an peter über producer | http -v :8080/peter
-echo -n Nachricht 3 an peter über producer | http -v :8080/peter
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-
-docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-# tag::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-# end::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker compose -f docker/docker-compose.yml restart producer
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-
-echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 5 an peter über producer | http -v :8080/peter
-echo -n Nachricht 4 an peter über producer | http -v :8080/peter
-echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an peter über producer | http -v :8080/peter
-
-echo Nachrichten in Partition 0:
-# tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-# end::kafkacat[]
-echo
-echo Nachrichten in Partition 1:
-# tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-# end::kafkacat[]
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-
-
-docker compose -f docker/docker-compose.yml restart setup
-sleep 1
-docker compose -f docker/docker-compose.yml up -d producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-# tag::fixed[]
-echo -n Nachricht 1 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 1 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 2 über producer-0 | http -v :8000/peter
-echo -n Nachricht 2 über producer-1 | http -v :8001/peter
-# end::fixed[]
-
-docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker compose -f docker/docker-compose.yml restart producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-echo -n Nachricht 3 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 3 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 4 über producer-0 | http -v :8000/peter
-echo -n Nachricht 4 über producer-1 | http -v :8001/peter
-
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+docker compose -f docker/docker-compose.yml stop producer
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
+docker compose -f docker/docker-compose.yml logs producer
- kafka-3
producer:
- image: juplo/rest-producer:1.0-SNAPSHOT
- ports:
- - 8080:8080
+ image: juplo/simple-producer:1.0-SNAPSHOT
environment:
- server.port: 8080
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
- producer-0:
- image: juplo/rest-producer:1.0-SNAPSHOT
- ports:
- - 8000:8080
- environment:
- server.port: 8080
- producer.bootstrap-server: kafka:9092
- producer.client-id: producer-0
- producer.topic: test
- producer.partition: 0
-
- producer-1:
- image: juplo/rest-producer:1.0-SNAPSHOT
- ports:
- - 8001:8080
- environment:
- server.port: 8080
- producer.bootstrap-server: kafka:9092
- producer.client-id: producer-1
- producer.topic: test
- producer.partition: 1
-
- consumer-1:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group consumer-1
-
- consumer-2:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group consumer-2
-
- consumer-3:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group consumer-3
-
volumes:
zookeeper-data:
zookeeper-log:
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>rest-producer</artifactId>
- <name>REST Producer</name>
- <description>A Simple Producer that takes messages via POST and confirms successs</description>
+ <artifactId>spring-producer</artifactId>
+ <name>Spring Producer</name>
+  <description>A simple Spring-Boot producer that periodically sends messages and confirms success</description>
<version>1.0-SNAPSHOT</version>
<properties>
package de.juplo.kafka;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.ExecutionException;
@SpringBootApplication
-@ComponentScan(excludeFilters = @ComponentScan.Filter(RestController.class))
-public class Application
+@Slf4j
+public class Application implements ApplicationRunner
{
+ @Autowired
+ ThreadPoolTaskExecutor taskExecutor;
+ @Autowired
+ Producer<?, ?> kafkaProducer;
+ @Autowired
+ ExampleProducer exampleProducer;
+ @Autowired
+ ConfigurableApplicationContext context;
+
+  ListenableFuture<Integer> producerJob;
+
+  @Override
+  public void run(ApplicationArguments args) throws Exception
+  {
+    log.info("Starting ExampleProducer");
+    producerJob = taskExecutor.submitListenable(exampleProducer);
+    producerJob.addCallback(
+      exitStatus ->
+      {
+        log.info("ExampleProducer exited normally, exit-status: {}", exitStatus);
+        SpringApplication.exit(context, () -> exitStatus);
+      },
+      t ->
+      {
+        log.error("ExampleProducer exited abnormally!", t);
+        SpringApplication.exit(context, () -> 2);
+      });
+  }
+
+  @PreDestroy
+  public void shutdown() throws ExecutionException, InterruptedException
+  {
+    log.info("Signaling ExampleProducer to quit its work");
+    exampleProducer.shutdown();
+    log.info("Waiting for ExampleProducer to finish its work");
+    producerJob.get();
+    log.info("ExampleProducer finished its work");
+  }
+
+
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
new ExampleProducer(
properties.getClientId(),
properties.getTopic(),
- properties.getPartition(),
kafkaProducer);
}
@NotNull
@NotEmpty
private String topic;
- private Integer partition;
@NotNull
@NotEmpty
private String acks;
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.Value;
-
-
-@Value
-public class ErrorResponse
-{
- private final String error;
- private final Integer status;
-}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.*;
-import org.springframework.web.context.request.async.DeferredResult;
-import java.math.BigInteger;
+import java.util.concurrent.Callable;
@Slf4j
-@RestController
@RequiredArgsConstructor
-public class ExampleProducer
+public class ExampleProducer implements Callable<Integer>
{
private final String id;
private final String topic;
- private final Integer partition;
private final Producer<String, String> producer;
+ private volatile boolean running = true;
private long produced = 0;
- @PostMapping(path = "{key}")
- public DeferredResult<ProduceResult> send(
- @PathVariable String key,
- @RequestHeader(name = "X-id", required = false) Long correlationId,
- @RequestBody String value)
+
+ @Override
+ public Integer call()
{
- DeferredResult<ProduceResult> result = new DeferredResult<>();
+ long i = 0;
+
+ try
+ {
+ for (; running; i++)
+ {
+ send(Long.toString(i%10), Long.toString(i));
+ Thread.sleep(500);
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
+ return 1;
+ }
+ log.info("{}: Produced {} messages in total, exiting!", id, produced);
+ return 0;
+ }
+
+ void send(String key, String value)
+ {
final long time = System.currentTimeMillis();
final ProducerRecord<String, String> record = new ProducerRecord<>(
topic, // Topic
- partition, // Partition
key, // Key
value // Value
);
- record.headers().add("source", id.getBytes());
- if (correlationId != null)
- {
- record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
- }
-
producer.send(record, (metadata, e) ->
{
long now = System.currentTimeMillis();
{
// HANDLE SUCCESS
produced++;
- result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
log.debug(
"{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
id,
else
{
// HANDLE ERROR
- result.setErrorResult(new ProduceFailure(e));
log.error(
"{} - ERROR key={} timestamp={} latency={}ms: {}",
id,
record.key(),
now - time
);
-
- return result;
}
- @ExceptionHandler
- @ResponseStatus(HttpStatus.BAD_REQUEST)
- public ErrorResponse illegalStateException(IllegalStateException e)
+
+ public void shutdown()
{
- return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+ running = false;
}
}
+++ /dev/null
-package de.juplo.kafka;
-
-
-import lombok.Value;
-
-
-@Value
-public class ProduceFailure implements ProduceResult
-{
- private final String error;
- private final String exception;
- private final Integer status;
-
-
- public ProduceFailure(Exception e)
- {
- status = 500;
- exception = e.getClass().getSimpleName();
- error = e.getMessage();
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-
-import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
-
-
-@JsonInclude(NON_NULL)
-public interface ProduceResult
-{
-}
+++ /dev/null
-package de.juplo.kafka;
-
-
-import lombok.Value;
-
-
-@Value
-public class ProduceSuccess implements ProduceResult
-{
- Integer partition;
- Long offset;
-}
properties = {
"spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
"producer.bootstrap-server=${spring.embedded.kafka.brokers}",
- "spring.kafka.consumer.auto-offset-reset=earliest",
"producer.topic=" + TOPIC})
@AutoConfigureMockMvc
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
static final String TOPIC = "FOO";
static final int PARTITIONS = 10;
- @Autowired
- MockMvc mockMvc;
@Autowired
Consumer consumer;
@Test
void testSendMessage() throws Exception
{
- mockMvc
- .perform(post("/peter").content("Hallo Welt!"))
- .andExpect(status().isOk());
- await("Message was send")
+    await("Some messages were sent")
.atMost(Duration.ofSeconds(5))
- .until(() -> consumer.received.size() == 1);
+ .until(() -> consumer.received.size() >= 1);
}