import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Properties;
{
Topology topology = Top10StreamProcessor.buildTopology(
inputTopic,
- outputTopic);
+ outputTopic,
+ null);
streams = new KafkaStreams(topology, props);
}
static Topology buildTopology(
String inputTopic,
- String outputTopic)
+ String outputTopic,
+ KeyValueBytesStoreSupplier storeSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
builder
.<Key, Entry>stream(inputTopic)
- .map((key, entry) -> new KeyValue<>(key.getUser(), entry))
+ .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
.groupByKey()
.aggregate(
() -> new Ranking(),
return topology;
}
+ ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
+ {
+ return null;
+ }
+
public void start()
{
log.info("Starting Stream-Processor");