projects
/
demos
/
kafka
/
training
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
813117d
)
Code an die Version aus 'sumup-adder--springified' angepasst
sumup-adder--json
author
Kai Moritz
<kai@juplo.de>
Fri, 16 Sep 2022 09:27:42 +0000
(11:27 +0200)
committer
Kai Moritz
<kai@juplo.de>
Fri, 16 Sep 2022 09:27:56 +0000
(11:27 +0200)
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
index
829ab0e
..
2829157
100644
(file)
--- a/
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
+++ b/
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
@@
-21,6
+21,24
@@
public class ApplicationRecordHandler implements RecordHandler<String, Message>
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+ public void addNumber(
+ Integer partition,
+ String user,
+ MessageAddNumber message)
+ {
+ state.get(partition).addToSum(user, message.getNext());
+ }
+
+ public void calculateSum(
+ Integer partition,
+ String user,
+ MessageCalculateSum message)
+ {
+ AdderResult result = state.get(partition).calculate(user);
+ log.info("{} - New result for {}: {}", id, user, result);
+ results.addResults(partition, user, result);
+ }
+
@Override
public void accept(ConsumerRecord<String, Message> record)
{
@Override
public void accept(ConsumerRecord<String, Message> record)
{
@@
-31,14
+49,11
@@
public class ApplicationRecordHandler implements RecordHandler<String, Message>
switch(message.getType())
{
case ADD:
switch(message.getType())
{
case ADD:
- MessageAddNumber addNumber = (MessageAddNumber)message;
- state.get(partition).addToSum(user, addNumber.getNext());
+ addNumber(partition, user, (MessageAddNumber) message);
break;
case CALC:
break;
case CALC:
- AdderResult result = state.get(partition).calculate(user);
- log.info("{} - New result for {}: {}", id, user, result);
- results.addResults(partition, user, result);
+ calculateSum(partition, user, (MessageCalculateSum) message);
break;
}
break;
}