#!/bin/bash
-IMAGE=juplo/spring-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
fi
docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf producer
+docker compose -f docker/docker-compose.yml rm -svf producer producer-0 producer-1
if [[
$(docker image ls -q $IMAGE) == "" ||
docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer-1 consumer-2
-sleep 15
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-docker compose -f docker/docker-compose.yml stop producer
+# tag::hashed[]
+echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus
+# end::hashed[]
+echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus
+# tag::hashed[]
+echo -n Nachricht 1 an peter über producer | http -v :8080/peter
+# end::hashed[]
+echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 2 an peter über producer | http -v :8080/peter
+echo -n Nachricht 3 an peter über producer | http -v :8080/peter
+echo Nachrichten in Partition 0:
+kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
echo
-echo "Von consumer-1 empfangen:"
-docker compose -f docker/docker-compose.yml logs consumer-1 | grep '\ test\/.'
+echo Nachrichten in Partition 1:
+kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
echo
-echo "Von consumer-2 empfangen:"
-docker compose -f docker/docker-compose.yml logs consumer-2 | grep '\ test\/.'
-docker compose -f docker/docker-compose.yml stop consumer-1 consumer-2
+docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 2 to 3..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+# tag::repartitioning[]
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
+# end::repartitioning[]
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker compose -f docker/docker-compose.yml restart producer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+
+echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 5 an peter über producer | http -v :8080/peter
+echo -n Nachricht 4 an peter über producer | http -v :8080/peter
+echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 6 an peter über producer | http -v :8080/peter
+
+echo Nachrichten in Partition 0:
+# tag::kafkacat[]
+kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+# end::kafkacat[]
+echo
+echo Nachrichten in Partition 1:
+# tag::kafkacat[]
+kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+# end::kafkacat[]
+echo
+echo Nachrichten in Partition 2:
+kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+
+
+docker compose -f docker/docker-compose.yml restart setup
+sleep 1
+docker compose -f docker/docker-compose.yml up -d producer-0 producer-1
+while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
+while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
+
+# tag::fixed[]
+echo -n Nachricht 1 über producer-0 | http -v :8000/klaus
+echo -n Nachricht 1 über producer-1 | http -v :8001/klaus
+echo -n Nachricht 2 über producer-0 | http -v :8000/peter
+echo -n Nachricht 2 über producer-1 | http -v :8001/peter
+# end::fixed[]
+
+docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 2 to 3..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker compose -f docker/docker-compose.yml restart producer-0 producer-1
+while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
+while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
+
+echo -n Nachricht 3 über producer-0 | http -v :8000/klaus
+echo -n Nachricht 3 über producer-1 | http -v :8001/klaus
+echo -n Nachricht 4 über producer-0 | http -v :8000/peter
+echo -n Nachricht 4 über producer-1 | http -v :8001/peter
+
+echo Nachrichten in Partition 0:
+kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+echo
+echo Nachrichten in Partition 1:
+kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+echo
+echo Nachrichten in Partition 2:
+kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
- kafka-3
producer:
- image: juplo/spring-producer:1.0-SNAPSHOT
+ image: juplo/rest-producer:1.0-SNAPSHOT
+ ports:
+ - 8080:8080
environment:
- juplo.bootstrap-server: kafka:9092
- juplo.client-id: producer
- juplo.producer.topic: test
+ server.port: 8080
+ producer.bootstrap-server: kafka:9092
+ producer.client-id: producer
+ producer.topic: test
+
+ producer-0:
+ image: juplo/rest-producer:1.0-SNAPSHOT
+ ports:
+ - 8000:8080
+ environment:
+ server.port: 8080
+ producer.bootstrap-server: kafka:9092
+ producer.client-id: producer-0
+ producer.topic: test
+ producer.partition: 0
+
+ producer-1:
+ image: juplo/rest-producer:1.0-SNAPSHOT
+ ports:
+ - 8001:8080
+ environment:
+ server.port: 8080
+ producer.bootstrap-server: kafka:9092
+ producer.client-id: producer-1
+ producer.topic: test
+ producer.partition: 1
consumer-1:
image: juplo/simple-consumer:1.0-SNAPSHOT
image: juplo/simple-consumer:1.0-SNAPSHOT
command: kafka:9092 test my-group consumer-2
+ consumer-3:
+ image: juplo/simple-consumer:1.0-SNAPSHOT
+ command: kafka:9092 test my-group consumer-3
+
volumes:
zookeeper-data:
zookeeper-log:
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>spring-producer</artifactId>
- <name>Spring Producer</name>
- <description>A Simple Spring-Boot-Producer, that takes messages via POST and confirms successs</description>
+ <artifactId>rest-producer</artifactId>
+ <name>REST Producer</name>
+ <description>A Simple Producer that takes messages via POST and confirms success</description>
<version>1.0-SNAPSHOT</version>
<properties>
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
+@ComponentScan(excludeFilters = @ComponentScan.Filter(RestController.class))
public class Application
{
public static void main(String[] args)
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Bean
public ExampleProducer exampleProducer(
ApplicationProperties properties,
- Producer<String, String> kafkaProducer,
- ConfigurableApplicationContext applicationContext)
+ Producer<String, String> kafkaProducer)
{
return
new ExampleProducer(
properties.getClientId(),
properties.getProducerProperties().getTopic(),
- properties.getProducerProperties().getThrottle() == null
- ? Duration.ofMillis(500)
- : properties.getProducerProperties().getThrottle(),
- kafkaProducer,
- () -> applicationContext.close());
+ properties.getProducer().getPartition(),
+ kafkaProducer);
}
@Bean(destroyMethod = "")
@NotNull
@NotEmpty
private String topic;
+ private Integer partition;
@NotNull
@NotEmpty
private String acks;
@NotNull
@NotEmpty
private String compressionType;
- private Duration throttle;
}
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class ErrorResponse
+{
+ private final String error;
+ private final Integer status;
+}
package de.juplo.kafka;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.math.BigInteger;
import java.time.Duration;
@Slf4j
-public class ExampleProducer implements Runnable
+@RestController
+@RequiredArgsConstructor
+public class ExampleProducer
{
private final String id;
private final String topic;
- private final Duration throttle;
+ private final Integer partition;
private final Producer<String, String> producer;
- private final Thread workerThread;
- private final Runnable closeCallback;
- private volatile boolean running = true;
private long produced = 0;
-
- public ExampleProducer(
- String id,
- String topic,
- Duration throttle,
- Producer<String, String> producer,
- Runnable closeCallback)
+ @PostMapping(path = "{key}")
+ public DeferredResult<ProduceResult> send(
+ @PathVariable String key,
+ @RequestHeader(name = "X-id", required = false) Long correlationId,
+ @RequestBody String value)
{
- this.id = id;
- this.topic = topic;
- this.throttle = throttle;
- this.producer = producer;
+ DeferredResult<ProduceResult> result = new DeferredResult<>();
- workerThread = new Thread(this, "ExampleProducer Worker-Thread");
- workerThread.start();
-
- this.closeCallback = closeCallback;
- }
-
-
- @Override
- public void run()
- {
- long i = 0;
-
- try
- {
- for (; running; i++)
- {
- send(Long.toString(i%10), Long.toString(i));
-
- if (throttle.isPositive())
- {
- try
- {
- Thread.sleep(throttle);
- }
- catch (InterruptedException e)
- {
- log.warn("{} - Interrupted while throttling!", e);
- }
- }
- }
- }
- catch (Exception e)
- {
- log.error("{} - Unexpected error!", id, e);
- log.info("{} - Triggering exit of application!", id);
- new Thread(closeCallback).start();
- }
- finally
- {
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
- log.info("{}: Produced {} messages in total, exiting!", id, produced);
- }
- }
-
- void send(String key, String value)
- {
final long time = System.currentTimeMillis();
final ProducerRecord<String, String> record = new ProducerRecord<>(
topic, // Topic
+ partition, // Partition
key, // Key
value // Value
);
+ record.headers().add("source", id.getBytes());
+ if (correlationId != null)
+ {
+ record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
+ }
+
producer.send(record, (metadata, e) ->
{
long now = System.currentTimeMillis();
{
// HANDLE SUCCESS
produced++;
+ result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
log.debug(
"{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
id,
else
{
// HANDLE ERROR
+ result.setErrorResult(new ProduceFailure(e));
log.error(
"{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
id,
record.value(),
now - time
);
- }
+ return result;
+ }
- public void shutdown() throws InterruptedException
+ @ExceptionHandler
+ @ResponseStatus(HttpStatus.BAD_REQUEST)
+ public ErrorResponse illegalStateException(IllegalStateException e)
{
- log.info("{} joining the worker-thread...", id);
- running = false;
- workerThread.join();
+ return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
}
}
--- /dev/null
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceFailure implements ProduceResult
+{
+ private final String error;
+ private final String exception;
+ private final Integer status;
+
+
+ public ProduceFailure(Exception e)
+ {
+ status = 500;
+ exception = e.getClass().getSimpleName();
+ error = e.getMessage();
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+
+
+@JsonInclude(NON_NULL)
+public interface ProduceResult
+{
+}
--- /dev/null
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceSuccess implements ProduceResult
+{
+ Integer partition;
+ Long offset;
+}
import static de.juplo.kafka.ApplicationTests.TOPIC;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@Test
public void testSendMessage() throws Exception
{
- await("Some messages were send")
+ mockMvc
+ .perform(post("/peter").content("Hallo Welt!"))
+ .andExpect(status().isOk());
+ await("Message was send")
.atMost(Duration.ofSeconds(5))
- .until(() -> consumer.received.size() >= 1);
+ .until(() -> consumer.received.size() == 1);
}