top10: 1.1.2 - (GREEN) Fixed implementation of `Ranking`
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / top10 / Ranking.java
1 package de.juplo.kafka.wordcount.top10;
2
3 import lombok.*;
4
5 import java.util.*;
6
7
8 @AllArgsConstructor(access = AccessLevel.PRIVATE)
9 @NoArgsConstructor
10 @Data
11 public class Ranking
12 {
13   public final static int MAX_ENTRIES = 10;
14
15
16   private Entry[] entries = new Entry[0];
17
18   public Ranking add(Entry newEntry)
19   {
20     if (entries.length == 0)
21     {
22       entries = new Entry[1];
23       entries[0] = newEntry;
24       return this;
25     }
26
27     List<Entry> list = new LinkedList<>(Arrays.asList(entries));
28     int oldPosition = -1;
29     for (int i = 0; i < list.size(); i++)
30     {
31       Entry entry = list.get(i);
32
33       if (entry.getCounter() < newEntry.getCounter())
34       {
35         if (oldPosition > -1)
36         {
37           if (list.get(oldPosition).getCounter() > newEntry.getCounter())
38           {
39             throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
40           }
41           else
42           {
43             // Entry for word already exists with the same counting! Nothing changed...
44             return this;
45           }
46         }
47
48         list.add(i, newEntry);
49         for (int j = i+1; j < list.size(); j++)
50         {
51           entry = list.get(j);
52           if(entry.getWord().equals(newEntry.getWord()))
53           {
54             list.remove(j);
55             break;
56           }
57         }
58         if (list.size() > MAX_ENTRIES)
59         {
60           list = list.subList(0, MAX_ENTRIES);
61         }
62         entries = list.toArray(num -> new Entry[num]);
63         return this;
64       }
65
66       if (entry.getWord().equals(newEntry.getWord()))
67         oldPosition = i;
68     }
69
70     if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter())
71     {
72       throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
73     }
74
75     if (list.size() < MAX_ENTRIES)
76     {
77       list.add(newEntry);
78       entries = list.toArray(num -> new Entry[num]);
79     }
80
81     return this;
82   }
83
84   public Ranking validate() throws IllegalArgumentException
85   {
86     if (this.entries.length > MAX_ENTRIES)
87       throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES );
88
89     Set<String> seenWords = new HashSet<>();
90     long lowesCounting = Long.MAX_VALUE;
91
92     for (int i=0; i<this.entries.length; i++)
93     {
94       Entry entry = this.entries[i];
95
96       if (seenWords.contains(entry.getWord()))
97         throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
98       if (entry.getCounter() > lowesCounting)
99         throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
100
101       seenWords.add(entry.getWord());
102       lowesCounting = entry.getCounter();
103     }
104
105     return this;
106   }
107
108   public static Ranking of(Entry... entries)
109   {
110     Ranking ranking = new Ranking(entries);
111     ranking.validate();
112     return ranking;
113   }
114 }