Gesehene Schlüssel sollten als long gezählt werden
authorKai Moritz <kai@juplo.de>
Sat, 9 Apr 2022 16:00:48 +0000 (18:00 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 20:16:15 +0000 (22:16 +0200)
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 1525f5a..93580ee 100644 (file)
@@ -32,7 +32,7 @@ public class DriverController
   }
 
   @GetMapping("seen")
-  public Map<Integer, Map<String, Integer>> seen()
+  public Map<Integer, Map<String, Long>> seen()
   {
     return consumer.getSeen();
   }
index c2d4447..bc3d357 100644 (file)
@@ -37,7 +37,7 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
 
 
-  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
 
 
   public EndlessConsumer(
@@ -81,7 +81,7 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - removing partition: {}", id, tp);
-            Map<String, Integer> removed = seen.remove(tp.partition());
+            Map<String, Long> removed = seen.remove(tp.partition());
             for (String key : removed.keySet())
             {
               log.info(
@@ -127,12 +127,12 @@ public class EndlessConsumer implements Runnable
 
           Integer partition = record.partition();
           String key = record.key() == null ? "NULL" : record.key();
-          Map<String, Integer> byKey = seen.get(partition);
+          Map<String, Long> byKey = seen.get(partition);
 
           if (!byKey.containsKey(key))
-            byKey.put(key, 0);
+            byKey.put(key, 0l);
 
-          int seenByKey = byKey.get(key);
+          long seenByKey = byKey.get(key);
           seenByKey++;
           byKey.put(key, seenByKey);
         }
@@ -176,7 +176,7 @@ public class EndlessConsumer implements Runnable
     }
   }
 
-  public Map<Integer, Map<String, Integer>> getSeen()
+  public Map<Integer, Map<String, Long>> getSeen()
   {
     return seen;
   }