* The assertion, regarding the expected state, is true for both tests.
* The assertion, regarding the expected messages, ix only true for the
test that is based on the `TopologyTestDriver`.
* _This highlights the need for an additional integration test to avoid
misinterpretation of the results of tests, that are based on the
``TopologyTestDriver``.
* _Note,_ that the assertion, regarding the expected state, works as
well in the `Top10ApplicationIT`, if the output buffering is enabled,
because the changed state becomes visible immediately.
* The buffering ist still turned of, so that the disabled test can be
run in IntelliJ to comprehend, that the assertion is actually false.
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
public Top10StreamProcessor streamProcessor(
Top10ApplicationProperties applicationProperties,
Properties streamProcessorProperties,
+ KeyValueBytesStoreSupplier storeSupplier,
ConfigurableApplicationContext context)
{
Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
applicationProperties.getInputTopic(),
applicationProperties.getOutputTopic(),
- streamProcessorProperties);
+ streamProcessorProperties,
+ storeSupplier);
streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
{
return streamProcessor;
}
+
+ @Bean
+ public KeyValueBytesStoreSupplier storeSupplier()
+ {
+ return Stores.persistentKeyValueStore("top10");
+ }
}
package de.juplo.kafka.wordcount.top10;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Properties;
public Top10StreamProcessor(
String inputTopic,
String outputTopic,
- Properties props)
+ Properties props,
+ KeyValueBytesStoreSupplier storeSupplier)
{
Topology topology = Top10StreamProcessor.buildTopology(
inputTopic,
outputTopic,
- null);
+ storeSupplier);
streams = new KafkaStreams(topology, props);
}
.groupByKey()
.aggregate(
() -> new Ranking(),
- (user, entry, ranking) -> ranking.add(entry))
+ (user, entry, ranking) -> ranking.add(entry),
+ Materialized.as(storeSupplier))
.toStream()
.to(outputTopic);
ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
{
- return null;
+ return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore()));
}
public void start()
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
@DisplayName("Await the expected output messages")
@Test
+ @Disabled
public void testAwaitExpectedMessages()
{
await("Expected messages")