import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.query.TestRanking;
import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
+ public static final String STORE_NAME = "TEST-STORE";
@Autowired
Consumer consumer;
+ @Autowired
+ Top10StreamProcessor streamProcessor;
@BeforeAll
});
}
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+ }
+
@DisplayName("Await the expected output messages")
@Test
+ @Disabled
public void testAwaitExpectedMessages()
{
await("Expected messages")
{
return new Consumer();
}
+
+ @Primary
+ @Bean
+ KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
+ }
}
}