#!/bin/bash
 
-IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+IMAGE=juplo/springified-endless-consumer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
 docker-compose up -d producer consumer
-sleep 5
+until [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for the producer..."; sleep 1; done
+echo 'Hallo Welt!' | http -v :8080/peter
+echo peter | http -v :8080/
+http -v PUT :8080/peter
 docker-compose exec -T cli bash << 'EOF'
 echo "Writing poison pill into topic test..."
 # tag::poisonpill[]
 echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test
 # end::poisonpill[]
 EOF
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
-http -v :8081/actuator/health
-echo "Restarting consumer"
-http -v post :8081/start
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
+until [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for the consumer..."; sleep 1; done
+sleep 5
+http -v :8081/seen
 http -v :8081/actuator/health
-http -v post :8081/actuator/shutdown
-docker-compose stop producer
+docker-compose stop producer consumer
 docker-compose ps
-docker-compose logs --tail=100 consumer
+docker-compose logs consumer
+docker-compose exec -T cli bash << 'EOF'
+echo "Messages in topic \"test\":"
+kafkacat -C -b kafka:9092 -t test -o 0 -e
+echo "Messages in topic \"dlq\":"
+kafkacat -C -b kafka:9092 -t dlq -o 0 -e
+EOF
 
     command: sleep infinity
 
   producer:
-    image: juplo/endless-producer:1.0-SNAPSHOT
+    image: juplo/springified-producer:1.0-SNAPSHOT
     ports:
       - 8080:8880
     environment:
 
 
   consumer:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+    image: juplo/springified-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8881
     environment:
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: my-group
-      consumer.client-id: consumer
+      spring.kafka.consumer.bootstrap-servers: kafka:9092
+      spring.kafka.consumer.group-id: my-group
+      spring.kafka.consumer.client-id: consumer
+      spring.kafka.producer.bootstrap-servers: kafka:9092
+      spring.kafka.producer.client-id: consumer
       consumer.topic: test
 
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-consumer</artifactId>
+  <artifactId>springified-endless-consumer</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
+  <name>Springified Endless Consumer: a Simple Consumer-Group that is implemented using Spring Kafka and reads and prints the topic and counts the received messages for each key by topic</name>
 
   <dependencies>
     <dependency>