Gesehene Schlüssel sollten als long gezählt werden
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index c2d4447..bc3d357 100644 (file)
@@ -37,7 +37,7 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
 
 
-  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
 
 
   public EndlessConsumer(
@@ -81,7 +81,7 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - removing partition: {}", id, tp);
-            Map<String, Integer> removed = seen.remove(tp.partition());
+            Map<String, Long> removed = seen.remove(tp.partition());
             for (String key : removed.keySet())
             {
               log.info(
@@ -127,12 +127,12 @@ public class EndlessConsumer implements Runnable
 
           Integer partition = record.partition();
           String key = record.key() == null ? "NULL" : record.key();
-          Map<String, Integer> byKey = seen.get(partition);
+          Map<String, Long> byKey = seen.get(partition);
 
           if (!byKey.containsKey(key))
-            byKey.put(key, 0);
+            byKey.put(key, 0l);
 
-          int seenByKey = byKey.get(key);
+          long seenByKey = byKey.get(key);
           seenByKey++;
           byKey.put(key, seenByKey);
         }
@@ -176,7 +176,7 @@ public class EndlessConsumer implements Runnable
     }
   }
 
-  public Map<Integer, Map<String, Integer>> getSeen()
+  public Map<Integer, Map<String, Long>> getSeen()
   {
     return seen;
   }