summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
8d04265)
* _GREEN:_ The `CounterApplicationIT` does _not_ reveal the bug!
* _RED:_ The `CounterStreamProcessorToplogyTest` fails with an exception,
that gives a hint for the cause of the bug.
* The bug is caused by missing type-specifications for the operation
``count()``.
* Before the introduction of the domain-class `User` everything worked as
expected, because the class `Word` could be specified as default for
the deserialization of the key.
** With the introduction of the domain-class `User` as key of the incoming
messages, the default for the key has to be switched to this class, to
enable the application to deserialize incoming keys despite the missing
type mapping.
** Beforehand, the default `Word` covered the missing type information
for the ``count()``-operator.
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.2.15</version>
+ <version>1.3.0</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
+ propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(
JsonDeserializer.TYPE_MAPPINGS,
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(
JsonDeserializer.TYPE_MAPPINGS,
+ "user:" + User.class.getName() + "," +
"word:" + Word.class.getName() + "," +
"counter:" + WordCounter.class.getName());
"word:" + Word.class.getName() + "," +
"counter:" + WordCounter.class.getName());
package de.juplo.kafka.wordcount.counter;
import lombok.extern.slf4j.Slf4j;
package de.juplo.kafka.wordcount.counter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
{
StreamsBuilder builder = new StreamsBuilder();
{
StreamsBuilder builder = new StreamsBuilder();
- KStream<String, Word> source = builder.stream(
- inputTopic,
- Consumed.with(Serdes.String(), null));
+ KStream<User, Word> source = builder.stream(inputTopic);
source
.map((key, word) -> new KeyValue<>(word, word))
source
.map((key, word) -> new KeyValue<>(word, word))
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+/**
+ * Domain class for the key of the incoming messages of the counter.
+ * Introduced because the incoming records are now keyed by user (see the
+ * commit message); it is registered as {@code user:...} in the
+ * {@code JsonDeserializer.TYPE_MAPPINGS} and as the
+ * {@code JsonDeserializer.KEY_DEFAULT_TYPE}.
+ * Unknown JSON properties are ignored so the upstream producer may add
+ * fields without breaking deserialization here.
+ */
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User
+{
+  // NOTE(review): presumably the unique user name/id assigned by the
+  // splitter — confirm against the splitter's output schema.
+  String user;
+}
package de.juplo.kafka.wordcount.counter;
package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
@SpringBootTest(
properties = {
@SpringBootTest(
properties = {
+ "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
"spring.kafka.consumer.auto-offset-reset=earliest",
@BeforeAll
public static void testSendMessage(
@BeforeAll
public static void testSendMessage(
- @Autowired KafkaTemplate<String, TestInputWord> kafkaTemplate)
+ @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
{
TestData
.getInputMessages()
{
TestData
.getInputMessages()
- SendResult<String, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
package de.juplo.kafka.wordcount.counter;
package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import lombok.extern.slf4j.Slf4j;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
TopologyTestDriver testDriver;
TopologyTestDriver testDriver;
- TestInputTopic<String, TestInputWord> in;
+ TestInputTopic<TestInputUser, TestInputWord> in;
TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
in = testDriver.createInputTopic(
IN,
in = testDriver.createInputTopic(
IN,
- new StringSerializer(),
+ new JsonSerializer().noTypeInfo(),
new JsonSerializer().noTypeInfo());
out = testDriver.createOutputTopic(
new JsonSerializer().noTypeInfo());
out = testDriver.createOutputTopic(
package de.juplo.kafka.wordcount.counter;
package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
- private static final KeyValue<String, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+ private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_HALLO)),
new KeyValue<>(
TestInputWord.of(PETER, WORD_HALLO)),
new KeyValue<>(
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
new KeyValue<>(
TestInputWord.of(KLAUS, WORD_MÜSCH)),
new KeyValue<>(
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
new KeyValue<>(
TestInputWord.of(PETER, WORD_WELT)),
new KeyValue<>(
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_MÜSCH)),
new KeyValue<>(
TestInputWord.of(KLAUS, WORD_MÜSCH)),
new KeyValue<>(
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
new KeyValue<>(
TestInputWord.of(KLAUS, WORD_S)),
new KeyValue<>(
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
new KeyValue<>(
TestInputWord.of(PETER, WORD_BOÄH)),
new KeyValue<>(
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_WELT)),
new KeyValue<>(
TestInputWord.of(PETER, WORD_WELT)),
new KeyValue<>(
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
new KeyValue<>(
TestInputWord.of(PETER, WORD_BOÄH)),
new KeyValue<>(
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
new KeyValue<>(
TestInputWord.of(KLAUS, WORD_S)),
new KeyValue<>(
+ TestInputUser.of(PETER),
TestInputWord.of(PETER, WORD_BOÄH)),
new KeyValue<>(
TestInputWord.of(PETER, WORD_BOÄH)),
new KeyValue<>(
+ TestInputUser.of(KLAUS),
TestInputWord.of(KLAUS, WORD_S)),
};
TestInputWord.of(KLAUS, WORD_S)),
};
- static Stream<KeyValue<String, TestInputWord>> getInputMessages()
+ static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
{
return Stream.of(TestData.INPUT_MESSAGES);
}
{
return Stream.of(TestData.INPUT_MESSAGES);
}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * Test fixture mirroring the user-key of the counter's input messages.
+ * Used as the key type of the test {@code KafkaTemplate} and of the
+ * {@code INPUT_MESSAGES} test data; instances are created via the
+ * Lombok-generated static factory {@code of(String)}.
+ * The no-args constructor is Lombok-generated as well — presumably needed
+ * for JSON deserialization in the tests; confirm if it is ever exercised.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputUser
+{
+  // Same field name/type as the production User class, so both
+  // serialize to the same JSON document.
+  String user;
+}