<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
- <artifactId>top10</artifactId>
- <version>1.2.1</version>
- <name>Wordcount-Top-10</name>
- <description>Top-10 stream-processor of the multi-user wordcount-example</description>
+ <artifactId>stats</artifactId>
+ <version>1.0.0</version>
+ <name>Wordcount-Statistics</name>
+ <description>Statistics stream-processor of the multi-user wordcount-example</description>
<properties>
<docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
</properties>
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AccessLevel;
@JsonIgnoreProperties(ignoreUnknown = true)
public class Entry
{
- private String word;
+ private String key;
private Long counter;
}
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.*;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import lombok.*;
for (int j = i+1; j < list.size(); j++)
{
entry = list.get(j);
- if(entry.getWord().equals(newEntry.getWord()))
+ if(entry.getKey().equals(newEntry.getKey()))
{
list.remove(j);
break;
return this;
}
- if (entry.getWord().equals(newEntry.getWord()))
+ if (entry.getKey().equals(newEntry.getKey()))
oldPosition = i;
}
{
Entry entry = this.entries[i];
- if (seenWords.contains(entry.getWord()))
- throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
+ if (seenWords.contains(entry.getKey()))
+ throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey());
if (entry.getCounter() > lowesCounting)
throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
- seenWords.add(entry.getWord());
+ seenWords.add(entry.getKey());
lowesCounting = entry.getCounter();
}
Set<String> otherWordsWithCurrentCount = new HashSet<>();
Entry myEntry = entries[i];
long currentCount = myEntry.getCounter();
- myWordsWithCurrentCount.add(myEntry.getWord());
+ myWordsWithCurrentCount.add(myEntry.getKey());
while (true)
{
Entry otherEntry = other.entries[i];
if (otherEntry.getCounter() != currentCount)
return false;
- otherWordsWithCurrentCount.add(otherEntry.getWord());
+ otherWordsWithCurrentCount.add(otherEntry.getKey());
if (++i >= entries.length)
return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
myEntry = entries[i];
myWordsWithCurrentCount.clear();
otherWordsWithCurrentCount.clear();
}
- myWordsWithCurrentCount.add(myEntry.getWord());
+ myWordsWithCurrentCount.add(myEntry.getKey());
}
}
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
-public class Top10Application
+public class StatsApplication
{
public static void main(String[] args)
{
- SpringApplication.run(Top10Application.class, args);
+ SpringApplication.run(StatsApplication.class, args);
}
}
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Configuration
-@EnableConfigurationProperties(Top10ApplicationProperties.class)
+@EnableConfigurationProperties(StatsApplicationProperties.class)
@Slf4j
-public class Top10ApplicationConfiguration
+public class StatsApplicationConfiguration
{
@Bean
- public Properties streamProcessorProperties(Top10ApplicationProperties properties)
+ public Properties streamProcessorProperties(StatsApplicationProperties properties)
{
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+ props.put(JsonDeserializer.KEY_DEFAULT_TYPE, WindowedKey.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
"word:" + Key.class.getName() + "," +
"counter:" + Entry.class.getName() + "," +
- "user:" + User.class.getName() + "," +
+ "user:" + WindowedKey.class.getName() + "," +
"ranking:" + Ranking.class.getName());
return props;
}
@Bean(initMethod = "start", destroyMethod = "stop")
- public Top10StreamProcessor streamProcessor(
- Top10ApplicationProperties applicationProperties,
+ public StatsStreamProcessor streamProcessor(
+ StatsApplicationProperties applicationProperties,
Properties streamProcessorProperties,
KeyValueBytesStoreSupplier storeSupplier,
ConfigurableApplicationContext context)
{
- Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+ StatsStreamProcessor streamProcessor = new StatsStreamProcessor(
applicationProperties.getInputTopic(),
applicationProperties.getOutputTopic(),
streamProcessorProperties,
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import lombok.Getter;
import org.springframework.boot.context.properties.ConfigurationProperties;
-@ConfigurationProperties("juplo.wordcount.top10")
+@ConfigurationProperties("juplo.wordcount.stats")
@Getter
@Setter
@ToString
-public class Top10ApplicationProperties
+public class StatsApplicationProperties
{
private String bootstrapServer = "localhost:9092";
- private String applicationId = "top10";
- private String inputTopic = "countings";
- private String outputTopic = "top10";
+ private String applicationId = "stats";
+ private String inputTopic = "stats";
+ private String outputTopic = "results";
private Integer commitInterval;
private Integer cacheMaxBytes;
}
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
@Slf4j
-public class Top10StreamProcessor
+public class StatsStreamProcessor
{
- public static final String STORE_NAME= "top10";
+ public static final String STORE_NAME= "stats";
public final KafkaStreams streams;
- public Top10StreamProcessor(
+ public StatsStreamProcessor(
String inputTopic,
String outputTopic,
Properties props,
KeyValueBytesStoreSupplier storeSupplier)
{
- Topology topology = Top10StreamProcessor.buildTopology(
+ Topology topology = StatsStreamProcessor.buildTopology(
inputTopic,
outputTopic,
storeSupplier);
builder
.<Key, Entry>stream(inputTopic)
- .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
+ .map((key, entry) -> new KeyValue<>(WindowedKey.of(key.getUser()), entry))
.groupByKey()
.aggregate(
() -> new Ranking(),
- (user, entry, ranking) -> ranking.add(entry),
+ (windowedKey, entry, ranking) -> ranking.add(entry),
Materialized.as(storeSupplier))
.toStream()
.to(outputTopic);
return topology;
}
- ReadOnlyKeyValueStore<User, Ranking> getStore()
+ ReadOnlyKeyValueStore<WindowedKey, Ranking> getStore()
{
return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
}
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import lombok.AllArgsConstructor;
import lombok.Data;
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
@Data
-public class User
+public class WindowedKey
{
String user;
}
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.in;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
-public class TestCounter
+public class InputCounter
{
String user;
- String word;
+ String key;
long counter;
- public static TestCounter of(TestWord word, long counter)
+ public static InputCounter of(InputWindowedKey word, long counter)
{
- return new TestCounter(word.getUser(), word.getWord(), counter);
+ return new InputCounter(word.getUser(), word.getKey(), counter);
}
}
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.in;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
@NoArgsConstructor
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
-public class TestWord
+public class InputWindowedKey
{
private String user;
- private String word;
+ private String key;
}
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.out;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
public class TestEntry
{
- String word;
+ String key;
long counter;
}
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.out;
-import de.juplo.kafka.wordcount.top10.Entry;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.out;
import lombok.AllArgsConstructor;
import lombok.Data;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Stream.of(highestEntry),
VALID_RANKINGS[0]
.stream()
- .filter(entry -> !entry.getWord().equals(word)))
+ .filter(entry -> !entry.getKey().equals(word)))
.toList();
assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
}
Ranking ranking = Ranking.of(toArray(entryList));
entryList.forEach(entry ->
assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
+ .isThrownBy(() -> ranking.add(Entry.of(entry.getKey(), entry.getCounter() - 1))));
}
@DisplayName("Identical rankings are considered equal")
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.in.InputCounter;
+import de.juplo.kafka.wordcount.in.InputWindowedKey;
+import de.juplo.kafka.wordcount.out.TestRanking;
+import de.juplo.kafka.wordcount.out.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
properties = {
"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+ "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.in.InputWindowedKey,counter:de.juplo.kafka.wordcount.in.InputCounter",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.out.TestUser,ranking:de.juplo.kafka.wordcount.out.TestRanking",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"logging.level.org.apache.kafka.clients=INFO",
"logging.level.org.apache.kafka.streams=INFO",
- "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.top10.commit-interval=100",
- "juplo.wordcount.top10.cacheMaxBytes=0",
- "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
- "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
-@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
+ "juplo.wordcount.stats.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.stats.commit-interval=100",
+ "juplo.wordcount.stats.cacheMaxBytes=0",
+ "juplo.wordcount.stats.input-topic=" + StatsApplicationIT.TOPIC_IN,
+ "juplo.wordcount.stats.output-topic=" + StatsApplicationIT.TOPIC_OUT })
+@EmbeddedKafka(topics = { StatsApplicationIT.TOPIC_IN, StatsApplicationIT.TOPIC_OUT })
@Slf4j
-public class Top10ApplicationIT
+public class StatsApplicationIT
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
@Autowired
Consumer consumer;
@Autowired
- Top10StreamProcessor streamProcessor;
+ StatsStreamProcessor streamProcessor;
@BeforeAll
public static void testSendMessage(
- @Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
+ @Autowired KafkaTemplate<InputWindowedKey, InputCounter> kafkaTemplate)
{
TestData
.getInputMessages()
{
try
{
- SendResult<TestWord, TestCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ SendResult<InputWindowedKey, InputCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.in.InputCounter;
+import de.juplo.kafka.wordcount.in.InputWindowedKey;
+import de.juplo.kafka.wordcount.out.TestRanking;
+import de.juplo.kafka.wordcount.out.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import java.util.Map;
-import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
+import static de.juplo.kafka.wordcount.stats.StatsApplicationConfiguration.serializationConfig;
@Slf4j
-public class Top10StreamProcessorTopologyTest
+public class StatsStreamProcessorTopologyTest
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
TopologyTestDriver testDriver;
- TestInputTopic<TestWord, TestCounter> in;
+ TestInputTopic<InputWindowedKey, InputCounter> in;
TestOutputTopic<TestUser, TestRanking> out;
@BeforeEach
public void setUp()
{
- Topology topology = Top10StreamProcessor.buildTopology(
+ Topology topology = StatsStreamProcessor.buildTopology(
IN,
OUT,
Stores.inMemoryKeyValueStore(STORE_NAME));
in = testDriver.createInputTopic(
IN,
- jsonSerializer(TestWord.class, true),
- jsonSerializer(TestCounter.class,false));
+ jsonSerializer(InputWindowedKey.class, true),
+ jsonSerializer(InputCounter.class,false));
out = testDriver.createOutputTopic(
OUT,
TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
TestData.assertExpectedLastMessagesForUsers(receivedMessages);
- KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+ KeyValueStore<WindowedKey, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
TestData.assertExpectedState(store);
}
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
- "word:" + TestWord.class.getName() + "," +
- "counter:" + TestCounter.class.getName()),
+ "word:" + InputWindowedKey.class.getName() + "," +
+ "counter:" + InputCounter.class.getName()),
isKey);
return jsonSerializer;
}
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestEntry;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.in.InputCounter;
+import de.juplo.kafka.wordcount.in.InputWindowedKey;
+import de.juplo.kafka.wordcount.out.TestEntry;
+import de.juplo.kafka.wordcount.out.TestRanking;
+import de.juplo.kafka.wordcount.out.TestUser;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
static final TestUser PETER = TestUser.of("peter");
static final TestUser KLAUS = TestUser.of("klaus");
- static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
+ static final Stream<KeyValue<InputWindowedKey, InputCounter>> getInputMessages()
{
return Stream.of(INPUT_MESSAGES);
}
- private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+ private static final KeyValue<InputWindowedKey, InputCounter>[] INPUT_MESSAGES = new KeyValue[]
{
new KeyValue<>(
- TestWord.of(PETER.getUser(),"Hallo"),
- TestCounter.of(PETER.getUser(),"Hallo",1)),
+ InputWindowedKey.of(PETER.getUser(),"Hallo"),
+ InputCounter.of(PETER.getUser(),"Hallo",1)),
new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"Müsch"),
- TestCounter.of(KLAUS.getUser(),"Müsch",1)),
+ InputWindowedKey.of(KLAUS.getUser(),"Müsch"),
+ InputCounter.of(KLAUS.getUser(),"Müsch",1)),
new KeyValue<>(
- TestWord.of(PETER.getUser(),"Welt"),
- TestCounter.of(PETER.getUser(),"Welt",1)),
+ InputWindowedKey.of(PETER.getUser(),"Welt"),
+ InputCounter.of(PETER.getUser(),"Welt",1)),
new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"Müsch"),
- TestCounter.of(KLAUS.getUser(),"Müsch",2)),
+ InputWindowedKey.of(KLAUS.getUser(),"Müsch"),
+ InputCounter.of(KLAUS.getUser(),"Müsch",2)),
new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"s"),
- TestCounter.of(KLAUS.getUser(),"s",1)),
+ InputWindowedKey.of(KLAUS.getUser(),"s"),
+ InputCounter.of(KLAUS.getUser(),"s",1)),
new KeyValue<>(
- TestWord.of(PETER.getUser(),"Boäh"),
- TestCounter.of(PETER.getUser(),"Boäh",1)),
+ InputWindowedKey.of(PETER.getUser(),"Boäh"),
+ InputCounter.of(PETER.getUser(),"Boäh",1)),
new KeyValue<>(
- TestWord.of(PETER.getUser(),"Welt"),
- TestCounter.of(PETER.getUser(),"Welt",2)),
+ InputWindowedKey.of(PETER.getUser(),"Welt"),
+ InputCounter.of(PETER.getUser(),"Welt",2)),
new KeyValue<>(
- TestWord.of(PETER.getUser(),"Boäh"),
- TestCounter.of(PETER.getUser(),"Boäh",2)),
+ InputWindowedKey.of(PETER.getUser(),"Boäh"),
+ InputCounter.of(PETER.getUser(),"Boäh",2)),
new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"s"),
- TestCounter.of(KLAUS.getUser(),"s",2)),
+ InputWindowedKey.of(KLAUS.getUser(),"s"),
+ InputCounter.of(KLAUS.getUser(),"s",2)),
new KeyValue<>(
- TestWord.of(PETER.getUser(),"Boäh"),
- TestCounter.of(PETER.getUser(),"Boäh",3)),
+ InputWindowedKey.of(PETER.getUser(),"Boäh"),
+ InputCounter.of(PETER.getUser(),"Boäh",3)),
new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"s"),
- TestCounter.of(KLAUS.getUser(),"s",3)),
+ InputWindowedKey.of(KLAUS.getUser(),"s"),
+ InputCounter.of(KLAUS.getUser(),"s",3)),
};
static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
.containsExactlyElementsOf(rankings));
}
- static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+ static void assertExpectedState(ReadOnlyKeyValueStore<WindowedKey, Ranking> store)
{
assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
}
- private static User userOf(TestUser user)
+ private static WindowedKey userOf(TestUser user)
{
- return User.of(user.getUser());
+ return WindowedKey.of(user.getUser());
}
static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
return Arrays
.stream(entries)
.map(entry -> TestEntry.of(
- entry.getWord(),
+ entry.getKey(),
entry.getCounter() == null
? -1l
: entry.getCounter()))