-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AccessLevel;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.*;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
-public class Top10Application
+public class PopularApplication
{
public static void main(String[] args)
{
- SpringApplication.run(Top10Application.class, args);
+ SpringApplication.run(PopularApplication.class, args);
}
}
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Configuration
-@EnableConfigurationProperties(Top10ApplicationProperties.class)
+@EnableConfigurationProperties(PopularApplicationProperties.class)
@Slf4j
-public class Top10ApplicationConfiguration
+public class PopularApplicationConfiguration
{
@Bean
- public Properties streamProcessorProperties(Top10ApplicationProperties properties)
+ public Properties streamProcessorProperties(PopularApplicationProperties properties)
{
Properties props = new Properties();
}
@Bean(initMethod = "start", destroyMethod = "stop")
- public Top10StreamProcessor streamProcessor(
- Top10ApplicationProperties applicationProperties,
+ public PopularStreamProcessor streamProcessor(
+ PopularApplicationProperties applicationProperties,
Properties streamProcessorProperties,
KeyValueBytesStoreSupplier storeSupplier,
ConfigurableApplicationContext context)
{
- Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+ PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
applicationProperties.getInputTopic(),
applicationProperties.getOutputTopic(),
streamProcessorProperties,
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import lombok.Getter;
@Getter
@Setter
@ToString
-public class Top10ApplicationProperties
+public class PopularApplicationProperties
{
private String bootstrapServer = "localhost:9092";
private String applicationId = "top10";
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.*;
@Slf4j
-public class Top10StreamProcessor
+public class PopularStreamProcessor
{
public static final String STORE_NAME= "top10";
public final KafkaStreams streams;
- public Top10StreamProcessor(
+ public PopularStreamProcessor(
String inputTopic,
String outputTopic,
Properties props,
KeyValueBytesStoreSupplier storeSupplier)
{
- Topology topology = Top10StreamProcessor.buildTopology(
+ Topology topology = PopularStreamProcessor.buildTopology(
inputTopic,
outputTopic,
storeSupplier);
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import lombok.*;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import lombok.AllArgsConstructor;
import lombok.Data;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
import java.time.Duration;
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
"juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.wordcount.top10.commit-interval=100",
"juplo.wordcount.top10.cacheMaxBytes=0",
- "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
- "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
-@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
+ "juplo.wordcount.top10.input-topic=" + PopularApplicationIT.TOPIC_IN,
+ "juplo.wordcount.top10.output-topic=" + PopularApplicationIT.TOPIC_OUT })
+@EmbeddedKafka(topics = { PopularApplicationIT.TOPIC_IN, PopularApplicationIT.TOPIC_OUT })
@Slf4j
-public class Top10ApplicationIT
+public class PopularApplicationIT
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
@Autowired
Consumer consumer;
@Autowired
- Top10StreamProcessor streamProcessor;
+ PopularStreamProcessor streamProcessor;
@BeforeAll
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
import java.util.Map;
-import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguration.serializationConfig;
@Slf4j
-public class Top10StreamProcessorTopologyTest
+public class PopularStreamProcessorTopologyTest
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
@BeforeEach
public void setUp()
{
- Topology topology = Top10StreamProcessor.buildTopology(
+ Topology topology = PopularStreamProcessor.buildTopology(
IN,
OUT,
Stores.inMemoryKeyValueStore(STORE_NAME));
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.popular;
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;