projects
/
demos
/
kafka
/
wordcount
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
df0c222
)
top10: 1.2.1 - (RED) Added an assertion regarding the expected state
author
Kai Moritz
<kai@juplo.de>
Thu, 30 May 2024 09:50:56 +0000
(11:50 +0200)
committer
Kai Moritz
<kai@juplo.de>
Sat, 8 Jun 2024 11:15:02 +0000
(13:15 +0200)
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
index
d3846d8
..
2ff078c
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
@@
-5,6
+5,8
@@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.Properties;
import java.util.Properties;
@@
-22,14
+24,16
@@
public class Top10StreamProcessor
{
Topology topology = Top10StreamProcessor.buildTopology(
inputTopic,
{
Topology topology = Top10StreamProcessor.buildTopology(
inputTopic,
- outputTopic);
+ outputTopic,
+ null);
streams = new KafkaStreams(topology, props);
}
static Topology buildTopology(
String inputTopic,
streams = new KafkaStreams(topology, props);
}
static Topology buildTopology(
String inputTopic,
- String outputTopic)
+ String outputTopic,
+ KeyValueBytesStoreSupplier storeSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
{
StreamsBuilder builder = new StreamsBuilder();
@@
-49,6
+53,11
@@
public class Top10StreamProcessor
return topology;
}
return topology;
}
+ ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
+ {
+ return null;
+ }
+
public void start()
{
log.info("Starting Stream-Processor");
public void start()
{
log.info("Starting Stream-Processor");
diff --git
a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
index
3bb6b54
..
f6d7ccd
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
@@
-5,6
+5,7
@@
import de.juplo.kafka.wordcount.counter.TestWord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@@
-63,6
+64,12
@@
class TestData
.containsExactlyElementsOf(rankings));
}
.containsExactlyElementsOf(rankings));
}
+ static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+ {
+ assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
+ assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
+ }
+
static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
diff --git
a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
index
b35dd3d
..
726b1e7
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
@@
-4,6
+4,8
@@
import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.query.TestRanking;
import lombok.extern.slf4j.Slf4j;
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.query.TestRanking;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
@@
-11,6
+13,7
@@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
@@
-54,9
+57,12
@@
public class Top10ApplicationIT
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
+ public static final String STORE_NAME = "TEST-STORE";
@Autowired
Consumer consumer;
@Autowired
Consumer consumer;
+ @Autowired
+ Top10StreamProcessor streamProcessor;
@BeforeAll
@BeforeAll
@@
-84,6
+90,15
@@
public class Top10ApplicationIT
});
}
});
}
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+ }
+
@DisplayName("Await the expected output messages")
@Test
public void testAwaitExpectedMessages()
@DisplayName("Await the expected output messages")
@Test
public void testAwaitExpectedMessages()
@@
-121,5
+136,12
@@
public class Top10ApplicationIT
{
return new Consumer();
}
{
return new Consumer();
}
+
+ @Primary
+ @Bean
+ KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
+ }
}
}
}
}
diff --git
a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
index
01c1cf6
..
1becd65
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
@@
-5,6
+5,8
@@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@
-27,8
+29,9
@@
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.K
@Slf4j
public class Top10StreamProcessorTopologyTest
{
@Slf4j
public class Top10StreamProcessorTopologyTest
{
- public final static String IN = "TEST-IN";
- public final static String OUT = "TEST-OUT";
+ public static final String IN = "TEST-IN";
+ public static final String OUT = "TEST-OUT";
+ public static final String STORE_NAME = "TOPOLOGY-TEST";
TopologyTestDriver testDriver;
TopologyTestDriver testDriver;
@@
-39,7
+42,10
@@
public class Top10StreamProcessorTopologyTest
@BeforeEach
public void setUp()
{
@BeforeEach
public void setUp()
{
- Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
+ Topology topology = Top10StreamProcessor.buildTopology(
+ IN,
+ OUT,
+ Stores.inMemoryKeyValueStore(STORE_NAME));
Top10ApplicationConfiguration applicationConfiguriation =
new Top10ApplicationConfiguration();
Top10ApplicationConfiguration applicationConfiguriation =
new Top10ApplicationConfiguration();
@@
-91,6
+97,9
@@
public class Top10StreamProcessorTopologyTest
});
TestData.assertExpectedMessages(receivedMessages);
});
TestData.assertExpectedMessages(receivedMessages);
+
+ KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+ TestData.assertExpectedState(store);
}
@AfterEach
}
@AfterEach