import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
return props;
}
- static Map<String, Object> serializationConfig()
+ static Properties serializationConfig()
{
- Map<String, Object> props = new HashMap<>();
+ Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
package de.juplo.kafka.wordcount.top10;
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import java.util.Map;
-import java.util.Properties;
import java.util.stream.Stream;
import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
OUT,
Stores.inMemoryKeyValueStore(STORE_NAME));
- Map<String, Object> propertyMap = serializationConfig();
-
- Properties properties = new Properties();
- properties.putAll(propertyMap);
-
- JsonSerde<?> keySerde = new JsonSerde<>();
- keySerde.configure(propertyMap, true);
- JsonSerde<?> valueSerde = new JsonSerde<>();
- valueSerde.configure(propertyMap, false);
-
- testDriver = new TopologyTestDriver(topology, properties);
+ testDriver = new TopologyTestDriver(topology, serializationConfig());
in = testDriver.createInputTopic(
IN,
- (JsonSerializer<Key>)keySerde.serializer(),
- (JsonSerializer<Entry>)valueSerde.serializer());
+ jsonSerializer(Key.class, true),
+ jsonSerializer(Entry.class,false));
out = testDriver.createOutputTopic(
OUT,
- (JsonDeserializer<User>)keySerde.deserializer(),
- (JsonDeserializer<Ranking>)valueSerde.deserializer());
+ new JsonDeserializer()
+ .copyWithType(User.class)
+ .ignoreTypeHeaders(),
+ new JsonDeserializer()
+ .copyWithType(Ranking.class)
+ .ignoreTypeHeaders());
}
{
testDriver.close();
}
+
+ private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
+ {
+ JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+ jsonSerializer.configure(
+ Map.of(
+ JsonSerializer.TYPE_MAPPINGS,
+ "word:" + TestWord.class.getName() + "," +
+ "counter:" + TestCounter.class.getName()),
+ isKey);
+ return jsonSerializer;
+ }
}