X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=596f3da256d9d1e9a052ea43fe2d21c674db94cd;hb=a4be0eac8d9f234cd53ae1917a084f65d91d0460;hp=828dbc239cabb071153e999a82b0f31749fd6dae;hpb=4a8642d0c36413cbc283d35dda8977b0e9320372;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 828dbc2..596f3da 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -7,9 +8,12 @@ import java.util.HashMap; import java.util.Map; +@RequiredArgsConstructor @Slf4j public class ApplicationRecordHandler implements RecordHandler { + private final AdderResults results; + private final Map state = new HashMap<>(); @@ -24,18 +28,19 @@ public class ApplicationRecordHandler implements RecordHandler { AdderResult result = state.get(partition).calculate(user); log.info("New result for {}: {}", user, result); + results.addResults(partition, user, result); return; } state.get(partition).addToSum(user, Integer.parseInt(message)); } - protected void addPartition(Integer partition, Map state) + protected void addPartition(Integer partition, Map state) { this.state.put(partition, new AdderBusinessLogic(state)); } - protected Map removePartition(Integer partition) + protected Map removePartition(Integer partition) { return this.state.remove(partition).getState(); } @@ -45,4 +50,9 @@ public class ApplicationRecordHandler implements RecordHandler { return state; } + + public AdderBusinessLogic getState(Integer partition) + { + return state.get(partition); + } }