import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
streamProcessorProperties,
applicationServer,
- applicationProperties.getUsersInputTopic(),
- applicationProperties.getRankingInputTopic(),
+ applicationProperties.getInputTopic(),
userStoreSupplier,
rankingStoreSupplier);
}
@Bean
- public KeyValueBytesStoreSupplier userStoreSupplier()
+ public WindowBytesStoreSupplier windowedBytesStoreSupplier()
{
- return Stores.persistentKeyValueStore(USER_STORE_NAME);
+ return Stores.persistentWindowStore(
+ USER_STORE_NAME,
+ Duration.ofMinutes(1),
+ Duration.ofMinutes(1),
+ false);
}
@Bean
- public KeyValueBytesStoreSupplier rankingStoreSupplier()
+ public KeyValueBytesStoreSupplier keyValueByptesStoreSupplier()
{
return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
}
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.net.URI;
{
private final PopularStreamProcessor processor;
- @GetMapping("{username}")
- ResponseEntity<Ranking> queryFor(@PathVariable String username)
+ @GetMapping
+ ResponseEntity<Ranking> getRanking()
{
Optional<URI> redirect = processor.getRedirect(username);
if (redirect.isPresent())
.emitStrategy(EmitStrategy.onWindowClose())
.aggregate(
() -> new Ranking(),
- (word, counter, ranking) -> ranking,
- Materialized.<String, Ranking, WindowStore<Bytes, byte[]>>as(windowBytesStoreSupplier))
+ (word, counter, ranking) -> ranking) //,
+ // Materialized.<String, Ranking, WindowStore<Bytes, byte[]>>as(windowBytesStoreSupplier))
.toStream()
.map((windowedWord, ranking) ->
{
.getUsersMessages()
.forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
TestData
- .getTop10Messages()
+ .getMessages()
.forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
}
import de.juplo.kafka.wordcount.counter.TestRanking;
import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.counter.TestWordCounter;
import de.juplo.kafka.wordcount.users.TestUserData;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.support.serializer.JsonSerializer;
+import java.time.Duration;
import java.util.Map;
import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguration.serializationConfig;
TopologyTestDriver testDriver;
- TestInputTopic<TestWord, TestRanking> top10In;
- TestInputTopic<String, TestUserData> userIn;
+ TestInputTopic<TestWord, TestWordCounter> in;
@BeforeEach
{
Topology topology = PopularStreamProcessor.buildTopology(
USERS_IN,
- TOP10_IN,
- Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
+ Stores.inMemoryWindowStore(
+ USERS_STORE_NAME,
+ Duration.ofSeconds(1),
+ Duration.ofSeconds(1),
+ false),
Stores.inMemoryKeyValueStore(RANKING_STORE_NAME));
testDriver = new TopologyTestDriver(topology, serializationConfig());
- top10In = testDriver.createInputTopic(
+ in = testDriver.createInputTopic(
TOP10_IN,
jsonSerializer(TestWord.class, true),
- jsonSerializer(TestRanking.class,false));
-
- userIn = testDriver.createInputTopic(
- USERS_IN,
- new StringSerializer(),
- jsonSerializer(TestUserData.class, false).noTypeInfo());
+ jsonSerializer(TestWordCounter.class,false));
}
.getUsersMessages()
.forEach(kv -> userIn.pipeInput(kv.key, kv.value));
TestData
- .getTop10Messages()
- .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
+ .getMessages()
+ .forEach(kv -> in.pipeInput(kv.key, kv.value));
KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(RANKING_STORE_NAME);
TestData.assertExpectedState(user -> store.get(user));
static final TestWord PETER = TestWord.of("peter");
static final TestWord KLAUS = TestWord.of("klaus");
- static final Stream<KeyValue<TestWord, TestWordCounter>> getTop10Messages()
+ static final Stream<KeyValue<TestWord, TestWordCounter>> getMessages()
{
return Stream.of(TOP10_MESSAGES);
}
- static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
- {
- return Stream.of(USERS_MESSAGES);
- }
-
- static void assertExpectedState(Function<String, UserRanking> function)
+ static void assertExpectedState(Function<String, Ranking> function)
{
assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser()));
assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser()));
}
- private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
+ private static void assertRankingEqualsRankingFromLastMessage(String user, Ranking rankingJson)
{
assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
}
- private static UserRanking getLastMessageFor(String user)
+ private static Ranking getLastMessageFor(String user)
{
- return getTop10Messages()
+ return getMessages()
.filter(kv -> kv.key.getUser().equals(user))
.map(kv -> kv.value)
.map(testRanking -> userRankingFor(user, testRanking))
.reduce(null, (left, right) -> right);
}
- private static UserRanking userRankingFor(String user, TestRanking testRanking)
+ private static Ranking userRankingFor(String user, Ranking testRanking)
{
TestUserData testUserData = getUsersMessages()
.filter(kv -> kv.key.equals(user))