import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Bean
public KeyValueBytesStoreSupplier storeSupplier()
{
- return Stores.persistentKeyValueStore("top10");
+ return Stores.persistentKeyValueStore(STORE_NAME);
}
}
@Slf4j
public class Top10StreamProcessor
{
+ public static final String STORE_NAME= "top10";
+
public final KafkaStreams streams;
return topology;
}
- ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
+ ReadOnlyKeyValueStore<User, Ranking> getStore()
{
- return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore()));
+ return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
}
public void start()
import java.time.Duration;
import java.util.stream.Stream;
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
- public static final String STORE_NAME = "TEST-STORE";
@Autowired
Consumer consumer;
{
await("Expected state")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
}
@DisplayName("Await the expected output messages")