</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.3.0</version>
+ <version>1.3.1</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
@SpringBootTest(
properties = {
+ "spring.main.allow-bean-definition-overriding=true",
"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.counter.commit-interval=0",
+ "juplo.wordcount.counter.commit-interval=100",
+ "juplo.wordcount.counter.cache-max-bytes=0",
"juplo.wordcount.counter.input-topic=" + TOPIC_IN,
"juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
return new Consumer();
}
- @Primary
@Bean
- KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ KeyValueBytesStoreSupplier storeSupplier()
{
return Stores.inMemoryKeyValueStore(STORE_NAME);
}
import org.springframework.util.MultiValueMap;
import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
@Slf4j
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
- public static final String STORE_NAME = "TOPOLOGY-TEST";
TopologyTestDriver testDriver;
out = testDriver.createOutputTopic(
OUT,
- new JsonDeserializer()
- .copyWithType(TestOutputWord.class)
- .ignoreTypeHeaders(),
- new JsonDeserializer()
- .copyWithType(TestOutputWordCounter.class)
- .ignoreTypeHeaders());
+ new JsonDeserializer(TestOutputWord.class).ignoreTypeHeaders(),
+ new JsonDeserializer(TestOutputWordCounter.class).ignoreTypeHeaders());
}
private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
{
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_HALLO)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
- new KeyValue<>(
+ KeyValue.pair(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
};