WIP
authorKai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 15:48:24 +0000 (17:48 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 15:48:24 +0000 (17:48 +0200)
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/StateDocument.java
src/main/java/de/juplo/kafka/SumRebalanceListener.java
src/main/java/de/juplo/kafka/SumRecordHandler.java

index d166ac3..9298a99 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+IMAGE=juplo/sumup-consumer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -16,7 +16,7 @@ if [[
   "$1" = "build"
 ]]
 then
-  mvn install || exit
+  mvn install -D skipTests || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
@@ -25,18 +25,22 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d
+docker-compose up -d producer consumer-1
 
 while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
-while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done
 
-sleep 10
+echo -n START | http -v :8080/peter
+echo -n 1 | http -v :8080/peter
+echo -n 2 | http -v :8080/peter
+echo -n 3 | http -v :8080/peter
+echo -n 4 | http -v :8080/peter
+echo -n 5 | http -v :8080/peter
+echo -n 6 | http -v :8080/peter
+http -v :8081/state
+http -v :8081/state/peter
+echo -n END | http -v :8080/peter
+http -v :8081/state
+http -v :8081/state/peter
+docker-compose logs consumer-1
+docker-compose stop consumer-1
 
-docker-compose stop bart nerd riddler kraut poet linux
-
-http -v :8081/seen
-http -v :8081/seen/bart
-http -v :8082/seen
-http -v :8082/seen/bart
-
-docker-compose stop consumer-1 consumer-2
index d855918..dfd4084 100644 (file)
@@ -55,76 +55,6 @@ services:
     image: juplo/toolbox
     command: sleep infinity
 
-  bart:
-    image: juplo/wordcount--fortune:1.0.0
-    command: bash -c "
-      while [ true ];
-      do
-        /usr/games/fortune chalkboard
-          | head -1
-          | http -v producer:8080/bart;
-        echo;
-        sleep 1;
-      done"
-
-  nerd:
-    image: juplo/wordcount--fortune:1.0.0
-    command: bash -c "
-      while [ true ];
-      do
-        /usr/games/fortune computers
-          | grep  -v '^[[:space:]]*--'
-          | http -v producer:8080/nerd;
-        echo;
-        sleep 1;
-      done"
-
-  riddler:
-    image: juplo/wordcount--fortune:1.0.0
-    command: bash -c "
-      while [ true ];
-      do
-        /usr/games/fortune riddles
-          | awk -F':' '/^Q/ { print $$2 }'
-          | http -v producer:8080/riddler;
-        echo;
-        sleep 1;
-        sleep 1;
-      done"
-
-  kraut:
-    image: juplo/wordcount--fortune:1.0.0
-    command: bash -c "
-      while [ true ];
-      do
-        /usr/games/fortune de
-          | http -v producer:8080/kraut;
-        echo;
-        sleep 1;
-      done"
-
-  poet:
-    image: juplo/wordcount--fortune:1.0.0
-    command: bash -c "
-      while [ true ];
-      do
-        /usr/games/fortune songs-poems
-          | http -v producer:8080/poet;
-        echo;
-        sleep 1;
-      done"
-
-  linux:
-    image: juplo/wordcount--fortune:1.0.0
-    command: bash -c "
-      while [ true ];
-      do
-        /usr/games/fortune linux
-          | http -v producer:8080/linux;
-        echo;
-        sleep 1;
-      done"
-
   producer:
     image: juplo/rest-producer:1.0-SNAPSHOT
     ports:
@@ -136,7 +66,7 @@ services:
       producer.topic: test
 
   consumer-1:
-    image: juplo/wordcount:1.0-SNAPSHOT
+    image: juplo/sumup-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
@@ -148,7 +78,7 @@ services:
       spring.data.mongodb.database: juplo
 
   consumer-2:
-    image: juplo/wordcount:1.0-SNAPSHOT
+    image: juplo/sumup-consumer:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
diff --git a/pom.xml b/pom.xml
index dd282c5..e309ba9 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sum</artifactId>
+  <artifactId>sumup-consumer</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>Sum</name>
+  <name>Summing Up Consumer</name>
   <description>Calculates the sum of all natuarl numbers up to the given natural number</description>
 
   <dependencies>
index fdae76f..3aa9314 100644 (file)
@@ -7,7 +7,9 @@ import org.springframework.web.bind.annotation.*;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 
 @RestController
@@ -31,20 +33,27 @@ public class DriverController
   }
 
 
-  @GetMapping("seen")
-  public Map<Integer, Map<String, List<Long>>> seen()
+  @GetMapping("state")
+  public Map<Integer, Map<String, Long>> state()
   {
-    return sumRecordHandler.getSeen();
+    return
+        sumRecordHandler
+            .getState()
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                entry -> entry.getKey(),
+                entry -> entry.getValue().getState()));
   }
 
-  @GetMapping("seen/{user}")
-  public ResponseEntity<List<Long>> seen(@PathVariable String user)
+  @GetMapping("state/{user}")
+  public ResponseEntity<Long> seen(@PathVariable String user)
   {
-    for (Map<String, List<Long>> users : sumRecordHandler.getSeen().values())
+    for (SumBusinessLogic sumBusinessLogic : sumRecordHandler.getState().values())
     {
-      List<Long> results = users.get(user);
-      if (results != null)
-        return ResponseEntity.ok(results);
+      Optional<Long> sum = sumBusinessLogic.getSum(user);
+      if (sum.isPresent())
+        return ResponseEntity.ok(sum.get());
     }
 
     return ResponseEntity.notFound().build();
index 52968cd..2583c8e 100644 (file)
@@ -17,7 +17,6 @@ public class StateDocument
   public String id;
   public long offset = -1l;
   public Map<String, Long> state;
-  public Map<String, List<Long>> seen;
 
   public StateDocument()
   {
@@ -27,16 +26,15 @@ public class StateDocument
   {
     this.id = Integer.toString(partition);
     this.state = new HashMap<>();
-    this.seen = new HashMap<>();
   }
 
   public StateDocument(
       Integer partition,
       Map<String, Long> state,
-      Map<String, List<Long>> seen)
+      long offset)
   {
     this.id = Integer.toString(partition);
     this.state = state;
-    this.seen = seen;
+    this.offset = offset;
   }
 }
index be752ae..83cb759 100644 (file)
@@ -43,7 +43,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
         // Otherwise: Use initial offset, generated by Kafka
         consumer.seek(tp, document.offset);
       }
-      handler.addPartition(partition, document);
+      handler.addPartition(partition, document.state);
     });
   }
 
@@ -59,7 +59,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
           id,
           partition,
           newOffset);
-      repository.save(handler.removePartition(partition));
+      repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset));
     });
   }
 
@@ -70,10 +70,10 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      handler.getSeen().forEach((partiton, statistics) -> repository.save(
+      handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save(
           new StateDocument(
               partiton,
-              statistics,
+              sumBusinessLogic.getState(),
               consumer.position(new TopicPartition(topic, partiton)))));
       lastCommit = clock.instant();
     }
index d4ec38f..b0fd27b 100644 (file)
@@ -6,11 +6,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 
 @Slf4j
 public class SumRecordHandler implements RecordHandler<String, String>
 {
-  private final Map<Integer, Map<String, List<Long>>> seen = new HashMap<>();
   private final Map<Integer, SumBusinessLogic> state = new HashMap<>();
 
 
@@ -24,36 +25,32 @@ public class SumRecordHandler implements RecordHandler<String, String>
     {
       case "START":
         state.get(partition).startSum(user);
-        return;
+        break;
 
       case "END":
         Long result = state.get(partition).endSum(user);
         log.info("New result for {}: {}", user, result);
-        return;
+        break;
 
       default:
         state.get(partition).addToSum(user, Integer.parseInt(message));
-        return;
+        break;
     }
   }
 
-  protected void addPartition(Integer partition, StateDocument document)
+  protected void addPartition(Integer partition, Map<String, Long> state)
   {
-    this.seen.put(partition, document.seen);
-    this.state.put(partition, new SumBusinessLogic(document.state));
+    this.state.put(partition, new SumBusinessLogic(state));
   }
 
-  protected StateDocument removePartition(Integer partition)
+  protected Map<String, Long> removePartition(Integer partition)
   {
-    return new StateDocument(
-        partition,
-        this.state.remove(partition).getState(),
-        this.seen.remove(partition));
+    return this.state.remove(partition).getState();
   }
 
 
-  public Map<Integer, Map<String, List<Long>>> getSeen()
+  public Map<Integer, SumBusinessLogic> getState()
   {
-    return seen;
+    return state;
   }
 }