@Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
{
- private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+ private final Map<Integer, Map<String, Long>> state = new HashMap<>();
@Override
- public void accept(ConsumerRecord<String, String> record)
+ public void accept(ConsumerRecord<String, Long> record)
{
Integer partition = record.partition();
- String user = record.key();
- String message = record.value();
- switch (message)
- {
- case "START":
- state.get(partition).startSum(user);
- break;
-
- case "END":
- Long result = state.get(partition).endSum(user);
- log.info("New result for {}: {}", user, result);
- break;
-
- default:
- state.get(partition).addToSum(user, Integer.parseInt(message));
- break;
- }
+ String key = record.key() == null ? "NULL" : record.key().toString();
+ Map<String, Long> byKey = state.get(partition);
+
+ if (!byKey.containsKey(key))
+ byKey.put(key, 0l);
+
+ long seenByKey = byKey.get(key);
+ seenByKey++;
+ byKey.put(key, seenByKey);
}
protected void addPartition(Integer partition, Map<String, Long> state)
{
- this.state.put(partition, new AdderBusinessLogic(state));
+ this.state.put(partition, state);
}
protected Map<String, Long> removePartition(Integer partition)
{
- return this.state.remove(partition).getState();
+ return this.state.remove(partition);
}
- public Map<Integer, AdderBusinessLogic> getState()
+ public Map<Integer, Map<String, Long>> getState()
{
return state;
}