package de.juplo.kafka.wordcount.counter;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Word
}
});
- Message peter1 = Message.of(
- Key.of("peter", "Hallo"),
- WordCount.of("peter", "Hallo", 1l));
- Message peter2 = Message.of(
- Key.of("peter", "Welt"),
- WordCount.of("peter", "Welt", 1l));
- Message peter3 = Message.of(
- Key.of("peter", "Boäh"),
- WordCount.of("peter", "Boäh", 1l));
- Message peter4 = Message.of(
- Key.of("peter", "Boäh"),
- WordCount.of("peter", "Boäh", 2l));
- Message peter5 = Message.of(
- Key.of("peter", "Boäh"),
- WordCount.of("peter", "Boäh", 3l));
- Message peter6 = Message.of(
- Key.of("peter", "Welt"),
- WordCount.of("peter", "Welt", 2l));
-
- Message klaus1 = Message.of(
- Key.of("klaus", "Müsch"),
- WordCount.of("klaus", "Müsch", 1l));
- Message klaus2 = Message.of(
- Key.of("klaus", "Müsch"),
- WordCount.of("klaus", "Müsch", 2l));
- Message klaus3 = Message.of(
- Key.of("klaus", "s"),
- WordCount.of("klaus", "s", 1l));
- Message klaus4 = Message.of(
- Key.of("klaus", "s"),
- WordCount.of("klaus", "s", 2l));
- Message klaus5 = Message.of(
- Key.of("klaus", "s"),
- WordCount.of("klaus", "s", 3l));
-
await("Expexted converted data")
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> TestData.assertExpectedResult(consumer.getReceivedMessages()));
public synchronized void receive(ConsumerRecord<String, String> record) throws JsonProcessingException
{
log.debug("Received message: {}", record);
- Key key = mapper.readValue(record.key(), Key.class);
+ Word key = mapper.readValue(record.key(), Word.class);
WordCount value = mapper.readValue(record.value(), WordCount.class);
- received.add(key.getUser(), Message.of(key,value));
+ received.add(Message.of(key,value));
}
synchronized List<Message> getReceivedMessages()
new StringSerializer(),
new StringSerializer());
- TestOutputTopic<Key, String> out = testDriver.createOutputTopic(
+ TestOutputTopic<Word, WordCount> out = testDriver.createOutputTopic(
OUT,
- new JsonDeserializer<Key>(Key.class).ignoreTypeHeaders(),
- new StringDeserializer());
+ new JsonDeserializer<>(Word.class).ignoreTypeHeaders(),
+ new JsonDeserializer<>(WordCount.class).ignoreTypeHeaders());
TestData.writeInputData((key, value) -> in.pipeInput(key, value));
@Value(staticConstructor = "of")
public class Message
{
- Key key;
- String value;
+ Word key;
+ WordCount value;
}
static Message[] expectedMessages =
{
Message.of(
- Key.of("peter", "Hallo"),
- "1"),
+ Word.of("peter", "Hallo"),
+ WordCount.of("peter", "Hallo", 1l)),
Message.of(
- Key.of("klaus", "Müsch"),
- "1"),
+ Word.of("klaus", "Müsch"),
+ WordCount.of("klaus", "Müsch", 1l)),
Message.of(
- Key.of("peter", "Welt"),
- "1"),
+ Word.of("peter", "Welt"),
+ WordCount.of("peter", "Welt", 1l)),
Message.of(
- Key.of("klaus", "Müsch"),
- "2"),
+ Word.of("klaus", "Müsch"),
+ WordCount.of("klaus", "Müsch", 2l)),
Message.of(
- Key.of("klaus", "s"),
- "1"),
+ Word.of("klaus", "s"),
+ WordCount.of("klaus", "s", 1l)),
Message.of(
- Key.of("peter", "Boäh"),
- "1"),
+ Word.of("peter", "Boäh"),
+ WordCount.of("peter", "Boäh", 1l)),
Message.of(
- Key.of("peter", "Welt"),
- "2"),
+ Word.of("peter", "Welt"),
+ WordCount.of("peter", "Welt", 2l)),
Message.of(
- Key.of("peter", "Boäh"),
- "2"),
+ Word.of("peter", "Boäh"),
+ WordCount.of("peter", "Boäh", 2l)),
Message.of(
- Key.of("klaus", "s"),
- "2"),
+ Word.of("klaus", "s"),
+ WordCount.of("klaus", "s", 2l)),
Message.of(
- Key.of("peter", "Boäh"),
- "3"),
+ Word.of("peter", "Boäh"),
+ WordCount.of("peter", "Boäh", 3l)),
Message.of(
- Key.of("klaus", "s"),
- "3"),
+ Word.of("klaus", "s"),
+ WordCount.of("klaus", "s", 3l))
};
}