while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done
-# tag::nachrichten[]
echo 6 | http -v :8080/peter
echo 77 | http -v :8080/klaus
-# end::nachrichten[]
echo "Writing poison pill..."
-# tag::poisonpill[]
echo 'BOOM!' | kafkacat -P -b :9092 -t test
-# end::poisonpill[]
docker-compose logs -f consumer-1 consumer-2
echo "Restarting consumer-1..."
-# tag::restart[]
docker-compose up consumer-1
-# end::restart[]
producer.bootstrap-server: kafka:9092
producer.client-id: producer
producer.topic: test
-
- consumer-1:
- image: juplo/spring-consumer-json:1.0-SNAPSHOT
- ports:
- - 8081:8080
- environment:
- server.port: 8080
- spring.kafka.bootstrap-servers: kafka:9092
- spring.kafka.client-id: consumer-1
- spring.kafka.consumer.group-id: my-group
- simple.consumer.topic: test
-
- consumer-2:
- image: juplo/spring-consumer-json:1.0-SNAPSHOT
- ports:
- - 8082:8080
- environment:
- server.port: 8080
- spring.kafka.bootstrap-servers: kafka:9092
- spring.kafka.client-id: consumer-2
- spring.kafka.consumer.group-id: my-group
- simple.consumer.topic: test
<build>
<plugins>
- <plugin>
- <groupId>pl.project13.maven</groupId>
- <artifactId>git-commit-id-plugin</artifactId>
- </plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
{
@Bean
public SimpleConsumer simpleConsumer(
- Consumer<String, Message> kafkaConsumer,
+ Consumer<String, String> kafkaConsumer,
KafkaProperties kafkaProperties,
ApplicationProperties applicationProperties)
{
{
private final String id;
private final String topic;
- private final Consumer<String, Message> consumer;
+ private final Consumer<String, String> consumer;
private long consumed = 0;
while (true)
{
- ConsumerRecords<String, Message> records =
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, Message> record : records)
+ for (ConsumerRecord<String, String> record : records)
{
handleRecord(
record.topic(),
auto-offset-reset: earliest
auto-commit-interval: 5s
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
metadata.max.age.ms: 1000
- spring.json.type.mapping: >
- ADD:de.juplo.kafka.MessageAddNumber,
- CALC:de.juplo.kafka.MessageCalculateSum
+ spring.json.type.mapping: TODO
logging:
level:
root: INFO