import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@SpringBootTest(
properties = {
+ "spring.main.allow-bean-definition-overriding=true",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking",
"logging.level.org.apache.kafka.clients=INFO",
"logging.level.org.apache.kafka.streams=INFO",
"juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.query.commit-interval=100",
+ "juplo.wordcount.query.cache-max-bytes=0",
"juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
"juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
@AutoConfigureMockMvc
@TestConfiguration
static class Configuration
{
- @Primary
@Bean
- KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ KeyValueBytesStoreSupplier userStoreSupplier()
{
- return Stores.inMemoryKeyValueStore(STORE_NAME);
+ return Stores.inMemoryKeyValueStore(USER_STORE_NAME);
+ }
+
+ @Bean
+ KeyValueBytesStoreSupplier rankingStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME);
}
}
}