projects
/
demos
/
kafka
/
wordcount
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
5aa0393
)
counter: 1.3.1 - Cleand code/setup for tests
author
Kai Moritz
<kai@juplo.de>
Sun, 16 Jun 2024 16:26:47 +0000
(18:26 +0200)
committer
Kai Moritz
<kai@juplo.de>
Sun, 16 Jun 2024 18:57:17 +0000
(20:57 +0200)
pom.xml
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
patch
|
blob
|
history
diff --git
a/pom.xml
b/pom.xml
index
5859736
..
03a7b40
100644
(file)
--- a/
pom.xml
+++ b/
pom.xml
@@
-10,7
+10,7
@@
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.3.
0
</version>
+ <version>1.3.
1
</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
diff --git
a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
index
334cd05
..
0faa2de
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
@@
-14,7
+14,6
@@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
@@
-35,6
+34,7
@@
import static org.awaitility.Awaitility.await;
@SpringBootTest(
properties = {
@SpringBootTest(
properties = {
+ "spring.main.allow-bean-definition-overriding=true",
"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
@@
-45,7
+45,8
@@
import static org.awaitility.Awaitility.await;
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.counter.commit-interval=0",
+ "juplo.wordcount.counter.commit-interval=100",
+ "juplo.wordcount.counter.cache-max-bytes=0",
"juplo.wordcount.counter.input-topic=" + TOPIC_IN,
"juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
"juplo.wordcount.counter.input-topic=" + TOPIC_IN,
"juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
@@
-155,9
+156,8
@@
public class CounterApplicationIT
return new Consumer();
}
return new Consumer();
}
- @Primary
@Bean
@Bean
- KeyValueBytesStoreSupplier
inMemoryS
toreSupplier()
+ KeyValueBytesStoreSupplier
s
toreSupplier()
{
return Stores.inMemoryKeyValueStore(STORE_NAME);
}
{
return Stores.inMemoryKeyValueStore(STORE_NAME);
}
diff --git
a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
index
0ffd516
..
9c86c6c
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
@@
-20,6
+20,7
@@
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
import org.springframework.util.MultiValueMap;
import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
@Slf4j
@Slf4j
@@
-27,7
+28,6
@@
public class CounterStreamProcessorTopologyTest
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
- public static final String STORE_NAME = "TOPOLOGY-TEST";
TopologyTestDriver testDriver;
TopologyTestDriver testDriver;
@@
-52,12
+52,8
@@
public class CounterStreamProcessorTopologyTest
out = testDriver.createOutputTopic(
OUT,
out = testDriver.createOutputTopic(
OUT,
- new JsonDeserializer()
- .copyWithType(TestOutputWord.class)
- .ignoreTypeHeaders(),
- new JsonDeserializer()
- .copyWithType(TestOutputWordCounter.class)
- .ignoreTypeHeaders());
+ new JsonDeserializer(TestOutputWord.class).ignoreTypeHeaders(),
+ new JsonDeserializer(TestOutputWordCounter.class).ignoreTypeHeaders());
}
}
diff --git
a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
index
1ecfdbd
..
862eb2b
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
@@
-33,37
+33,37
@@
class TestData
private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
{
private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
{
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_HALLO)),
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_HALLO)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
-
new KeyValue<>
(
+
KeyValue.pair
(
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
};
TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
};