</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>counter</artifactId>
- <version>1.2.12</version>
+ <version>1.2.13</version>
<name>Wordcount-Counter</name>
<description>Word-counting stream-processor of the multi-user wordcount-example</description>
<properties>
propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName());
propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
+ propertyMap.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
if (counterProperties.getCommitInterval() != null)
propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*;
import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
-import static org.awaitility.Awaitility.*;
+import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.*;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
@SpringBootTest(
// Test-side consumer: logs every record from the output topic (including the
// Spring type-mapping headers, when present) and stores it for assertions.
// NOTE(review): the log-message labels are misleading — the 3rd/4th arguments
// are the *type-id headers* (already rendered as "key=value" by parseHeader),
// not the record key/value; consider "key-type: {}, value-type: {}".
// `synchronized` serializes listener callbacks; assumes `received` is a plain
// (non-thread-safe) list — TODO confirm against the field declaration.
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(ConsumerRecord<Word, WordCounter> record)
{
- log.debug("Received message: {}", record);
+ log.debug(
+ "Received message: {} -> {}, key: {}, value: {}",
+ record.key(),
+ record.value(),
+ parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+ parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
received.add(KeyValue.pair(record.key(),record.value()));
}
package de.juplo.kafka.wordcount.counter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+@Slf4j
public class CounterStreamProcessorTopologyTest
{
public final static String IN = "TEST-IN";
List<KeyValue<Word, WordCounter>> receivedMessages = out
.readRecordsToList()
.stream()
- .map(record -> KeyValue.pair(record.key(), record.value()))
+ .map(record ->
+ {
+ log.debug(
+ "OUT: {} -> {}, {}, {}",
+ record.key(),
+ record.value(),
+ parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+ parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+ return KeyValue.pair(record.key(), record.value());
+ })
.toList();
TestData.assertExpectedResult(receivedMessages);
package de.juplo.kafka.wordcount.counter;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import java.util.List;
entry -> entry.getValue()
));
}
+
+ /**
+  * Renders the last header with the given key as a {@code key=value} string
+  * for logging; yields {@code key=null} when the header is absent.
+  * NOTE(review): {@code new String(header.value())} decodes with the
+  * platform default charset — acceptable for test logging, but
+  * StandardCharsets.UTF_8 would be deterministic; confirm header encoding.
+  */
+ static String parseHeader(Headers headers, String key)
+ {
+ Header header = headers.lastHeader(key);
+ if (header == null)
+ {
+ return key + "=null";
+ }
+ else
+ {
+ return key + "=" + new String(header.value());
+ }
+ }
}