<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
- <artifactId>query</artifactId>
- <version>2.1.2</version>
- <name>Wordcount-Query</name>
- <description>Query stream-processor of the multi-user wordcount-example</description>
+ <artifactId>stats</artifactId>
+ <version>1.0.0</version>
+ <name>Wordcount-Statistics</name>
+ <description>Statistics stream-processor of the multi-user wordcount-example</description>
<properties>
<java.version>21</java.version>
<docker-maven-plugin.version>0.44.0</docker-maven-plugin.version>
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import lombok.Data;
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import lombok.Data;
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import lombok.Data;
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
-public class QueryApplication
+public class StatsApplication
{
public static void main(String[] args)
{
- SpringApplication.run(QueryApplication.class, args);
+ SpringApplication.run(StatsApplication.class, args);
}
}
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.USER_STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Configuration
-@EnableConfigurationProperties(QueryApplicationProperties.class)
+@EnableConfigurationProperties(StatsApplicationProperties.class)
@Slf4j
-public class QueryApplicationConfiguration
+public class StatsApplicationConfiguration
{
@Bean
public HostInfo applicationServer(
ServerProperties serverProperties,
- QueryApplicationProperties applicationProperties) throws IOException
+ StatsApplicationProperties applicationProperties) throws IOException
{
String host;
if (serverProperties.getAddress() == null)
@Bean
public Properties streamProcessorProperties(
- QueryApplicationProperties applicationProperties,
+ StatsApplicationProperties applicationProperties,
HostInfo applicationServer)
{
Properties props = new Properties();
}
@Bean(initMethod = "start", destroyMethod = "stop")
- public QueryStreamProcessor streamProcessor(
+ public StatsStreamProcessor streamProcessor(
Properties streamProcessorProperties,
HostInfo applicationServer,
- QueryApplicationProperties applicationProperties,
+ StatsApplicationProperties applicationProperties,
KeyValueBytesStoreSupplier userStoreSupplier,
KeyValueBytesStoreSupplier rankingStoreSupplier,
ConfigurableApplicationContext context)
{
- QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
+ StatsStreamProcessor streamProcessor = new StatsStreamProcessor(
streamProcessorProperties,
applicationServer,
applicationProperties.getUsersInputTopic(),
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import lombok.Getter;
import org.springframework.boot.context.properties.ConfigurationProperties;
-@ConfigurationProperties("juplo.wordcount.query")
+@ConfigurationProperties("juplo.wordcount.stats")
@Getter
@Setter
@ToString
-public class QueryApplicationProperties
+public class StatsApplicationProperties
{
private String bootstrapServer = "localhost:9092";
- private String applicationId = "query";
+ private String applicationId = "stats";
private String rankingInputTopic = "top10";
private String usersInputTopic = "users";
private Integer commitInterval;
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@RestController
@RequiredArgsConstructor
-public class QueryController
+public class StatsController
{
- private final QueryStreamProcessor processor;
+ private final StatsStreamProcessor processor;
@GetMapping("{username}")
ResponseEntity<UserRanking> queryFor(@PathVariable String username)
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@Slf4j
-public class QueryStreamProcessor
+public class StatsStreamProcessor
{
- public static final String STATS_TYPE = "COUNTER";
+ public static final String STATS_TYPE = "POPULAR";
public static final String USER_STORE_NAME = "users";
public static final String RANKING_STORE_NAME = "rankings";
public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
- public QueryStreamProcessor(
+ public StatsStreamProcessor(
Properties props,
HostInfo applicationServer,
String usersInputTopic,
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import lombok.Data;
import lombok.EqualsAndHashCode;
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import lombok.*;
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.wordcount.top10.TestRanking;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.USER_STORE_NAME;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
"logging.level.de.juplo=DEBUG",
"logging.level.org.apache.kafka.clients=INFO",
"logging.level.org.apache.kafka.streams=INFO",
- "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.query.commit-interval=100",
- "juplo.wordcount.query.cache-max-bytes=0",
- "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
- "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
+ "juplo.wordcount.stats.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.stats.commit-interval=100",
+ "juplo.wordcount.stats.cache-max-bytes=0",
+ "juplo.wordcount.stats.users-input-topic=" + StatsApplicationIT.TOPIC_USERS,
+ "juplo.wordcount.stats.ranking-input-topic=" + StatsApplicationIT.TOPIC_TOP10 })
@AutoConfigureMockMvc
-@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
+@EmbeddedKafka(topics = { StatsApplicationIT.TOPIC_TOP10, StatsApplicationIT.TOPIC_USERS})
@Slf4j
-public class QueryApplicationIT
+public class StatsApplicationIT
{
public static final String TOPIC_TOP10 = "top10";
public static final String TOPIC_USERS = "users";
@Autowired
ObjectMapper objectMapper;
@Autowired
- QueryStreamProcessor streamProcessor;
+ StatsStreamProcessor streamProcessor;
@BeforeAll
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import de.juplo.kafka.wordcount.top10.TestRanking;
import de.juplo.kafka.wordcount.top10.TestUser;
import java.util.Map;
-import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
+import static de.juplo.kafka.wordcount.stats.StatsApplicationConfiguration.serializationConfig;
@Slf4j
-public class QueryStreamProcessorTopologyTest
+public class StatsStreamProcessorTopologyTest
{
public static final String TOP10_IN = "TOP10-IN";
public static final String USERS_IN = "USERS-IN";
@BeforeEach
public void setUp()
{
- Topology topology = QueryStreamProcessor.buildTopology(
+ Topology topology = StatsStreamProcessor.buildTopology(
USERS_IN,
TOP10_IN,
Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
import de.juplo.kafka.wordcount.top10.TestEntry;
import de.juplo.kafka.wordcount.top10.TestRanking;
import java.util.function.Function;
import java.util.stream.Stream;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STATS_TYPE;
import static org.assertj.core.api.Assertions.assertThat;