* Angelehnt an die Code-Vereinheitlichung in `sumup-adder`.
* Dabei: Setup und Skript angepasst und repariert.
exit
fi
+docker-compose stop producer consumer
docker-compose up -d zookeeper kafka cli
if [[
"$1" = "build"
]]
then
- mvn install || exit
+ docker-compose rm -svf consumer
+ mvn clean install || exit
else
echo "Using image existing images:"
docker image ls $IMAGE
echo "Waiting for the Kafka-Cluster to become ready..."
docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
docker-compose exec -T cli bash << 'EOF'
echo "Creating topic with 3 partitions..."
kafka-topics --bootstrap-server kafka:9092 --describe --topic test
EOF
-docker-compose up -d consumer
+docker-compose up -d producer consumer
+# Wait until producer and consumer report UP, then until the consumer has
+# accumulated some state. All three loops sleep between polls — the state
+# loop previously busy-waited, hammering the endpoint in a tight loop.
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
+while [[ "$(http :8081/state | jq -r .)" == "{}" ]]; do echo "Waiting for some state to show up..."; sleep 1; done
-docker-compose up -d producer
-sleep 10
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+http -v :8081/state
docker-compose stop producer
docker-compose exec -T cli bash << 'EOF'
EOF
docker-compose start producer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+
sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
-docker-compose stop producer consumer
+http -v :8081/state
+
+docker-compose stop producer
depends_on:
- zookeeper
- kafka-ui:
- image: provectuslabs/kafka-ui:0.3.3
- ports:
- - 8080:8080
- environment:
- KAFKA_CLUSTERS_0_NAME: local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
-
cli:
image: juplo/toolbox
command: sleep infinity
public class ApplicationConfiguration
{
@Bean
- public KeyCountingRecordHandler keyCountingRecordHandler()
+ public ApplicationRecordHandler recordHandler()
{
- return new KeyCountingRecordHandler();
+ return new ApplicationRecordHandler();
}
@Bean
public EndlessConsumer<String, Long> endlessConsumer(
KafkaConsumer<String, Long> kafkaConsumer,
ExecutorService executor,
- KeyCountingRecordHandler keyCountingRecordHandler,
+ ApplicationRecordHandler recordHandler,
ApplicationProperties properties)
{
return
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- keyCountingRecordHandler);
+ recordHandler);
}
@Bean
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Counts consumed records per partition and per key.
+ *
+ * A {@code null} key is bucketed under the literal string {@code "NULL"}.
+ * NOTE(review): not thread-safe — assumes all calls happen on the single
+ * consumer thread of {@code EndlessConsumer}; confirm against the caller.
+ */
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
+{
+  // partition -> (key -> number of records seen for that key)
+  private final Map<Integer, Map<String, Long>> state = new HashMap<>();
+
+
+  @Override
+  public void accept(ConsumerRecord<String, Long> record)
+  {
+    Integer partition = record.partition();
+    String key = record.key() == null ? "NULL" : record.key().toString();
+
+    // computeIfAbsent + merge replaces the manual containsKey/put/get/put
+    // dance; 1L (uppercase) avoids the easily-misread lowercase 0l literal.
+    state
+        .computeIfAbsent(partition, p -> new HashMap<>())
+        .merge(key, 1L, Long::sum);
+  }
+
+
+  /** Exposes the live (mutable) counting state, keyed by partition. */
+  public Map<Integer, Map<String, Long>> getState()
+  {
+    return state;
+  }
+}
public class DriverController
{
private final EndlessConsumer consumer;
- private final KeyCountingRecordHandler keyCountingRecordHandler;
+ private final ApplicationRecordHandler recordHandler;
@PostMapping("start")
}
- @GetMapping("seen")
- public Map<Integer, Map<String, Long>> seen()
+ @GetMapping("state")
+ public Map<Integer, Map<String, Long>> state()
{
- return keyCountingRecordHandler.getSeen();
+ return recordHandler.getState();
}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class KeyCountingRecordHandler implements RecordHandler<String, Long>
-{
- private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-
-
- @Override
- public void accept(ConsumerRecord<String, Long> record)
- {
- Integer partition = record.partition();
- String key = record.key() == null ? "NULL" : record.key().toString();
-
- if (!seen.containsKey(partition))
- seen.put(partition, new HashMap<>());
-
- Map<String, Long> byKey = seen.get(partition);
-
- if (!byKey.containsKey(key))
- byKey.put(key, 0l);
-
- long seenByKey = byKey.get(key);
- seenByKey++;
- byKey.put(key, seenByKey);
- }
-
-
- public Map<Integer, Map<String, Long>> getSeen()
- {
- return seen;
- }
-}
@Autowired
ExecutorService executor;
@Autowired
- KeyCountingRecordHandler keyCountingRecordHandler;
+ ApplicationRecordHandler recordHandler;
EndlessConsumer<String, Long> endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
});
TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
- new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+ new TestRecordHandler<String, Long>(recordHandler) {
@Override
public void onNewRecord(ConsumerRecord<String, Long> record)
{