--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, String>
+{
+ private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+
+
+ @Override
+ public void accept(ConsumerRecord<String, String> record)
+ {
+ Integer partition = record.partition();
+ String user = record.key();
+ String message = record.value();
+ switch (message)
+ {
+ case "START":
+ state.get(partition).startSum(user);
+ break;
+
+ case "END":
+ Long result = state.get(partition).endSum(user);
+ log.info("New result for {}: {}", user, result);
+ break;
+
+ default:
+ state.get(partition).addToSum(user, Integer.parseInt(message));
+ break;
+ }
+ }
+
+ protected void addPartition(Integer partition, Map<String, Long> state)
+ {
+ this.state.put(partition, new AdderBusinessLogic(state));
+ }
+
+ protected Map<String, Long> removePartition(Integer partition)
+ {
+ return this.state.remove(partition).getState();
+ }
+
+
+ public Map<Integer, AdderBusinessLogic> getState()
+ {
+ return state;
+ }
+}