-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
-public class CounterApplication
+public class PopularApplication
{
public static void main(String[] args)
{
- SpringApplication.run(CounterApplication.class, args);
+ SpringApplication.run(PopularApplication.class, args);
}
}
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Configuration
-@EnableConfigurationProperties(CounterApplicationProperties.class)
+@EnableConfigurationProperties(PopularApplicationProperties.class)
@Slf4j
-public class CounterApplicationConfiguriation
+public class PopularApplicationConfiguriation
{
@Bean
public Properties streamProcessorProperties(
- CounterApplicationProperties counterProperties)
+ PopularApplicationProperties counterProperties)
{
Properties propertyMap = serializationConfig();
propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
+ propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, PopularApplication.class.getPackageName());
return propertyMap;
}
@Bean(initMethod = "start", destroyMethod = "stop")
- public CounterStreamProcessor streamProcessor(
- CounterApplicationProperties applicationProperties,
+ public PopularStreamProcessor streamProcessor(
+ PopularApplicationProperties applicationProperties,
Properties streamProcessorProperties,
KeyValueBytesStoreSupplier storeSupplier,
ConfigurableApplicationContext context)
{
- CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
+ PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
applicationProperties.getInputTopic(),
applicationProperties.getOutputTopic(),
streamProcessorProperties,
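The streamProcessor() bean is truncated at this point. A hedged sketch of how it presumably continues — passing the store supplier, registering an uncaught-exception handler (which is why SHUTDOWN_CLIENT and CompletableFuture are imported above) and returning the processor; the handler body and log messages are assumptions, not part of the diff:

        storeSupplier);

    // Sketch only: on a fatal streams error, close the Spring context
    // asynchronously and tell Kafka Streams to shut down the client.
    streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
    {
      log.error("Unexpected error!", e);
      CompletableFuture.runAsync(() ->
      {
        log.info("Stopping application...");
        context.close();
      });
      return SHUTDOWN_CLIENT;
    });

    return streamProcessor;
  }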
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import lombok.Getter;
import org.springframework.boot.context.properties.ConfigurationProperties;
-@ConfigurationProperties("juplo.wordcount.counter")
+@ConfigurationProperties("juplo.wordcount.popular")
@Getter
@Setter
@ToString
-public class CounterApplicationProperties
+public class PopularApplicationProperties
{
private String bootstrapServer = "localhost:9092";
- private String applicationId = "counter";
+ private String applicationId = "popular";
private String inputTopic = "words";
- private String outputTopic = "countings";
+ private String outputTopic = "popular";
private Integer commitInterval;
private Integer cacheMaxBytes;
}
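commitInterval and cacheMaxBytes are declared as Integer wrappers so they can stay unset; the diff does not show where they are applied. A sketch of how such settings are typically copied into the Streams Properties inside streamProcessorProperties() — the StreamsConfig constants are the standard Kafka Streams keys, but the exact wiring is an assumption:

    // Sketch only -- not part of the diff
    propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
    propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
    if (counterProperties.getCommitInterval() != null)
      propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
    if (counterProperties.getCacheMaxBytes() != null)
      propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());

The integration test further down relies on exactly these two knobs (commit-interval=100, cache-max-bytes=0) so that every intermediate count update becomes visible on the output topic.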
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
@Slf4j
-public class CounterStreamProcessor
+public class PopularStreamProcessor
{
- public static final String STORE_NAME = "counter";
+ public static final String STORE_NAME = "popular";
public final KafkaStreams streams;
- public CounterStreamProcessor(
+ public PopularStreamProcessor(
String inputTopic,
String outputTopic,
Properties properties,
KeyValueBytesStoreSupplier storeSupplier)
{
- Topology topology = CounterStreamProcessor.buildTopology(
+ Topology topology = PopularStreamProcessor.buildTopology(
inputTopic,
outputTopic,
storeSupplier);
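buildTopology() itself is not part of this excerpt. A minimal sketch of a per-user word count, assuming a plain groupByKey/count backed by the supplied store; the User type, the re-keying step, and any windowing hinted at by the OutputWindowedWord test type below are assumptions:

  // Sketch only -- the real topology may differ (e.g. it may add windowing
  // and map the running count to a counter DTO before writing to outputTopic).
  static Topology buildTopology(
      String inputTopic,
      String outputTopic,
      KeyValueBytesStoreSupplier storeSupplier)
  {
    StreamsBuilder builder = new StreamsBuilder();

    builder
        .<User, Word>stream(inputTopic)                     // messages keyed by user
        .map((user, word) -> new KeyValue<>(word, word))    // re-key by (user, word)
        .groupByKey()
        .count(Materialized.<Word, Long>as(storeSupplier))  // materialized in the named store
        .toStream()
        .to(outputTopic);

    return builder.build();
  }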
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
server:
- port: 8083
+ port: 8087
management:
endpoints:
web:
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.stats.OutputWindowedWord,counter:de.juplo.kafka.wordcount.stats.OutputWordCounter",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
- "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.counter.commit-interval=100",
- "juplo.wordcount.counter.cache-max-bytes=0",
- "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
- "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
+ "juplo.wordcount.popular.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.popular.commit-interval=100",
+ "juplo.wordcount.popular.cache-max-bytes=0",
+ "juplo.wordcount.popular.input-topic=" + TOPIC_IN,
+ "juplo.wordcount.popular.output-topic=" + TOPIC_OUT })
@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
@Slf4j
-public class CounterApplicationIT
+public class PopularApplicationIT
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
@Autowired
Consumer consumer;
@Autowired
- CounterStreamProcessor streamProcessor;
+ PopularStreamProcessor streamProcessor;
@BeforeAll
public static void testSendMessage(
- @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
+ @Autowired KafkaTemplate<InputUser, InputWord> kafkaTemplate)
{
TestData
.getInputMessages()
{
try
{
- SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
static class Consumer
{
- private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
+ private final MultiValueMap<OutputWindowedWord, OutputWordCounter> received = new LinkedMultiValueMap<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
- @Payload TestOutputWordCounter counter)
+ @Header(KafkaHeaders.RECEIVED_KEY) OutputWindowedWord word,
+ @Payload OutputWordCounter counter)
{
log.debug("Received message: {} -> {}", word, counter);
received.add(word, counter);
}
synchronized void enforceAssertion(
- java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
+ java.util.function.Consumer<MultiValueMap<OutputWindowedWord, OutputWordCounter>> assertion)
{
assertion.accept(received);
}
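The actual @Test methods are elided from this excerpt. Given the Awaitility and Duration imports and the enforceAssertion(..) hook above, they presumably look roughly like this (test name and timeout are assumptions):

  // Sketch only -- not part of the diff
  @Test
  public void testAllExpectedMessagesAreReceived()
  {
    await("Expected messages")
        .atMost(Duration.ofSeconds(10))
        .untilAsserted(() ->
            consumer.enforceAssertion(TestData::assertExpectedMessages));
  }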
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import java.util.Map;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
@Slf4j
-public class CounterStreamProcessorTopologyTest
+public class PopularStreamProcessorTopologyTest
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
static TopologyTestDriver testDriver;
- static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
+ static MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
@BeforeAll
public static void setUpTestDriver()
{
- Topology topology = CounterStreamProcessor.buildTopology(
+ Topology topology = PopularStreamProcessor.buildTopology(
IN,
OUT,
Stores.inMemoryKeyValueStore(STORE_NAME));
testDriver = new TopologyTestDriver(topology, serializationConfig());
- TestInputTopic<TestInputUser, TestInputWord> in =
+ TestInputTopic<InputUser, InputWord> in =
testDriver.createInputTopic(IN, serializer(), serializer());
- TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
+ TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
TestData
return new JsonSerializer().noTypeInfo();
}
- private static JsonDeserializer<TestOutputWord> keyDeserializer()
+ private static JsonDeserializer<OutputWindowedWord> keyDeserializer()
{
return deserializer(true);
}
- private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
+ private static JsonDeserializer<OutputWordCounter> valueDeserializer()
{
return deserializer(false);
}
private static String typeMappingsConfig()
{
- return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
+ return PopularStreamProcessor.typeMappingsConfig(OutputWindowedWord.class, OutputWordCounter.class);
}
}
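typeMappingsConfig(..) is referenced here but not shown in the diff. Judging from the word:/counter: mapping configured for the integration-test consumer above, it presumably assembles the Spring-Kafka spring.json.type.mapping value along these lines (a sketch, not the actual implementation):

  // Sketch only: builds e.g. "word:<keyClass>,counter:<valueClass>"
  static String typeMappingsConfig(Class<?> keyClass, Class<?> valueClass)
  {
    return "word:" + keyClass.getName() + "," +
           "counter:" + valueClass.getName();
  }

Reusing the same helper in production code and in the tests keeps the logical type names ("word", "counter") in sync between the topology's serdes and the test DTOs.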
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
static final String WORD_S = "s";
static final String WORD_BOÄH = "Boäh";
- static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
- static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
- static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
- static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
- static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
+ static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO);
+ static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT);
+ static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH);
+ static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH);
+ static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S);
- private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+ private static final KeyValue<InputUser, InputWord>[] INPUT_MESSAGES = new KeyValue[]
{
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_HALLO)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_HALLO)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_MÜSCH)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_MÜSCH)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_WELT)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_WELT)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_MÜSCH)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_MÜSCH)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_S)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_BOÄH)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_WELT)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_WELT)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_BOÄH)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_S)),
KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
+ InputUser.of(PETER),
+ InputWord.of(PETER, WORD_BOÄH)),
KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
+ InputUser.of(KLAUS),
+ InputWord.of(KLAUS, WORD_S)),
};
- static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
+ static Stream<KeyValue<InputUser, InputWord>> getInputMessages()
{
return Stream.of(TestData.INPUT_MESSAGES);
}
- static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
{
expectedMessages().forEach(
(word, counter) ->
.containsExactlyElementsOf(counter));
}
- static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+ static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
{
assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
}
- private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
+ private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
{
return messagesForUsers.get(word) == null
? 0
assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
}
- private static Word wordOf(TestOutputWord testOutputWord)
+ private static Word wordOf(OutputWindowedWord testOutputWindowedWord)
{
Word word = new Word();
- word.setUser(testOutputWord.getUser());
- word.setWord(testOutputWord.getWord());
+ word.setUser(testOutputWindowedWord.getUser());
+ word.setWord(testOutputWindowedWord.getWord());
return word;
}
- static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+ static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
{
assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
}
private static void assertWordCountEqualsWordCountFromLastMessage(
- TestOutputWord word,
+ OutputWindowedWord word,
Long counter)
{
- TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+ OutputWordCounter testOutputWordCounter = OutputWordCounter.of(
word.getUser(),
word.getWord(),
counter);
}
private static void assertWordCountEqualsWordCountFromLastMessage(
- TestOutputWord word,
- TestOutputWordCounter counter)
+ OutputWindowedWord word,
+ OutputWordCounter counter)
{
assertThat(counter).isEqualTo(getLastMessageFor(word));
}
- private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
+ private static OutputWordCounter getLastMessageFor(OutputWindowedWord word)
{
return getLastMessageFor(word, expectedMessages());
}
- private static TestOutputWordCounter getLastMessageFor(
- TestOutputWord user,
- MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
+ private static OutputWordCounter getLastMessageFor(
+ OutputWindowedWord user,
+ MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForWord)
{
return messagesForWord
.get(user)
.reduce(null, (left, right) -> right);
}
- private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
+ private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair(
PETER_HALLO,
- TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
+ OutputWordCounter.of(PETER, WORD_HALLO,1)),
KeyValue.pair(
KLAUS_MÜSCH,
- TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+ OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
KeyValue.pair(
PETER_WELT,
- TestOutputWordCounter.of(PETER, WORD_WELT,1)),
+ OutputWordCounter.of(PETER, WORD_WELT,1)),
KeyValue.pair(
KLAUS_MÜSCH,
- TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+ OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
KeyValue.pair(
KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,1)),
+ OutputWordCounter.of(KLAUS, WORD_S,1)),
KeyValue.pair(
PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
+ OutputWordCounter.of(PETER, WORD_BOÄH,1)),
KeyValue.pair(
PETER_WELT,
- TestOutputWordCounter.of(PETER, WORD_WELT,2)),
+ OutputWordCounter.of(PETER, WORD_WELT,2)),
KeyValue.pair(
PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
+ OutputWordCounter.of(PETER, WORD_BOÄH,2)),
KeyValue.pair(
KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,2)),
+ OutputWordCounter.of(KLAUS, WORD_S,2)),
KeyValue.pair(
PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
+ OutputWordCounter.of(PETER, WORD_BOÄH,3)),
KeyValue.pair(
KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,3)),
+ OutputWordCounter.of(KLAUS, WORD_S,3)),
};
- static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
+ static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
{
- MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
-public class TestInputUser
+public class InputUser
{
String user;
}
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
-public class TestInputWord
+public class InputWord
{
String user;
String word;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
-public class TestOutputWord
+public class OutputWindowedWord
{
String user;
String word;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
-public class TestOutputWordCounter
+public class OutputWordCounter
{
String user;
String word;
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/base.xml" />
- <logger name="de.juplo.kafka.wordcount.counter" level="DEBUG" />
+ <logger name="de.juplo.kafka.wordcount.popular" level="DEBUG" />
</configuration>