exit
fi
-docker-compose up -d zookeeper kafka cli
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
if [[
$(docker image ls -q $IMAGE) == "" ||
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
-
-docker-compose exec -T cli bash << 'EOF'
-echo "Creating topic with 3 partitions..."
-kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-# tag::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
-# end::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose up -d producer-0 producer-1 consumer
-
-while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
-while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done
+docker-compose up setup
+docker-compose up -d producer-0 producer-1
+while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-0; sleep 1; done
+while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-1; sleep 1; done
+docker-compose up -d consumer
echo foo | http -v :8000/foo
echo foo | http -v :8001/foo
+echo foo | http -v :8001/foo
+echo foo | http -v :8000/bar
+echo foobar | http -v :8000/bar
+echo foofoo | http -v :8000/bar
+echo barbar | http -v :8000/bar
+echo barfoo | http -v :8000/bar
+echo bar | http -v :8000/bar
-sleep 5
-
-http -v :8081/seen
+docker-compose logs consumer
docker-compose up -d
-
-sleep 5
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
docker-compose exec -T cli bash << 'EOF'
echo "Altering number of partitions from 3 to 7..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
kafka-topics --bootstrap-server kafka:9092 --describe --topic test
EOF
docker-compose restart producer-0 producer-1
-
while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
-docker-compose stop
+docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
+
+echo "Messages from peter"
+docker-compose logs consumer | grep k=peter
+echo "Messages from beate"
+docker-compose logs consumer | grep k=beate
+echo "Messages from foo"
+docker-compose logs consumer | grep k=foo
version: '3.2'
services:
zookeeper:
- image: confluentinc/cp-zookeeper:7.0.2
+ image: confluentinc/cp-zookeeper:7.1.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- 2181:2181
- kafka:
- image: confluentinc/cp-kafka:7.0.2
+ kafka-1:
+ image: confluentinc/cp-kafka:7.1.3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
+ KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ ports:
+ - 9081:9081
+ depends_on:
+ - zookeeper
+
+ kafka-2:
+ image: confluentinc/cp-kafka:7.1.3
+ environment:
+ KAFKA_BROKER_ID: 2
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
- KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
ports:
- 9092:9082
- 9082:9082
+ networks:
+ default:
+ aliases:
+ - kafka
depends_on:
- zookeeper
- kafka-ui:
- image: provectuslabs/kafka-ui:0.3.3
- ports:
- - 8080:8080
+ kafka-3:
+ image: confluentinc/cp-kafka:7.1.3
environment:
- KAFKA_CLUSTERS_0_NAME: local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+ KAFKA_BROKER_ID: 3
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+ KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+ KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+ ports:
+ - 9083:9083
+ depends_on:
+ - zookeeper
+
+ setup:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+ kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+ "
cli:
image: juplo/toolbox
ports:
- 8000:8080
environment:
+ server.port: 8080
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
ports:
- 8001:8080
environment:
+ server.port: 8080
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
peter:
image: juplo/rest-client:1.0-SNAPSHOT
environment:
+ server.port: 8080
rest-client.baseUrl: http://producer-1:8080
rest-client.username: peter
rest-client.throttle-ms: 1000
klaus:
image: juplo/rest-client:1.0-SNAPSHOT
environment:
+ server.port: 8080
rest-client.baseUrl: http://producer-1:8080
rest-client.username: klaus
rest-client.throttle-ms: 1100
beate:
image: juplo/rest-client:1.0-SNAPSHOT
environment:
+ server.port: 8080
rest-client.baseUrl: http://producer-0:8080
rest-client.username: beate
rest-client.throttle-ms: 900
franz:
image: juplo/rest-client:1.0-SNAPSHOT
environment:
+ server.port: 8080
rest-client.baseUrl: http://producer-1:8080
rest-client.username: franz
rest-client.throttle-ms: 800
uschi:
image: juplo/rest-client:1.0-SNAPSHOT
environment:
+ server.port: 8080
rest-client.baseUrl: http://producer-0:8080
rest-client.username: uschi
rest-client.throttle-ms: 1200
consumer:
- image: juplo/counting-consumer:1.0-SNAPSHOT
- ports:
- - 8081:8081
- environment:
- consumer.bootstrap-server: kafka:9092
- consumer.client-id: my-group
- consumer.client-id: consumer
- consumer.topic: test
+ image: juplo/toolbox
+ command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.6.5</version>
+ <version>2.7.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka</groupId>
<artifactId>rest-producer</artifactId>
- <name>REST Producer: a Simple Producer that takes messages via POST and confirms successs</name>
+ <name>REST Producer</name>
+ <description>A Simple Producer that takes messages via POST and confirms successs</description>
<version>1.0-SNAPSHOT</version>
<dependencies>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>build-info</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
</plugin>
<plugin>
<groupId>io.fabric8</groupId>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
@PostMapping(path = "{key}")
public DeferredResult<ProduceResult> send(
@PathVariable String key,
+ @RequestHeader(name = "X-id", required = false) Long correlationId,
@RequestBody String value)
{
DeferredResult<ProduceResult> result = new DeferredResult<>();
long now = System.currentTimeMillis();
log.trace(
- "{} - Queued #{} key={} latency={}ms",
+ "{} - Queued message with key={} latency={}ms",
id,
- value,
record.key(),
now - time
);
return result;
}
+ @ExceptionHandler
+ @ResponseStatus(HttpStatus.BAD_REQUEST)
+ public ErrorResponse illegalStateException(IllegalStateException e)
+ {
+ return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+ }
+
@PreDestroy
public void destroy() throws ExecutionException, InterruptedException
{
producer:
bootstrap-server: :9092
- client-id: peter
+ client-id: DEV
topic: test
acks: -1
batch-size: 16384
linger-ms: 0
compression-type: gzip
management:
+ endpoint:
+ shutdown:
+ enabled: true
endpoints:
web:
exposure:
include: "*"
+ info:
+ env:
+ enabled: true
+ java:
+ enabled: true
+info:
+ kafka:
+ bootstrap-server: ${producer.bootstrap-server}
+ client-id: ${producer.client-id}
+ topic: ${producer.topic}
+ acks: ${producer.acks}
+ batch-size: ${producer.batch-size}
+ linger-ms: ${producer.linger-ms}
+ compression-type: ${producer.compression-type}
logging:
level:
root: INFO
- de.juplo: DEBUG
+ de.juplo: TRACE
+server:
+ port: 8880
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.*;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+
+@SpringBootTest(
+ properties = {
+ "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "producer.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "producer.topic=" + TOPIC})
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@Slf4j
+public class ApplicationTests
+{
+ static final String TOPIC = "FOO";
+ static final int PARTITIONS = 10;
+
+ @Autowired
+ MockMvc mockMvc;
+ @Autowired
+ Consumer consumer;
+
+
+ @BeforeEach
+ public void clear()
+ {
+ consumer.received.clear();
+ }
+
+
+ @Test
+ void testSendMessage() throws Exception
+ {
+ mockMvc
+ .perform(post("/peter").content("Hallo Welt!"))
+ .andExpect(status().isOk());
+ await("Message was send")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> consumer.received.size() == 1);
+ }
+
+
+ static class Consumer
+ {
+ final List<ConsumerRecord<String, String>> received = new LinkedList<>();
+
+ @KafkaListener(groupId = "TEST", topics = TOPIC)
+ public void receive(ConsumerRecord<String, String> record)
+ {
+ log.debug("Received message: {}", record);
+ received.add(record);
+ }
+ }
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ Consumer consumer()
+ {
+ return new Consumer();
+ }
+ }
+}