+# Runtime image for the top-10 stream processor.
-FROM openjdk:11-jre-slim
+# eclipse-temurin:17-jre replaces the discontinued openjdk:11 image (Java 17 baseline).
+FROM eclipse-temurin:17-jre
+# Fat jar produced by the Spring Boot Maven plugin.
COPY target/*.jar /opt/app.jar
-EXPOSE 8080
+# Must match server.port=8084 in application.properties.
+EXPOSE 8084
ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
CMD []
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.4</version>
+ <version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>top10</artifactId>
- <version>1.0.0</version>
+ <version>1.0.3</version>
<name>Wordcount-Top-10</name>
<description>Top-10 stream-processor of the multi-user wordcount-example</description>
<properties>
<docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
- <java.version>11</java.version>
- <kafka.version>2.8.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
+
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
package de.juplo.kafka.wordcount.top10;
-import lombok.Value;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+/**
+ * A single word/counter pair held inside a {@link Ranking}.
+ * Changed from an immutable {@code @Value} to a mutable {@code @Data} bean
+ * with a no-args constructor so it can be deserialized from JSON; unknown
+ * JSON properties are ignored for forward compatibility.
+ */
-@Value(staticConstructor = "of")
+@NoArgsConstructor
+@AllArgsConstructor(
+ staticName = "of",
+ access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
public class Entry
{
- private final String word;
- private final Long count;
+ private String word;
+ // renamed from "count" -- presumably to match the JSON field produced upstream; TODO confirm
+ private Long counter;
}
package de.juplo.kafka.wordcount.top10;
-import lombok.Getter;
-import lombok.Setter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.*;
+/**
+ * Message key of the input topic: identifies one (user, word) count stream.
+ * Now a full {@code @Data} bean so it can round-trip through the JSON serde;
+ * unknown JSON properties are ignored for forward compatibility.
+ */
-@Getter
-@Setter
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
public class Key
{
- private String username;
+ // renamed from "username" -- presumably to match the JSON emitted by the counting service; TODO confirm
+ private String user;
private String word;
}
package de.juplo.kafka.wordcount.top10;
-import lombok.Getter;
-import lombok.Setter;
+import lombok.*;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+/**
+ * Mutable per-user ranking of word counts, capped at 10 entries.
+ * {@code add()} now returns {@code this} so the call can be used directly
+ * as the return value of a Kafka Streams aggregator.
+ */
-@Getter
-@Setter
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@NoArgsConstructor
+@Data
public class Ranking
{
private Entry[] entries = new Entry[0];
+ // NOTE(review): this hunk looks truncated -- 'i' is used below without a
+ // visible for-loop header, and the duplicated 'return this;'/brace lines
+ // near the end suggest missing diff context; verify against the full file.
- public void add(Entry newEntry)
+ public Ranking add(Entry newEntry)
{
if (entries.length == 0)
{
entries = new Entry[1];
entries[0] = newEntry;
- return;
+ return this;
}
List<Entry> list = new LinkedList<>(Arrays.asList(entries));
Entry entry;
entry = list.get(i);
- if (entry.getCount() <= newEntry.getCount())
+ if (entry.getCounter() <= newEntry.getCounter())
{
list.add(i, newEntry);
for (int j = i+1; j < list.size(); j++)
list = list.subList(0,10);
}
entries = list.toArray(num -> new Entry[num]);
- return;
+ return this;
}
}
+
+ return this;
+ }
+
+ // Static factory used by the tests to build an expected Ranking directly.
+ public static Ranking of(Entry... entries)
+ {
+ return new Ranking(entries);
}
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
@SpringBootApplication
-@EnableConfigurationProperties(Top10ApplicationProperties.class)
public class Top10Application
{
public static void main(String[] args)
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+/**
+ * Wires up the Top10StreamProcessor: builds the Kafka Streams configuration
+ * (JSON serdes with explicit type mappings) and manages the processor's
+ * lifecycle via the bean's initMethod/destroyMethod.
+ */
+@Configuration
+@EnableConfigurationProperties(Top10ApplicationProperties.class)
+@Slf4j
+public class Top10ApplicationConfiguration
+{
+ /**
+ * Streams configuration; also reused by the topology test so that the
+ * test serdes match the production serdes exactly.
+ */
+ @Bean
+ public Properties streamProcessorProperties(Top10ApplicationProperties properties)
+ {
+ Properties props = new Properties();
+
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
+ props.put(JsonDeserializer.KEY_DEFAULT_TYPE, String.class.getName())
+ props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
+ // Logical type names on the wire ("word", "counter") map to the local classes.
+ props.put(
+ JsonDeserializer.TYPE_MAPPINGS,
+ "word:" + Key.class.getName() + "," +
+ "counter:" + Entry.class.getName());
+ // Keep the type-info headers on records -- presumably so downstream
+ // consumers still see them; TODO confirm intent.
+ props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
+ props.put(
+ JsonSerializer.TYPE_MAPPINGS,
+ "ranking:" + Ranking.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ return props;
+ }
+
+ /** Processor bean; start/stop are driven by the Spring lifecycle. */
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ public Top10StreamProcessor streamProcessor(
+ Top10ApplicationProperties applicationProperties,
+ Properties streamProcessorProperties,
+ ConfigurableApplicationContext context)
+ {
+ Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+ applicationProperties.getInputTopic(),
+ applicationProperties.getOutputTopic(),
+ streamProcessorProperties);
+
+ // On an unrecoverable streams error: shut the client down and exit the
+ // application asynchronously (exit must not run on the streams thread).
+ streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+ {
+ log.error("Unexpected error!", e);
+ CompletableFuture.runAsync(() ->
+ {
+ log.info("Stopping application...");
+ SpringApplication.exit(context, () -> 1);
+ });
+ return SHUTDOWN_CLIENT;
+ });
+
+ return streamProcessor;
+ }
+}
package de.juplo.kafka.wordcount.top10;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.springframework.boot.SpringApplication;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.stereotype.Component;
+import org.apache.kafka.streams.Topology;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.regex.Pattern;
-
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+/**
+ * Wraps the KafkaStreams instance running the per-user top-10 aggregation.
+ * The topology is now built by the static buildTopology() so tests can drive
+ * it with a TopologyTestDriver; serde config, lifecycle and the
+ * uncaught-exception handler moved to Top10ApplicationConfiguration.
+ */
@Slf4j
-@Component
public class Top10StreamProcessor
{
- final static Pattern PATTERN = Pattern.compile("\\W+");
-
public final KafkaStreams streams;
+ // Manual-injection constructor: only topic names and ready-made properties.
public Top10StreamProcessor(
- Top10ApplicationProperties properties,
- ObjectMapper mapper,
- ConfigurableApplicationContext context)
+ String inputTopic,
+ String outputTopic,
+ Properties props)
+ {
+ Topology topology = Top10StreamProcessor.buildTopology(
+ inputTopic,
+ outputTopic);
+
+ streams = new KafkaStreams(topology, props);
+ }
+
+ // Pure topology: consume (Key, Entry), re-key by user, aggregate each
+ // user's entries into a Ranking, emit to the output topic. The manual
+ // Jackson (de)serialization is gone -- the JSON serde handles it now.
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic)
{
StreamsBuilder builder = new StreamsBuilder();
builder
- .<String, String>stream(properties.getInputTopic())
- .map((keyJson, countStr) ->
- {
- try
- {
- Key key = mapper.readValue(keyJson, Key.class);
- Long count = Long.parseLong(countStr);
- Entry entry = Entry.of(key.getWord(), count);
- String entryJson = mapper.writeValueAsString(entry);
- return new KeyValue<>(key.getUsername(), entryJson);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException(e);
- }
- })
+ .<Key, Entry>stream(inputTopic)
+ .map((key, entry) -> new KeyValue<>(key.getUser(), entry))
.groupByKey()
.aggregate(
- () -> "{\"entries\" : []}",
- (username, entryJson, rankingJson) ->
- {
- try
- {
- Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
- ranking.add(mapper.readValue(entryJson, Entry.class));
- return mapper.writeValueAsString(ranking);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException(e);
- }
- }
- )
+ () -> new Ranking(),
+ (user, entry, ranking) -> ranking.add(entry))
.toStream()
- .to(properties.getOutputTopic());
+ .to(outputTopic);
- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
- streams = new KafkaStreams(builder.build(), props);
- streams.setUncaughtExceptionHandler((Throwable e) ->
- {
- log.error("Unexpected error!", e);
- CompletableFuture.runAsync(() ->
- {
- log.info("Stopping application...");
- SpringApplication.exit(context, () -> 1);
- });
- return SHUTDOWN_CLIENT;
- });
+ return topology;
}
+ // Invoked via the @Bean initMethod instead of @PostConstruct.
- @PostConstruct
public void start()
{
log.info("Starting Stream-Processor");
streams.start();
}
+ // Invoked via the @Bean destroyMethod instead of @PreDestroy.
- @PreDestroy
public void stop()
{
log.info("Stopping Stream-Processor");
+ // NOTE(review): hunk truncated here -- streams.close() presumably follows; confirm in full file.
-
+# Same port as EXPOSEd in the Dockerfile.
+server.port=8084
+management.endpoints.web.exposure.include=*
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * Test fixture mirroring the message produced by the upstream counting
+ * service: one count update for a (user, word) pair.
+ */
+@NoArgsConstructor
+@AllArgsConstructor(
+ staticName = "of",
+ access = AccessLevel.PACKAGE)
+@Data
+public class Counter
+{
+ // private, not package-private: @Data generates the accessors, and this
+ // matches the field visibility used by Entry and Key.
+ private String user;
+ private String word;
+ private long counter;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+/**
+ * Shared fixtures for the top-10 tests: the input count updates, the
+ * expected per-user rankings, and small helpers for properties/headers.
+ */
+class TestData
+{
+ /** Feeds the 11 (Key, Counter) input records to the given sink. */
+ static void writeInputData(BiConsumer<Key, Counter> consumer)
+ {
+ consumer.accept(
+ Key.of("peter","Hallo"),
+ Counter.of("peter","Hallo",1));
+ consumer.accept(
+ Key.of("klaus","Müsch"),
+ Counter.of("klaus","Müsch",1));
+ consumer.accept(
+ Key.of("peter","Welt"),
+ Counter.of("peter","Welt",1));
+ consumer.accept(
+ Key.of("klaus","Müsch"),
+ Counter.of("klaus","Müsch",2));
+ consumer.accept(
+ Key.of("klaus","s"),
+ Counter.of("klaus","s",1));
+ consumer.accept(
+ Key.of("peter","Boäh"),
+ Counter.of("peter","Boäh",1));
+ consumer.accept(
+ Key.of("peter","Welt"),
+ Counter.of("peter","Welt",2));
+ consumer.accept(
+ Key.of("peter","Boäh"),
+ Counter.of("peter","Boäh",2));
+ consumer.accept(
+ Key.of("klaus","s"),
+ Counter.of("klaus","s",2));
+ consumer.accept(
+ Key.of("peter","Boäh"),
+ Counter.of("peter","Boäh",3));
+ consumer.accept(
+ Key.of("klaus","s"),
+ Counter.of("klaus","s",3));
+ }
+
+ /**
+ * Order per user is deterministic, but interleaving between the two users
+ * is not -- hence two containsSubsequence checks instead of exact equality.
+ */
+ static void assertExpectedResult(List<KeyValue<String, Ranking>> receivedMessages)
+ {
+ assertThat(receivedMessages).hasSize(11);
+ assertThat(receivedMessages).containsSubsequence(
+ expectedMessages[0],
+ expectedMessages[2],
+ expectedMessages[5],
+ expectedMessages[6],
+ expectedMessages[7]); // peter
+ assertThat(receivedMessages).containsSubsequence(
+ expectedMessages[1],
+ expectedMessages[3],
+ expectedMessages[4],
+ expectedMessages[8],
+ expectedMessages[10]); // klaus
+ }
+
+ // One expected output record per input record, in input order.
+ static KeyValue<String, Ranking>[] expectedMessages = new KeyValue[]
+ {
+ KeyValue.pair( // 0
+ "peter",
+ Ranking.of(
+ Entry.of("Hallo", 1l))),
+ KeyValue.pair( // 1
+ "klaus",
+ Ranking.of(
+ Entry.of("Müsch", 1l))),
+ KeyValue.pair( // 2
+ "peter",
+ Ranking.of(
+ Entry.of("Welt", 1l),
+ Entry.of("Hallo", 1l))),
+ KeyValue.pair( // 3
+ "klaus",
+ Ranking.of(
+ Entry.of("Müsch", 2l))),
+ KeyValue.pair( // 4
+ "klaus",
+ Ranking.of(
+ Entry.of("Müsch", 2l),
+ Entry.of("s", 1l))),
+ KeyValue.pair( // 5
+ "peter",
+ Ranking.of(
+ Entry.of("Boäh", 1l),
+ Entry.of("Welt", 1l),
+ Entry.of("Hallo", 1l))),
+ KeyValue.pair( // 6
+ "peter",
+ Ranking.of(
+ Entry.of("Welt", 2l),
+ Entry.of("Boäh", 1l),
+ Entry.of("Hallo", 1l))),
+ KeyValue.pair( // 7
+ "peter",
+ Ranking.of(
+ Entry.of("Boäh", 2l),
+ Entry.of("Welt", 2l),
+ Entry.of("Hallo", 1l))),
+ KeyValue.pair( // 8
+ "klaus",
+ Ranking.of(
+ Entry.of("s", 2l),
+ Entry.of("Müsch", 2l))),
+ KeyValue.pair( // 9
+ "peter",
+ Ranking.of(
+ Entry.of("Boäh", 3l),
+ Entry.of("Welt", 2l),
+ Entry.of("Hallo", 1l))),
+ KeyValue.pair( // 10
+ "klaus",
+ Ranking.of(
+ Entry.of("s", 3l),
+ Entry.of("Müsch", 2l))),
+ };
+
+ /** Properties -> Map view, as needed by JsonSerde.configure(). */
+ static Map<String, Object> convertToMap(Properties properties)
+ {
+ return properties
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> (String)entry.getKey(),
+ entry -> entry.getValue()
+ ));
+ }
+
+ /** Renders the last header with the given key as "key=value" (or "key=null"). */
+ static String parseHeader(Headers headers, String key)
+ {
+ Header header = headers.lastHeader(key);
+ if (header == null)
+ {
+ return key + "=null";
+ }
+ else
+ {
+ return key + "=" + new String(header.value());
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.*;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+
+
+@Slf4j
+public class Top10StreamProcessorTopologyTest
+{
+ public final static String IN = "TEST-IN";
+ public final static String OUT = "TEST-OUT";
+
+ @Test
+ public void test()
+ {
+ Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
+
+ Top10ApplicationConfiguration applicationConfiguriation =
+ new Top10ApplicationConfiguration();
+ Properties streamProcessorProperties =
+ applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
+ Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+ JsonSerde<?> keySerde = new JsonSerde<>();
+ keySerde.configure(propertyMap, true);
+ JsonSerde<?> valueSerde = new JsonSerde<>();
+ valueSerde.configure(propertyMap, false);
+
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+
+ TestInputTopic<Key, Entry> in = testDriver.createInputTopic(
+ IN,
+ (JsonSerializer<Key>)keySerde.serializer(),
+ (JsonSerializer<Entry>)valueSerde.serializer());
+
+ TestOutputTopic<String, Ranking> out = testDriver.createOutputTopic(
+ OUT,
+ (JsonDeserializer<String>)keySerde.deserializer(),
+ (JsonDeserializer<Ranking>)valueSerde.deserializer());
+
+ TestData.writeInputData((key, value) -> in.pipeInput(
+ key,
+ Entry.of(value.getWord(), value.getCounter())));
+
+ List<KeyValue<String, Ranking>> receivedMessages = out
+ .readRecordsToList()
+ .stream()
+ .map(record ->
+ {
+ log.debug(
+ "OUT: {} -> {}, {}, {}",
+ record.key(),
+ record.value(),
+ parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+ parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+ return KeyValue.pair(record.key(), record.value());
+ })
+ .toList();
+
+ TestData.assertExpectedResult(receivedMessages);
+ }
+}