projects
/
demos
/
kafka
/
wordcount
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
6500abf
)
top10: 1.2.1 - The name of the state-store is an internal detail
author
Kai Moritz
<kai@juplo.de>
Sat, 8 Jun 2024 11:20:52 +0000
(13:20 +0200)
committer
Kai Moritz
<kai@juplo.de>
Sat, 8 Jun 2024 11:20:52 +0000
(13:20 +0200)
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
index
bb6fef7
..
bd5298d
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
@@
-18,6
+18,7
@@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@@
-94,6
+95,6
@@
public class Top10ApplicationConfiguration
@Bean
public KeyValueBytesStoreSupplier storeSupplier()
{
@Bean
public KeyValueBytesStoreSupplier storeSupplier()
{
- return Stores.persistentKeyValueStore(
"top10"
);
+ return Stores.persistentKeyValueStore(
STORE_NAME
);
}
}
}
}
diff --git
a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
index
343ab4d
..
70ead87
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
@@
-13,6
+13,8
@@
import java.util.Properties;
@Slf4j
public class Top10StreamProcessor
{
@Slf4j
public class Top10StreamProcessor
{
+ public static final String STORE_NAME= "top10";
+
public final KafkaStreams streams;
public final KafkaStreams streams;
@@
-54,9
+56,9
@@
public class Top10StreamProcessor
return topology;
}
return topology;
}
- ReadOnlyKeyValueStore<User, Ranking> getStore(
String name
)
+ ReadOnlyKeyValueStore<User, Ranking> getStore()
{
{
- return streams.store(StoreQueryParameters.fromNameAndType(
name
, QueryableStoreTypes.keyValueStore()));
+ return streams.store(StoreQueryParameters.fromNameAndType(
STORE_NAME
, QueryableStoreTypes.keyValueStore()));
}
public void start()
}
public void start()
diff --git
a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
index
1097310
..
88d03ba
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
@@
-25,6
+25,7
@@
import org.springframework.util.MultiValueMap;
import java.time.Duration;
import java.util.stream.Stream;
import java.time.Duration;
import java.util.stream.Stream;
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.await;
@@
-55,7
+56,6
@@
public class Top10ApplicationIT
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
- public static final String STORE_NAME = "TEST-STORE";
@Autowired
Consumer consumer;
@Autowired
Consumer consumer;
@@
-94,7
+94,7
@@
public class Top10ApplicationIT
{
await("Expected state")
.atMost(Duration.ofSeconds(5))
{
await("Expected state")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(
STORE_NAME
)));
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
}
@DisplayName("Await the expected output messages")
}
@DisplayName("Await the expected output messages")