From: Kai Moritz Date: Tue, 14 May 2024 21:52:28 +0000 (+0200) Subject: WIP X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=32bd4b205c027ce14fd694b036a811d2bfed7042;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index ae161d8..fdb2e6b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -34,7 +34,7 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName()); - props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Key.class.getName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Counter.class.getName()); props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 3a744dd..7a03ba0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; @@ -12,8 +11,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import static de.juplo.kafka.wordcount.counter.TestData.convertToMap; -import static de.juplo.kafka.wordcount.counter.TestData.parseHeader; +import static de.juplo.kafka.wordcount.top10.TestData.convertToMap; +import static de.juplo.kafka.wordcount.top10.TestData.parseHeader; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME; @@ -29,10 +28,10 @@ public class Top10StreamProcessorTopologyTest { Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); - Top10ApplicationConfiguriation applicationConfiguriation = - new Top10ApplicationConfiguriation(); + Top10ApplicationConfiguration applicationConfiguriation = + new Top10ApplicationConfiguration(); Properties streamProcessorProperties = - applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties()); + applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties()); Map propertyMap = convertToMap(streamProcessorProperties); JsonSerde keySerde = new JsonSerde<>(); @@ -42,19 +41,19 @@ public class Top10StreamProcessorTopologyTest TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties); - TestInputTopic in = testDriver.createInputTopic( + TestInputTopic in = testDriver.createInputTopic( IN, - (JsonSerializer)keySerde.serializer(), - (JsonSerializer)valueSerde.serializer()); + (JsonSerializer)keySerde.serializer(), + (JsonSerializer)valueSerde.serializer()); - TestOutputTopic out = testDriver.createOutputTopic( + TestOutputTopic out = testDriver.createOutputTopic( OUT, - (JsonDeserializer)keySerde.deserializer(), - (JsonDeserializer)valueSerde.deserializer()); + (JsonDeserializer)keySerde.deserializer(), + (JsonDeserializer)valueSerde.deserializer()); TestData.writeInputData((key, value) -> in.pipeInput(key, value)); - List> receivedMessages = out + List> receivedMessages = out .readRecordsToList() .stream() .map(record ->