package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
@SpringBootTest(
properties = {
+ "spring.main.allow-bean-definition-overriding=true",
+ "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
"spring.kafka.consumer.auto-offset-reset=earliest",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.counter.commit-interval=0",
- "juplo.wordcount.counter.cacheMaxBytes=0",
+ "juplo.wordcount.counter.commit-interval=100",
+ "juplo.wordcount.counter.cache-max-bytes=0",
"juplo.wordcount.counter.input-topic=" + TOPIC_IN,
"juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
@BeforeAll
public static void testSendMessage(
- @Autowired KafkaTemplate<String, TestInputWord> kafkaTemplate)
+ @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
{
TestData
.getInputMessages()
{
try
{
- SendResult<String, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
return new Consumer();
}
- @Primary
@Bean
- KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ KeyValueBytesStoreSupplier storeSupplier()
{
return Stores.inMemoryKeyValueStore(STORE_NAME);
}