X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FAdderRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FAdderRecordHandler.java;h=0000000000000000000000000000000000000000;hb=a2e8fc924e5b472d6b90c42d311514f91ea452f1;hp=ecd47bc0e8d3b0695e11c3069512aaf7ed08abaf;hpb=a2445f04436bb1087c9fbcd44d319496e91a90c8;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/AdderRecordHandler.java b/src/main/java/de/juplo/kafka/AdderRecordHandler.java deleted file mode 100644 index ecd47bc..0000000 --- a/src/main/java/de/juplo/kafka/AdderRecordHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.HashMap; -import java.util.Map; - - -@Slf4j -public class AdderRecordHandler implements RecordHandler -{ - private final Map state = new HashMap<>(); - - - @Override - public void accept(ConsumerRecord record) - { - Integer partition = record.partition(); - String user = record.key(); - String message = record.value(); - switch (message) - { - case "START": - state.get(partition).startSum(user); - break; - - case "END": - Long result = state.get(partition).endSum(user); - log.info("New result for {}: {}", user, result); - break; - - default: - state.get(partition).addToSum(user, Integer.parseInt(message)); - break; - } - } - - protected void addPartition(Integer partition, Map state) - { - this.state.put(partition, new AdderBusinessLogic(state)); - } - - protected Map removePartition(Integer partition) - { - return this.state.remove(partition).getState(); - } - - - public Map getState() - { - return state; - } -}