summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
066ac7e)
* Adapted the configuration to the changed type-mapping for the key.
* Refined the class `Key`, that defines the JSON for the input key.
** Added attribute `type` with value of type `enum StatsType`.
** Renamed attribute `user` to `channel`.
** Renamed attribute `word` to `key`.
* Refined the class `Entry`, that defines the JSON for the input value.
** Renamed attribute `word` to `key`.
* Adapted test-classes and -cases accordingly.
14 files changed:
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>top10</artifactId>
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>top10</artifactId>
- <version>1.2.1</version>
+ <version>1.3.0</version>
<name>Wordcount-Top-10</name>
<description>Top-10 stream-processor of the multi-user wordcount-example</description>
<properties>
<name>Wordcount-Top-10</name>
<description>Top-10 stream-processor of the multi-user wordcount-example</description>
<properties>
@JsonIgnoreProperties(ignoreUnknown = true)
public class Entry
{
@JsonIgnoreProperties(ignoreUnknown = true)
public class Entry
{
@JsonIgnoreProperties(ignoreUnknown = true)
public class Key
{
@JsonIgnoreProperties(ignoreUnknown = true)
public class Key
{
- private String user;
- private String word;
+ private StatsType type;
+ private String channel;
+ private String key;
for (int j = i+1; j < list.size(); j++)
{
entry = list.get(j);
for (int j = i+1; j < list.size(); j++)
{
entry = list.get(j);
- if(entry.getWord().equals(newEntry.getWord()))
+ if(entry.getKey().equals(newEntry.getKey()))
- if (entry.getWord().equals(newEntry.getWord()))
+ if (entry.getKey().equals(newEntry.getKey()))
{
Entry entry = this.entries[i];
{
Entry entry = this.entries[i];
- if (seenWords.contains(entry.getWord()))
- throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
+ if (seenWords.contains(entry.getKey()))
+ throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey());
if (entry.getCounter() > lowesCounting)
throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
if (entry.getCounter() > lowesCounting)
throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
- seenWords.add(entry.getWord());
+ seenWords.add(entry.getKey());
lowesCounting = entry.getCounter();
}
lowesCounting = entry.getCounter();
}
Set<String> otherWordsWithCurrentCount = new HashSet<>();
Entry myEntry = entries[i];
long currentCount = myEntry.getCounter();
Set<String> otherWordsWithCurrentCount = new HashSet<>();
Entry myEntry = entries[i];
long currentCount = myEntry.getCounter();
- myWordsWithCurrentCount.add(myEntry.getWord());
+ myWordsWithCurrentCount.add(myEntry.getKey());
while (true)
{
Entry otherEntry = other.entries[i];
if (otherEntry.getCounter() != currentCount)
return false;
while (true)
{
Entry otherEntry = other.entries[i];
if (otherEntry.getCounter() != currentCount)
return false;
- otherWordsWithCurrentCount.add(otherEntry.getWord());
+ otherWordsWithCurrentCount.add(otherEntry.getKey());
if (++i >= entries.length)
return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
myEntry = entries[i];
if (++i >= entries.length)
return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
myEntry = entries[i];
myWordsWithCurrentCount.clear();
otherWordsWithCurrentCount.clear();
}
myWordsWithCurrentCount.clear();
otherWordsWithCurrentCount.clear();
}
- myWordsWithCurrentCount.add(myEntry.getWord());
+ myWordsWithCurrentCount.add(myEntry.getKey());
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+enum StatsType
+{
+ COUNTER,
+ POPULAR
+}
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
- "word:" + Key.class.getName() + "," +
+ "key:" + Key.class.getName() + "," +
"counter:" + Entry.class.getName() + "," +
"user:" + User.class.getName() + "," +
"ranking:" + Ranking.class.getName());
"counter:" + Entry.class.getName() + "," +
"user:" + User.class.getName() + "," +
"ranking:" + Ranking.class.getName());
builder
.<Key, Entry>stream(inputTopic)
builder
.<Key, Entry>stream(inputTopic)
- .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
+ .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry))
.groupByKey()
.aggregate(
() -> new Ranking(),
.groupByKey()
.aggregate(
() -> new Ranking(),
@AllArgsConstructor(staticName = "of")
public class TestCounter
{
@AllArgsConstructor(staticName = "of")
public class TestCounter
{
- String user;
- String word;
-
- public static TestCounter of(TestWord word, long counter)
- {
- return new TestCounter(word.getUser(), word.getWord(), counter);
- }
@JsonIgnoreProperties(ignoreUnknown = true)
public class TestWord
{
@JsonIgnoreProperties(ignoreUnknown = true)
public class TestWord
{
- private String user;
- private String word;
+ private String type;
+ private String channel;
+ private String key;
@Data
public class TestEntry
{
@Data
public class TestEntry
{
Stream.of(highestEntry),
VALID_RANKINGS[0]
.stream()
Stream.of(highestEntry),
VALID_RANKINGS[0]
.stream()
- .filter(entry -> !entry.getWord().equals(word)))
+ .filter(entry -> !entry.getKey().equals(word)))
.toList();
assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
}
.toList();
assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
}
Ranking ranking = Ranking.of(toArray(entryList));
entryList.forEach(entry ->
assertThatExceptionOfType(IllegalArgumentException.class)
Ranking ranking = Ranking.of(toArray(entryList));
entryList.forEach(entry ->
assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
+ .isThrownBy(() -> ranking.add(Entry.of(entry.getKey(), entry.getCounter() - 1))));
}
@DisplayName("Identical rankings are considered equal")
}
@DisplayName("Identical rankings are considered equal")
+ static final String TYPE_COUNTER = "COUNTER";
+
static final TestUser PETER = TestUser.of("peter");
static final TestUser KLAUS = TestUser.of("klaus");
static final TestUser PETER = TestUser.of("peter");
static final TestUser KLAUS = TestUser.of("klaus");
private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
{
new KeyValue<>(
private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
{
new KeyValue<>(
- TestWord.of(PETER.getUser(),"Hallo"),
- TestCounter.of(PETER.getUser(),"Hallo",1)),
+ TestWord.of(TYPE_COUNTER, PETER.getUser(),"Hallo"),
+ TestCounter.of("Hallo",1)),
- TestWord.of(KLAUS.getUser(),"Müsch"),
- TestCounter.of(KLAUS.getUser(),"Müsch",1)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
+ TestCounter.of("Müsch",1)),
- TestWord.of(PETER.getUser(),"Welt"),
- TestCounter.of(PETER.getUser(),"Welt",1)),
+ TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
+ TestCounter.of("Welt",1)),
- TestWord.of(KLAUS.getUser(),"Müsch"),
- TestCounter.of(KLAUS.getUser(),"Müsch",2)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
+ TestCounter.of("Müsch",2)),
- TestWord.of(KLAUS.getUser(),"s"),
- TestCounter.of(KLAUS.getUser(),"s",1)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+ TestCounter.of("s",1)),
- TestWord.of(PETER.getUser(),"Boäh"),
- TestCounter.of(PETER.getUser(),"Boäh",1)),
+ TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+ TestCounter.of("Boäh",1)),
- TestWord.of(PETER.getUser(),"Welt"),
- TestCounter.of(PETER.getUser(),"Welt",2)),
+ TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
+ TestCounter.of("Welt",2)),
- TestWord.of(PETER.getUser(),"Boäh"),
- TestCounter.of(PETER.getUser(),"Boäh",2)),
+ TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+ TestCounter.of("Boäh",2)),
- TestWord.of(KLAUS.getUser(),"s"),
- TestCounter.of(KLAUS.getUser(),"s",2)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+ TestCounter.of("s",2)),
- TestWord.of(PETER.getUser(),"Boäh"),
- TestCounter.of(PETER.getUser(),"Boäh",3)),
+ TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+ TestCounter.of("Boäh",3)),
- TestWord.of(KLAUS.getUser(),"s"),
- TestCounter.of(KLAUS.getUser(),"s",3)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+ TestCounter.of("s",3)),
};
static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
};
static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
return Arrays
.stream(entries)
.map(entry -> TestEntry.of(
return Arrays
.stream(entries)
.map(entry -> TestEntry.of(
entry.getCounter() == null
? -1l
: entry.getCounter()))
entry.getCounter() == null
? -1l
: entry.getCounter()))
properties = {
"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
properties = {
"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+ "spring.kafka.producer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
- "word:" + TestWord.class.getName() + "," +
+ "key:" + TestWord.class.getName() + "," +
"counter:" + TestCounter.class.getName()),
isKey);
return jsonSerializer;
"counter:" + TestCounter.class.getName()),
isKey);
return jsonSerializer;