projects
/
demos
/
kafka
/
wordcount
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
47ed803
)
splitter: 1.2.0 - A domain-class (``User``) is used as key
author
Kai Moritz
<kai@juplo.de>
Sun, 2 Jun 2024 16:41:02 +0000
(18:41 +0200)
committer
Kai Moritz
<kai@juplo.de>
Tue, 4 Jun 2024 21:25:54 +0000
(23:25 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/wordcount/splitter/User.java
[new file with mode: 0644]
patch
|
blob
src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java
[new file with mode: 0644]
patch
|
blob
src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java
[new file with mode: 0644]
patch
|
blob
src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java
b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java
index
ead41f8
..
7143e1a
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java
@@
-33,9
+33,10
@@
public class SplitterApplicationConfiguration
propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String
Serde.class.getName());
+ propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Json
Serde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, SplitterApplication.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, SplitterApplication.class.getName());
+ propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName());
propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName());
propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git
a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
index
60c569b
..
d0070c0
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
@@
-25,7
+25,7
@@
public class SplitterStreamProcessor
{
StreamsBuilder builder = new StreamsBuilder();
{
StreamsBuilder builder = new StreamsBuilder();
- KStream<
String
, Recording> source = builder.stream(inputTopic);
+ KStream<
User
, Recording> source = builder.stream(inputTopic);
source
.flatMapValues(recording -> Arrays
source
.flatMapValues(recording -> Arrays
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/User.java
b/src/main/java/de/juplo/kafka/wordcount/splitter/User.java
new file mode 100644
(file)
index 0000000..
8a65695
--- /dev/null
+++ b/
src/main/java/de/juplo/kafka/wordcount/splitter/User.java
@@ -0,0
+1,12
@@
+package de.juplo.kafka.wordcount.splitter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User
+{
+ String user;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java
b/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java
new file mode 100644
(file)
index 0000000..
4406b3b
--- /dev/null
+++ b/
src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java
@@ -0,0
+1,14
@@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputUser
+{
+ String user;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java
b/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java
new file mode 100644
(file)
index 0000000..
ce413ba
--- /dev/null
+++ b/
src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java
@@ -0,0
+1,14
@@
+package de.juplo.kafka.wordcount.recorder;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputUser
+{
+ String user;
+}
diff --git
a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java
b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java
index
891a435
..
e945b31
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java
@@
-1,7
+1,9
@@
package de.juplo.kafka.wordcount.splitter;
package de.juplo.kafka.wordcount.splitter;
+import de.juplo.kafka.wordcount.counter.TestOutputUser;
import de.juplo.kafka.wordcount.counter.TestOutputWord;
import de.juplo.kafka.wordcount.recorder.TestInputRecording;
import de.juplo.kafka.wordcount.counter.TestOutputWord;
import de.juplo.kafka.wordcount.recorder.TestInputRecording;
+import de.juplo.kafka.wordcount.recorder.TestInputUser;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@
-28,12
+30,14
@@
import static org.awaitility.Awaitility.await;
@SpringBootTest(
properties = {
@SpringBootTest(
properties = {
+ "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
"spring.kafka.consumer.auto-offset-reset=earliest",
+ "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.
value.default.type=de.juplo.kafka.wordcount.counter.TestWord
",
- "spring.kafka.consumer.properties.spring.json.
trusted.packages=de.juplo.kafka.wordcount.splitter
",
+ "spring.kafka.consumer.properties.spring.json.
key.default.type=de.juplo.kafka.wordcount.counter.TestOutputUser
",
+ "spring.kafka.consumer.properties.spring.json.
value.default.type=de.juplo.kafka.wordcount.counter.TestOutputWord
",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
@@
-51,7
+55,7
@@
public class SplitterApplicationIT
@BeforeAll
public static void testSendMessage(
@BeforeAll
public static void testSendMessage(
- @Autowired KafkaTemplate<
String
, TestInputRecording> kafkaTemplate)
+ @Autowired KafkaTemplate<
TestInputUser
, TestInputRecording> kafkaTemplate)
{
TestData
.getInputMessages()
{
TestData
.getInputMessages()
@@
-59,7
+63,7
@@
public class SplitterApplicationIT
{
try
{
{
try
{
- SendResult<
String
, TestInputRecording> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ SendResult<
TestInputUser
, TestInputRecording> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
@@
-78,7
+82,7
@@
public class SplitterApplicationIT
@Test
void testSendMessage() throws Exception
{
@Test
void testSendMessage() throws Exception
{
- await("Expe
x
ted converted data")
+ await("Expe
c
ted converted data")
.atMost(Duration.ofSeconds(5))
.untilAsserted(() ->
TestData.assertExpectedMessages(consumer.getReceivedMessages()));
.atMost(Duration.ofSeconds(5))
.untilAsserted(() ->
TestData.assertExpectedMessages(consumer.getReceivedMessages()));
@@
-87,18
+91,18
@@
public class SplitterApplicationIT
static class Consumer
{
static class Consumer
{
- private final MultiValueMap<
String
, TestOutputWord> received = new LinkedMultiValueMap<>();
+ private final MultiValueMap<
TestOutputUser
, TestOutputWord> received = new LinkedMultiValueMap<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY)
String
key,
+ @Header(KafkaHeaders.RECEIVED_KEY)
TestOutputUser
key,
@Payload TestOutputWord value)
{
log.debug("Received message: {}={}", key, value);
received.add(key, value);
}
@Payload TestOutputWord value)
{
log.debug("Received message: {}={}", key, value);
received.add(key, value);
}
- synchronized MultiValueMap<
String
, TestOutputWord> getReceivedMessages()
+ synchronized MultiValueMap<
TestOutputUser
, TestOutputWord> getReceivedMessages()
{
return received;
}
{
return received;
}
diff --git
a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java
b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java
index
feedb1e
..
f89b099
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java
@@
-1,7
+1,9
@@
package de.juplo.kafka.wordcount.splitter;
package de.juplo.kafka.wordcount.splitter;
+import de.juplo.kafka.wordcount.counter.TestOutputUser;
import de.juplo.kafka.wordcount.counter.TestOutputWord;
import de.juplo.kafka.wordcount.recorder.TestInputRecording;
import de.juplo.kafka.wordcount.counter.TestOutputWord;
import de.juplo.kafka.wordcount.recorder.TestInputRecording;
+import de.juplo.kafka.wordcount.recorder.TestInputUser;
import org.apache.kafka.streams.KeyValue;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.apache.kafka.streams.KeyValue;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@@
-15,29
+17,29
@@
import static org.awaitility.Awaitility.await;
public class TestData
{
public class TestData
{
- static final
String PETER = "peter"
;
- static final
String KLAUS = "klaus"
;
+ static final
TestInputUser PETER = TestInputUser.of("peter")
;
+ static final
TestInputUser KLAUS = TestInputUser.of("klaus")
;
- static final Stream<KeyValue<
String
, TestInputRecording>> getInputMessages()
+ static final Stream<KeyValue<
TestInputUser
, TestInputRecording>> getInputMessages()
{
return Stream.of(INPUT_MESSAGES);
}
{
return Stream.of(INPUT_MESSAGES);
}
- private static final KeyValue<
String
, TestInputRecording>[] INPUT_MESSAGES = new KeyValue[]
+ private static final KeyValue<
TestInputUser
, TestInputRecording>[] INPUT_MESSAGES = new KeyValue[]
{
new KeyValue<>(
PETER,
{
new KeyValue<>(
PETER,
- TestInputRecording.of(PETER, "Hallo Welt!")),
+ TestInputRecording.of(PETER
.getUser()
, "Hallo Welt!")),
new KeyValue<>(
KLAUS,
new KeyValue<>(
KLAUS,
- TestInputRecording.of(KLAUS, "Müsch gäb's auch!")),
+ TestInputRecording.of(KLAUS
.getUser()
, "Müsch gäb's auch!")),
new KeyValue<>(
PETER,
new KeyValue<>(
PETER,
- TestInputRecording.of(PETER, "Boäh, echt! ß mal nä Nümmäh!")),
+ TestInputRecording.of(PETER
.getUser()
, "Boäh, echt! ß mal nä Nümmäh!")),
};
};
- static void assertExpectedMessages(MultiValueMap<
String
, TestOutputWord> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<
TestOutputUser
, TestOutputWord> receivedMessages)
{
await("Received expected messages")
.atMost(Duration.ofSeconds(5))
{
await("Received expected messages")
.atMost(Duration.ofSeconds(5))
@@
-45,49
+47,49
@@
public class TestData
assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
}
assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
}
- private static final KeyValue<
String
, TestOutputWord>[] EXPECTED_MESSAGES = new KeyValue[]
+ private static final KeyValue<
TestOutputUser
, TestOutputWord>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair(
{
KeyValue.pair(
-
"peter"
,
- TestOutputWord.of(
"peter"
, "Hallo")),
+
TestOutputUser.of(PETER.getUser())
,
+ TestOutputWord.of(
PETER.getUser()
, "Hallo")),
KeyValue.pair(
KeyValue.pair(
-
"peter"
,
- TestOutputWord.of(
"peter"
, "Welt")),
+
TestOutputUser.of(PETER.getUser())
,
+ TestOutputWord.of(
PETER.getUser()
, "Welt")),
KeyValue.pair(
KeyValue.pair(
-
"klaus"
,
- TestOutputWord.of(
"klaus"
, "Müsch")),
+
TestOutputUser.of(KLAUS.getUser())
,
+ TestOutputWord.of(
KLAUS.getUser()
, "Müsch")),
KeyValue.pair(
KeyValue.pair(
-
"klaus"
,
- TestOutputWord.of(
"klaus"
, "gäb")),
+
TestOutputUser.of(KLAUS.getUser())
,
+ TestOutputWord.of(
KLAUS.getUser()
, "gäb")),
KeyValue.pair(
KeyValue.pair(
-
"klaus"
,
- TestOutputWord.of(
"klaus"
, "s")),
+
TestOutputUser.of(KLAUS.getUser())
,
+ TestOutputWord.of(
KLAUS.getUser()
, "s")),
KeyValue.pair(
KeyValue.pair(
-
"klaus"
,
- TestOutputWord.of(
"klaus"
, "auch")),
+
TestOutputUser.of(KLAUS.getUser())
,
+ TestOutputWord.of(
KLAUS.getUser()
, "auch")),
KeyValue.pair(
KeyValue.pair(
-
"peter"
,
- TestOutputWord.of(
"peter"
, "Boäh")),
+
TestOutputUser.of(PETER.getUser())
,
+ TestOutputWord.of(
PETER.getUser()
, "Boäh")),
KeyValue.pair(
KeyValue.pair(
-
"peter"
,
- TestOutputWord.of(
"peter"
, "echt")),
+
TestOutputUser.of(PETER.getUser())
,
+ TestOutputWord.of(
PETER.getUser()
, "echt")),
KeyValue.pair(
KeyValue.pair(
-
"peter"
,
- TestOutputWord.of(
"peter"
, "ß")),
+
TestOutputUser.of(PETER.getUser())
,
+ TestOutputWord.of(
PETER.getUser()
, "ß")),
KeyValue.pair(
KeyValue.pair(
-
"peter"
,
- TestOutputWord.of(
"peter"
, "mal")),
+
TestOutputUser.of(PETER.getUser())
,
+ TestOutputWord.of(
PETER.getUser()
, "mal")),
KeyValue.pair(
KeyValue.pair(
-
"peter"
,
- TestOutputWord.of(
"peter"
, "nä")),
+
TestOutputUser.of(PETER.getUser())
,
+ TestOutputWord.of(
PETER.getUser()
, "nä")),
KeyValue.pair(
KeyValue.pair(
-
"peter"
,
- TestOutputWord.of(
"peter"
, "Nümmäh")),
+
TestOutputUser.of(PETER.getUser())
,
+ TestOutputWord.of(
PETER.getUser()
, "Nümmäh")),
};
};
- static MultiValueMap<
String
, TestOutputWord> expectedMessages()
+ static MultiValueMap<
TestOutputUser
, TestOutputWord> expectedMessages()
{
{
- MultiValueMap<
String
, TestOutputWord> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<
TestOutputUser
, TestOutputWord> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));