projects
/
demos
/
kafka
/
wordcount
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
top10: 1.4.0 - Refined output JSON -- MOVE
[demos/kafka/wordcount]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
wordcount
/
top10
/
Top10StreamProcessor.java
diff --git
a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
index
2ff078c
..
907c7ff
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
@@
-1,11
+1,10
@@
package de.juplo.kafka.wordcount.top10;
import lombok.extern.slf4j.Slf4j;
package de.juplo.kafka.wordcount.top10;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Properties;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Properties;
@@
-14,18
+13,21
@@
import java.util.Properties;
@Slf4j
public class Top10StreamProcessor
{
@Slf4j
public class Top10StreamProcessor
{
+ public static final String STORE_NAME= "top10";
+
public final KafkaStreams streams;
public Top10StreamProcessor(
String inputTopic,
String outputTopic,
public final KafkaStreams streams;
public Top10StreamProcessor(
String inputTopic,
String outputTopic,
- Properties props)
+ Properties props,
+ KeyValueBytesStoreSupplier storeSupplier)
{
Topology topology = Top10StreamProcessor.buildTopology(
inputTopic,
outputTopic,
{
Topology topology = Top10StreamProcessor.buildTopology(
inputTopic,
outputTopic,
-
null
);
+
storeSupplier
);
streams = new KafkaStreams(topology, props);
}
streams = new KafkaStreams(topology, props);
}
@@
-39,11
+41,12
@@
public class Top10StreamProcessor
builder
.<Key, Entry>stream(inputTopic)
builder
.<Key, Entry>stream(inputTopic)
- .map((key, entry) -> new KeyValue<>(User.of(key.get
User
()), entry))
+ .map((key, entry) -> new KeyValue<>(User.of(key.get
Channel
()), entry))
.groupByKey()
.aggregate(
() -> new Ranking(),
.groupByKey()
.aggregate(
() -> new Ranking(),
- (user, entry, ranking) -> ranking.add(entry))
+ (user, entry, ranking) -> ranking.add(entry),
+ Materialized.as(storeSupplier))
.toStream()
.to(outputTopic);
.toStream()
.to(outputTopic);
@@
-53,9
+56,9
@@
public class Top10StreamProcessor
return topology;
}
return topology;
}
- ReadOnlyKeyValueStore<User, Ranking> getStore(
String name
)
+ ReadOnlyKeyValueStore<User, Ranking> getStore()
{
{
- return
null
;
+ return
streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()))
;
}
public void start()
}
public void start()