import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
-import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.*;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.net.URI;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Optional;
import java.util.Properties;
}
static Topology buildTopology(
- String usersInputTopic,
- String rankingInputTopic,
- KeyValueBytesStoreSupplier userStoreSupplier,
- KeyValueBytesStoreSupplier rankingStoreSupplier)
+ String inputTopic,
+ WindowBytesStoreSupplier windowBytesStoreSupplier,
+ KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
- KTable<String, User> users = builder
- .stream(
- usersInputTopic,
- Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
- .toTable(
- Materialized
- .<String, User>as(userStoreSupplier)
- .withKeySerde(Serdes.String())
- .withValueSerde(new JsonSerde().copyWithType(User.class)));
- KStream<String, Ranking> rankings = builder
- .<Word, Ranking>stream(rankingInputTopic)
- .map((word, value) -> new KeyValue<>(word.getUser(), value));
-
- rankings
- .join(users, (ranking, user) -> UserRanking.of(
- user.getFirstName(),
- user.getLastName(),
- ranking.getEntries()),
- Joined.keySerde(Serdes.String()))
- .toTable(
- Materialized
- .<String, UserRanking>as(rankingStoreSupplier)
- .withKeySerde(Serdes.String())
- .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
+ builder
+ .<Word, WordCounter>stream(inputTopic)
+ .map((word, counter) -> new KeyValue<>(word.getUser(), counter))
+ .groupByKey()
+ .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)))
+ .emitStrategy(EmitStrategy.onWindowClose())
+ .aggregate(
+ () -> new Ranking(),
+ (word, counter, ranking) -> ranking,
+ Materialized.<String, Ranking, WindowStore<Bytes, byte[]>>as(windowBytesStoreSupplier))
+ .toStream()
+ .map((windowedWord, ranking) ->
+ {
+ Instant endTime = windowedWord.window().endTime();
+ return new KeyValue<>(windowedWord.key(), ranking);
+ })
+ .toTable(Materialized.as(keyValueBytesStoreSupplier));
Topology topology = builder.build();
log.info("\n\n{}", topology.describe());