import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.kafka.support.serializer.JsonSerializer;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
{
Properties props = new Properties();
+ props.putAll(serializationConfig());
+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ if (properties.getCommitInterval() != null)
+ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
+ if (properties.getCacheMaxBytes() != null)
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes());
+
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ return props;
+ }
+
+ static Map<String, Object> serializationConfig()
+ {
+ Map<String, Object> props = new HashMap<>();
+
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
"counter:" + Entry.class.getName() + "," +
"user:" + User.class.getName() + "," +
"ranking:" + Ranking.class.getName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- if (properties.getCommitInterval() != null)
- props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
- if (properties.getCacheMaxBytes() != null)
- props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
return expectedMessages;
}
- static Map<String, Object> convertToMap(Properties properties)
- {
- return properties
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- entry -> (String)entry.getKey(),
- entry -> entry.getValue()
- ));
- }
-
static String parseHeader(Headers headers, String key)
{
Header header = headers.lastHeader(key);
import java.util.Properties;
import java.util.stream.Stream;
-import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
+import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
OUT,
Stores.inMemoryKeyValueStore(STORE_NAME));
- Top10ApplicationConfiguration applicationConfiguriation =
- new Top10ApplicationConfiguration();
- Properties streamProcessorProperties =
- applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
- Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+ Map<String, Object> propertyMap = serializationConfig();
+
+ Properties properties = new Properties();
+ properties.putAll(propertyMap);
JsonSerde<?> keySerde = new JsonSerde<>();
keySerde.configure(propertyMap, true);
JsonSerde<?> valueSerde = new JsonSerde<>();
valueSerde.configure(propertyMap, false);
- testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+ testDriver = new TopologyTestDriver(topology, properties);
in = testDriver.createInputTopic(
IN,