From: Kai Moritz <kai@juplo.de>
Date: Tue, 14 May 2024 21:02:55 +0000 (+0200)
Subject: top10: 1.0.3 - Separated config in `Top10ApplicationConfiguration` -- COPY
X-Git-Tag: top10-1.0.3~1
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=e92f87e3cb4ae59bb0cadd178222f4424038a922;p=demos%2Fkafka%2Fwordcount

top10: 1.0.3 - Separated config in `Top10ApplicationConfiguration` -- COPY
---

diff --git a/pom.xml b/pom.xml
index b46a871..cb7ec14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
 	</parent>
 	<groupId>de.juplo.kafka.wordcount</groupId>
 	<artifactId>top10</artifactId>
-	<version>1.0.2</version>
+	<version>1.0.3</version>
 	<name>Wordcount-Top-10</name>
 	<description>Top-10 stream-processor of the multi-user wordcount-example</description>
 	<properties>
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
new file mode 100644
index 0000000..cdf268f
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
@@ -0,0 +1,111 @@
+package de.juplo.kafka.wordcount.top10;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Pattern;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Slf4j
+@Component
+public class Top10ApplicationConfiguration
+{
+	final static Pattern PATTERN = Pattern.compile("\\W+");
+
+	public final KafkaStreams streams;
+
+
+	public Top10ApplicationConfiguration(
+			Top10ApplicationProperties properties,
+			ObjectMapper mapper,
+			ConfigurableApplicationContext context)
+	{
+		StreamsBuilder builder = new StreamsBuilder();
+
+		builder
+				.<String, String>stream(properties.getInputTopic())
+				.map((keyJson, countStr) ->
+				{
+					try
+					{
+						Key key = mapper.readValue(keyJson, Key.class);
+						Long count = Long.parseLong(countStr);
+						Entry entry = Entry.of(key.getWord(), count);
+						String entryJson = mapper.writeValueAsString(entry);
+						return new KeyValue<>(key.getUsername(), entryJson);
+					}
+					catch (JsonProcessingException e)
+					{
+						throw new RuntimeException(e);
+					}
+				})
+				.groupByKey()
+				.aggregate(
+						() -> "{\"entries\"	: []}",
+						(username, entryJson, rankingJson) ->
+						{
+							try
+							{
+								Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
+								ranking.add(mapper.readValue(entryJson, Entry.class));
+								return mapper.writeValueAsString(ranking);
+							}
+							catch (JsonProcessingException e)
+							{
+								throw new RuntimeException(e);
+							}
+						}
+				)
+				.toStream()
+				.to(properties.getOutputTopic());
+
+		Properties props = new Properties();
+		props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+		streams = new KafkaStreams(builder.build(), props);
+		streams.setUncaughtExceptionHandler((Throwable e) ->
+		{
+			log.error("Unexpected error!", e);
+			CompletableFuture.runAsync(() ->
+			{
+				log.info("Stopping application...");
+				SpringApplication.exit(context, () -> 1);
+			});
+			return SHUTDOWN_CLIENT;
+		});
+	}
+
+	@PostConstruct
+	public void start()
+	{
+		log.info("Starting Stream-Processor");
+		streams.start();
+	}
+
+	@PreDestroy
+	public void stop()
+	{
+		log.info("Stopping Stream-Processor");
+		streams.close();
+	}
+}