projects
/
demos
/
kafka
/
wordcount
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
35943a0
)
counter: 1.2.13 - The tests print out the type-mapping headers
counter-1.2.13
author
Kai Moritz
<kai@juplo.de>
Mon, 13 May 2024 20:02:57 +0000
(22:02 +0200)
committer
Kai Moritz
<kai@juplo.de>
Tue, 14 May 2024 20:38:30 +0000
(22:38 +0200)
pom.xml
patch
|
blob
|
history
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
patch
|
blob
|
history
diff --git
a/pom.xml
b/pom.xml
index
7289765
..
6405d20
100644
(file)
--- a/
pom.xml
+++ b/
pom.xml
@@
-10,7
+10,7
@@
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.2.1
2
</version>
+ <version>1.2.1
3
</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
diff --git
a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
index
f836f89
..
e2a55c0
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
@@
-36,6
+36,7
@@
public class CounterApplicationConfiguriation
propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
+ propertyMap.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
if (counterProperties.getCommitInterval() != null)
propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
if (counterProperties.getCommitInterval() != null)
propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
diff --git
a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
index
5a3507a
..
78d103c
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
@@
-30,7
+30,10
@@
import java.util.Properties;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*;
import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*;
import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
-import static org.awaitility.Awaitility.*;
+import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.*;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
@SpringBootTest(
@SpringBootTest(
@@
-81,7
+84,12
@@
public class CounterApplicationIT
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(ConsumerRecord<Word, WordCounter> record)
{
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(ConsumerRecord<Word, WordCounter> record)
{
- log.debug("Received message: {}", record);
+ log.debug(
+ "Received message: {} -> {}, key: {}, value: {}",
+ record.key(),
+ record.value(),
+ parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+ parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
received.add(KeyValue.pair(record.key(),record.value()));
}
received.add(KeyValue.pair(record.key(),record.value()));
}
diff --git
a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
index
b785dfa
..
ca2664e
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
@@
-1,5
+1,6
@@
package de.juplo.kafka.wordcount.counter;
package de.juplo.kafka.wordcount.counter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
@@
-12,8
+13,12
@@
import java.util.Map;
import java.util.Properties;
import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
import java.util.Properties;
import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+@Slf4j
public class CounterStreamProcessorTopologyTest
{
public final static String IN = "TEST-IN";
public class CounterStreamProcessorTopologyTest
{
public final static String IN = "TEST-IN";
@@
-55,7
+60,16
@@
public class CounterStreamProcessorTopologyTest
List<KeyValue<Word, WordCounter>> receivedMessages = out
.readRecordsToList()
.stream()
List<KeyValue<Word, WordCounter>> receivedMessages = out
.readRecordsToList()
.stream()
- .map(record -> KeyValue.pair(record.key(), record.value()))
+ .map(record ->
+ {
+ log.debug(
+ "OUT: {} -> {}, {}, {}",
+ record.key(),
+ record.value(),
+ parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+ parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+ return KeyValue.pair(record.key(), record.value());
+ })
.toList();
TestData.assertExpectedResult(receivedMessages);
.toList();
TestData.assertExpectedResult(receivedMessages);
diff --git
a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
index
43e1919
..
19443ac
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
@@
-1,5
+1,7
@@
package de.juplo.kafka.wordcount.counter;
package de.juplo.kafka.wordcount.counter;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import java.util.List;
@@
-119,4
+121,17
@@
class TestData
entry -> entry.getValue()
));
}
entry -> entry.getValue()
));
}
+
+ static String parseHeader(Headers headers, String key)
+ {
+ Header header = headers.lastHeader(key);
+ if (header == null)
+ {
+ return key + "=null";
+ }
+ else
+ {
+ return key + "=" + new String(header.value());
+ }
+ }
}
}