Gesehene Schlüssel sollten als long gezählt werden
authorKai Moritz <kai@juplo.de>
Sat, 9 Apr 2022 16:00:48 +0000 (18:00 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 9 Apr 2022 16:37:29 +0000 (18:37 +0200)
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index a504842..93e2856 100644 (file)
@@ -30,7 +30,7 @@ public class DriverController
 
 
   @GetMapping("seen")
-  public Map<Integer, Map<String, Integer>> seen()
+  public Map<Integer, Map<String, Long>> seen()
   {
     return consumer.getSeen();
   }
index 14a875b..2e79009 100644 (file)
@@ -33,7 +33,7 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
   private Future<?> future = null;
 
-  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
 
 
   public EndlessConsumer(
@@ -77,7 +77,7 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - removing partition: {}", id, tp);
-            Map<String, Integer> removed = seen.remove(tp.partition());
+            Map<String, Long> removed = seen.remove(tp.partition());
             for (String key : removed.keySet())
             {
               log.info(
@@ -123,12 +123,12 @@ public class EndlessConsumer implements Runnable
 
           Integer partition = record.partition();
           String key = record.key() == null ? "NULL" : record.key();
-          Map<String, Integer> byKey = seen.get(partition);
+          Map<String, Long> byKey = seen.get(partition);
 
           if (!byKey.containsKey(key))
-            byKey.put(key, 0);
+            byKey.put(key, 0l);
 
-          int seenByKey = byKey.get(key);
+          long seenByKey = byKey.get(key);
           seenByKey++;
           byKey.put(key, seenByKey);
         }
@@ -151,7 +151,7 @@ public class EndlessConsumer implements Runnable
     }
   }
 
-  public Map<Integer, Map<String, Integer>> getSeen()
+  public Map<Integer, Map<String, Long>> getSeen()
   {
     return seen;
   }