import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Bean
public KeyValueBytesStoreSupplier storeSupplier()
{
- return Stores.persistentKeyValueStore("counter");
+ return Stores.persistentKeyValueStore(STORE_NAME);
}
}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Properties;
@Slf4j
public class CounterStreamProcessor
{
+ public static final String STORE_NAME = "counter";
+
+
public final KafkaStreams streams;
return topology;
}
+ ReadOnlyKeyValueStore<Word, Long> getStore()
+ {
+ return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+ }
+
public void start()
{
log.info("Starting Stream-Processor");
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
@Autowired
Consumer consumer;
+ @Autowired
+ CounterStreamProcessor streamProcessor;
@BeforeAll
.untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion()));
}
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+ }
+
static class Consumer
{
@Bean
KeyValueBytesStoreSupplier inMemoryStoreSupplier()
{
- return Stores.inMemoryKeyValueStore("TEST-STORE");
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
}
}
}
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@Slf4j
public class CounterStreamProcessorTopologyTest
{
- public final static String IN = "TEST-IN";
- public final static String OUT = "TEST-OUT";
+ public static final String IN = "TEST-IN";
+ public static final String OUT = "TEST-OUT";
+ public static final String STORE_NAME = "TOPOLOGY-TEST";
TopologyTestDriver testDriver;
Topology topology = CounterStreamProcessor.buildTopology(
IN,
OUT,
- Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
+ Stores.inMemoryKeyValueStore(STORE_NAME));
testDriver = new TopologyTestDriver(topology, serializationConfig());
TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
TestData.assertExpectedLastMessagesForWord(receivedMessages);
+
+ KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+ TestData.assertExpectedState(store);
}
@AfterEach
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages);
}
+ static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+ {
+ assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
+ assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
+ assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
+ assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
+ assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+ }
+
+ private static Word wordOf(TestOutputWord testOutputWord)
+ {
+ Word word = new Word();
+
+ word.setUser(testOutputWord.getUser());
+ word.setWord(testOutputWord.getWord());
+
+ return word;
+ }
+
static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
{
assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
}
+ private static void assertWordCountEqualsWordCountFromLastMessage(
+ TestOutputWord word,
+ Long counter)
+ {
+ TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+ word.getUser(),
+ word.getWord(),
+ counter);
+ assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+ }
+
private static void assertWordCountEqualsWordCountFromLastMessage(
TestOutputWord word,
TestOutputWordCounter counter)