From: Kai Moritz Date: Mon, 15 Aug 2022 17:54:49 +0000 (+0200) Subject: GRÜN: Implementierung der Erwartungen inkl. Anpassungen an der Anwendung X-Git-Tag: sumup-adder---lvm-2-tage~14 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7c0368363c3e5dbb7eb2a08f343187a93f050617;p=demos%2Fkafka%2Ftraining GRÜN: Implementierung der Erwartungen inkl. Anpassungen an der Anwendung * Neue Erwartungen an `AdderBusinessLogic` implementiert. * Die Implementierung hat sich über die nicht von den Unit-Tests abgedeckte Methode auch auf andere Teile der Anwendung ausgewirkt. * `AdderBusinessLogic.getState()` liefert jetzt in der Map die neue Klasse `AdderResult` und benötigt diese auch als Konstruktor-Argument. * Über die Integration-Tests ist sichergestellt, dass die Datenhaltung trotz der Umstellung von `Long` auf `AdderResult` funktioniert. --- diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java index 64fdb8c..d525182 100644 --- a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java +++ b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java @@ -8,7 +8,7 @@ import java.util.Optional; public class AdderBusinessLogic { - private final Map state; + private final Map state; public AdderBusinessLogic() @@ -16,7 +16,7 @@ public class AdderBusinessLogic this(new HashMap<>()); } - public AdderBusinessLogic(Map state) + public AdderBusinessLogic(Map state) { this.state = state; } @@ -24,7 +24,7 @@ public class AdderBusinessLogic public synchronized Optional getSum(String user) { - return Optional.ofNullable(state.get(user)); + return Optional.ofNullable(state.get(user)).map(result -> result.sum); } public synchronized void addToSum(String user, Integer value) @@ -35,8 +35,9 @@ public class AdderBusinessLogic long sum = Optional .ofNullable(state.get(user)) + .map(result -> result.sum) .orElse(0l); - state.put(user, sum + value); + state.put(user, new AdderResult(value, sum + value)); } public synchronized AdderResult calculate(String user) @@ -44,10 +45,10 @@ public class AdderBusinessLogic if (!state.containsKey(user)) throw new IllegalStateException("No sumation for " + user + " in progress"); - return new AdderResult(66, state.remove(user)); + return state.remove(user); } - protected Map getState() + protected Map getState() { return state; } diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 542af2d..5a01393 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -62,7 +62,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe offset); if (commitsEnabled) { - Map removed = recordHandler.removePartition(partition); + Map removed = recordHandler.removePartition(partition); stateRepository.save(new StateDocument(partition, removed, offset)); } else diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 828dbc2..93e1297 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -30,12 +30,12 @@ public class ApplicationRecordHandler implements RecordHandler state.get(partition).addToSum(user, Integer.parseInt(message)); } - protected void addPartition(Integer partition, Map state) + protected void addPartition(Integer partition, Map state) { this.state.put(partition, new AdderBusinessLogic(state)); } - protected Map removePartition(Integer partition) + protected Map removePartition(Integer partition) { return this.state.remove(partition).getState(); } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index d389271..63f015d 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -33,7 +33,7 @@ public class DriverController @GetMapping("state") - public Map> state() + public Map> state() { return recordHandler diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index 0540e3f..82306d0 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -15,7 +15,7 @@ public class StateDocument @Id public String id; public long offset = -1l; - public Map state; + public Map state; public StateDocument() { @@ -29,7 +29,7 @@ public class StateDocument public StateDocument( Integer partition, - Map state, + Map state, long offset) { this.id = Integer.toString(partition);