import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Properties;
{
Topology topology = Top10StreamProcessor.buildTopology(
inputTopic,
- outputTopic);
+ outputTopic,
+ null);
streams = new KafkaStreams(topology, props);
}
static Topology buildTopology(
String inputTopic,
- String outputTopic)
+ String outputTopic,
+ KeyValueBytesStoreSupplier storeSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
return topology;
}
+ ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
+ {
+ return null;
+ }
+
public void start()
{
log.info("Starting Stream-Processor");
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
.containsExactlyElementsOf(rankings));
}
+ static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+ {
+ assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
+ assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
+ }
+
static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.query.TestRanking;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
+ public static final String STORE_NAME = "TEST-STORE";
@Autowired
Consumer consumer;
+ @Autowired
+ Top10StreamProcessor streamProcessor;
@BeforeAll
});
}
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+ }
+
@DisplayName("Await the expected output messages")
@Test
public void testAwaitExpectedMessages()
{
return new Consumer();
}
+
+ @Primary
+ @Bean
+ KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
+ }
}
}
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@Slf4j
public class Top10StreamProcessorTopologyTest
{
- public final static String IN = "TEST-IN";
- public final static String OUT = "TEST-OUT";
+ public static final String IN = "TEST-IN";
+ public static final String OUT = "TEST-OUT";
+ public static final String STORE_NAME = "TOPOLOGY-TEST";
TopologyTestDriver testDriver;
@BeforeEach
public void setUp()
{
- Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
+ Topology topology = Top10StreamProcessor.buildTopology(
+ IN,
+ OUT,
+ Stores.inMemoryKeyValueStore(STORE_NAME));
Top10ApplicationConfiguration applicationConfiguriation =
new Top10ApplicationConfiguration();
});
TestData.assertExpectedMessages(receivedMessages);
+
+ KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+ TestData.assertExpectedState(store);
}
@AfterEach