- Stream
- .of(TestData.INPUT_MESSAGES)
- .forEach(kv -> in.pipeInput(
- Key.of(kv.key.getUser(), kv.key.getWord()),
- Entry.of(kv.value.getWord(), kv.value.getCounter())));
-
- MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
- out
- .readRecordsToList()
- .forEach(record ->
- {
- log.debug(
- "OUT: {} -> {}, {}, {}",
- record.key(),
- record.value(),
- parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
- parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
- receivedMessages.add(record.key(), record.value());
- });
-