-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
-public class CounterApplication
+public class PopularApplication
{
public static void main(String[] args)
{
- SpringApplication.run(CounterApplication.class, args);
+ SpringApplication.run(PopularApplication.class, args);
}
}
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Configuration
-@EnableConfigurationProperties(CounterApplicationProperties.class)
+@EnableConfigurationProperties(PopularApplicationProperties.class)
@Slf4j
-public class CounterApplicationConfiguriation
+public class PopularApplicationConfiguriation
{
@Bean
public Properties streamProcessorProperties(
- CounterApplicationProperties counterProperties)
+ PopularApplicationProperties counterProperties)
{
Properties propertyMap = serializationConfig();
}
@Bean(initMethod = "start", destroyMethod = "stop")
- public CounterStreamProcessor streamProcessor(
- CounterApplicationProperties applicationProperties,
+ public PopularStreamProcessor streamProcessor(
+ PopularApplicationProperties applicationProperties,
Properties streamProcessorProperties,
KeyValueBytesStoreSupplier storeSupplier,
ConfigurableApplicationContext context)
{
- CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
+ PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
applicationProperties.getInputTopic(),
applicationProperties.getOutputTopic(),
streamProcessorProperties,
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import lombok.Getter;
import org.springframework.boot.context.properties.ConfigurationProperties;
-@ConfigurationProperties("juplo.wordcount.counter")
+@ConfigurationProperties("juplo.wordcount.popular")
@Getter
@Setter
@ToString
-public class CounterApplicationProperties
+public class PopularApplicationProperties
{
private String bootstrapServer = "localhost:9092";
- private String applicationId = "counter";
+ private String applicationId = "popular";
private String inputTopic = "words";
- private String outputTopic = "countings";
+ private String outputTopic = "popular";
private Integer commitInterval;
private Integer cacheMaxBytes;
}
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
@Slf4j
-public class CounterStreamProcessor
+public class PopularStreamProcessor
{
- public static final String STORE_NAME = "counter";
+ public static final String STORE_NAME = "popular";
public final KafkaStreams streams;
- public CounterStreamProcessor(
+ public PopularStreamProcessor(
String inputTopic,
String outputTopic,
Properties properties,
KeyValueBytesStoreSupplier storeSupplier)
{
- Topology topology = CounterStreamProcessor.buildTopology(
+ Topology topology = PopularStreamProcessor.buildTopology(
inputTopic,
outputTopic,
storeSupplier);
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.topwords.TestOutputWord;
+import de.juplo.kafka.wordcount.topwords.TestOutputWordCounter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
"spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
- "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.counter.commit-interval=0",
- "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
- "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
+ "juplo.wordcount.popular.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.popular.commit-interval=0",
+ "juplo.wordcount.popular.input-topic=" + TOPIC_IN,
+ "juplo.wordcount.popular.output-topic=" + TOPIC_OUT })
@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
@Slf4j
-public class CounterApplicationIT
+public class PopularApplicationIT
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
@Autowired
Consumer consumer;
@Autowired
- CounterStreamProcessor streamProcessor;
+ PopularStreamProcessor streamProcessor;
@BeforeAll
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.topwords.TestOutputWord;
+import de.juplo.kafka.wordcount.topwords.TestOutputWordCounter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
@Slf4j
-public class CounterStreamProcessorTopologyTest
+public class PopularStreamProcessorTopologyTest
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
@BeforeEach
public void setUpTestDriver()
{
- Topology topology = CounterStreamProcessor.buildTopology(
+ Topology topology = PopularStreamProcessor.buildTopology(
IN,
OUT,
Stores.inMemoryKeyValueStore(STORE_NAME));
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.topwords.TestOutputWord;
+import de.juplo.kafka.wordcount.topwords.TestOutputWordCounter;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.topwords;
import lombok.AllArgsConstructor;
import lombok.Data;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.topwords;
import lombok.AllArgsConstructor;
import lombok.Data;
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/base.xml" />
- <logger name="de.juplo.kafka.wordcount.counter" level="DEBUG" />
+ <logger name="de.juplo.kafka.wordcount.popular" level="DEBUG" />
</configuration>