top10: 1.3.0 - Refined input JSON to match the new general stats-format
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Ranking.java
index b748fe5..279716a 100644 (file)
 package de.juplo.kafka.wordcount.top10;
 
-import lombok.Getter;
-import lombok.Setter;
+import lombok.*;
 
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 
 
-@Getter
-@Setter
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@NoArgsConstructor
+@Data
 public class Ranking
 {
+  public final static int MAX_ENTRIES = 10;
+
+
   private Entry[] entries = new Entry[0];
 
-  public void add(Entry newEntry)
+  public Ranking add(Entry newEntry)
   {
     if (entries.length == 0)
     {
       entries = new Entry[1];
       entries[0] = newEntry;
-      return;
+      return this;
     }
 
     List<Entry> list = new LinkedList<>(Arrays.asList(entries));
+    int oldPosition = -1;
     for (int i = 0; i < list.size(); i++)
     {
-      Entry entry;
+      Entry entry = list.get(i);
 
-      entry = list.get(i);
-      if (entry.getCount() <= newEntry.getCount())
+      if (entry.getCounter() < newEntry.getCounter())
       {
+        if (oldPosition > -1)
+        {
+          if (list.get(oldPosition).getCounter() > newEntry.getCounter())
+          {
+            throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+          }
+          else
+          {
+            // Entry for word already exists with the same counting! Nothing changed...
+            return this;
+          }
+        }
+
         list.add(i, newEntry);
         for (int j = i+1; j < list.size(); j++)
         {
           entry = list.get(j);
-          if(entry.getWord().equals(newEntry.getWord()))
+          if(entry.getKey().equals(newEntry.getKey()))
           {
             list.remove(j);
             break;
           }
         }
-        if (list.size() > 10)
+        if (list.size() > MAX_ENTRIES)
         {
-          list = list.subList(0,10);
+          list = list.subList(0, MAX_ENTRIES);
         }
         entries = list.toArray(num -> new Entry[num]);
-        return;
+        return this;
+      }
+
+      if (entry.getKey().equals(newEntry.getKey()))
+        oldPosition = i;
+    }
+
+    if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter())
+    {
+      throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+    }
+
+    if (list.size() < MAX_ENTRIES)
+    {
+      list.add(newEntry);
+      entries = list.toArray(num -> new Entry[num]);
+    }
+
+    return this;
+  }
+
+  public Ranking validate() throws IllegalArgumentException
+  {
+    if (this.entries.length > MAX_ENTRIES)
+      throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES );
+
+    Set<String> seenWords = new HashSet<>();
+    long lowesCounting = Long.MAX_VALUE;
+
+    for (int i=0; i<this.entries.length; i++)
+    {
+      Entry entry = this.entries[i];
+
+      if (seenWords.contains(entry.getKey()))
+        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey());
+      if (entry.getCounter() > lowesCounting)
+        throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
+
+      seenWords.add(entry.getKey());
+      lowesCounting = entry.getCounter();
+    }
+
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o)
+      return true;
+    if (o == null)
+      return false;
+    if (!(o instanceof Ranking))
+      return false;
+
+    Ranking other = (Ranking)o;
+
+    if (other.entries.length != entries.length)
+      return false;
+
+    if (entries.length == 0)
+      return true;
+
+    int i = 0;
+    Set<String> myWordsWithCurrentCount = new HashSet<>();
+    Set<String> otherWordsWithCurrentCount = new HashSet<>();
+    Entry myEntry = entries[i];
+    long currentCount = myEntry.getCounter();
+    myWordsWithCurrentCount.add(myEntry.getKey());
+    while (true)
+    {
+      Entry otherEntry = other.entries[i];
+      if (otherEntry.getCounter() != currentCount)
+        return false;
+      otherWordsWithCurrentCount.add(otherEntry.getKey());
+      if (++i >= entries.length)
+        return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
+      myEntry = entries[i];
+      if (myEntry.getCounter() != currentCount)
+      {
+        if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount))
+          return false;
+        currentCount = myEntry.getCounter();
+        myWordsWithCurrentCount.clear();
+        otherWordsWithCurrentCount.clear();
       }
+      myWordsWithCurrentCount.add(myEntry.getKey());
     }
   }
+
+  public static Ranking of(Entry... entries)
+  {
+    Ranking ranking = new Ranking(entries);
+    ranking.validate();
+    return ranking;
+  }
 }