while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done
-# tag::nachrichten[]
echo 6 | http -v :8080/peter
echo 77 | http -v :8080/klaus
-# end::nachrichten[]
-echo "Writing poison pill..."
-# tag::poisonpill[]
-echo 'BOOM!' | kafkacat -P -b :9092 -t test
-# end::poisonpill[]
-
-docker-compose -f docker/docker-compose.yml logs -f consumer-1 consumer-2
-
-echo "Restarting consumer-1..."
-# tag::restart[]
-docker-compose -f docker/docker-compose.yml up consumer-1
-# end::restart[]
+sleep 5
+docker-compose -f docker/docker-compose.yml logs consumer-1 consumer-2
{
@Bean
public SimpleConsumer simpleConsumer(
- Consumer<String, Message> kafkaConsumer,
+ Consumer<String, String> kafkaConsumer,
KafkaProperties kafkaProperties,
ApplicationProperties applicationProperties)
{
{
private final String id;
private final String topic;
- private final Consumer<String, Message> consumer;
+ private final Consumer<String, String> consumer;
private long consumed = 0;
while (true)
{
- ConsumerRecords<String, Message> records =
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, Message> record : records)
+ for (ConsumerRecord<String, String> record : records)
{
handleRecord(
record.topic(),
Integer partition,
Long offset,
String key,
- Message value)
+ Object value)
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
auto-offset-reset: earliest
auto-commit-interval: 5s
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
metadata.max.age.ms: 1000
- spring.json.type.mapping: >
- ADD:de.juplo.kafka.MessageAddNumber,
- CALC:de.juplo.kafka.MessageCalculateSum
+ spring.json.type.mapping: TODO
logging:
level:
root: INFO