-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
-public class QueryApplication
+public class PopularApplication
{
public static void main(String[] args)
{
- SpringApplication.run(QueryApplication.class, args);
+ SpringApplication.run(PopularApplication.class, args);
}
}
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.USER_STORE_NAME;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Configuration
-@EnableConfigurationProperties(QueryApplicationProperties.class)
+@EnableConfigurationProperties(PopularApplicationProperties.class)
@Slf4j
-public class QueryApplicationConfiguration
+public class PopularApplicationConfiguration
{
@Bean
public HostInfo applicationServer(
ServerProperties serverProperties,
- QueryApplicationProperties applicationProperties) throws IOException
+ PopularApplicationProperties applicationProperties) throws IOException
{
String host;
if (serverProperties.getAddress() == null)
@Bean
public Properties streamProcessorProperties(
- QueryApplicationProperties applicationProperties,
+ PopularApplicationProperties applicationProperties,
HostInfo applicationServer)
{
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
- "user:" + Key.class.getName() + "," +
- "ranking:" + Ranking.class.getName() + "," +
- "userranking:" + UserRanking.class.getName());
+ "word:" + Word.class.getName() + "," +
+ "ranking:" + Ranking.class.getName());
return props;
}
@Bean(initMethod = "start", destroyMethod = "stop")
- public QueryStreamProcessor streamProcessor(
+ public PopularStreamProcessor streamProcessor(
Properties streamProcessorProperties,
HostInfo applicationServer,
- QueryApplicationProperties applicationProperties,
+ PopularApplicationProperties applicationProperties,
KeyValueBytesStoreSupplier userStoreSupplier,
KeyValueBytesStoreSupplier rankingStoreSupplier,
ConfigurableApplicationContext context)
{
- QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
+ PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
streamProcessorProperties,
applicationServer,
applicationProperties.getUsersInputTopic(),
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
import lombok.Getter;
import org.springframework.boot.context.properties.ConfigurationProperties;
-@ConfigurationProperties("juplo.wordcount.query")
+@ConfigurationProperties("juplo.wordcount.popular")
@Getter
@Setter
@ToString
-public class QueryApplicationProperties
+public class PopularApplicationProperties
{
private String bootstrapServer = "localhost:9092";
- private String applicationId = "query";
- private String rankingInputTopic = "top10";
- private String usersInputTopic = "users";
+ private String applicationId = "popular";
+ private String inputTopic = "countings";
private Integer commitInterval;
private Integer cacheMaxBytes;
}
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@RestController
@RequiredArgsConstructor
-public class QueryController
+public class PopularController
{
- private final QueryStreamProcessor processor;
+ private final PopularStreamProcessor processor;
@GetMapping("{username}")
- ResponseEntity<UserRanking> queryFor(@PathVariable String username)
+ ResponseEntity<Ranking> queryFor(@PathVariable String username)
{
Optional<URI> redirect = processor.getRedirect(username);
if (redirect.isPresent())
try
{
- return ResponseEntity.of(processor.getUserRanking(username));
+ return ResponseEntity.of(processor.getRanking(username));
}
catch (InvalidStateStoreException e)
{
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@Slf4j
-public class QueryStreamProcessor
+public class PopularStreamProcessor
{
public static final String USER_STORE_NAME = "users";
public static final String RANKING_STORE_NAME = "rankings";
public final KafkaStreams streams;
public final HostInfo hostInfo;
- public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
+ public final StoreQueryParameters<ReadOnlyKeyValueStore<String, Ranking>> storeParameters;
- public QueryStreamProcessor(
+ public PopularStreamProcessor(
Properties props,
HostInfo applicationServer,
String usersInputTopic,
rankingStoreSupplier);
streams = new KafkaStreams(topology, props);
hostInfo = applicationServer;
- storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
+ storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());
}
static Topology buildTopology(
.withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde().copyWithType(User.class)));
KStream<String, Ranking> rankings = builder
- .<Key, Ranking>stream(rankingInputTopic)
- .map((key, value) -> new KeyValue<>(key.getUser(), value));
+ .<Word, Ranking>stream(rankingInputTopic)
+ .map((word, value) -> new KeyValue<>(word.getUser(), value));
rankings
.join(users, (ranking, user) -> UserRanking.of(
return topology;
}
- ReadOnlyKeyValueStore<String, UserRanking> getStore()
+ ReadOnlyKeyValueStore<String, Ranking> getStore()
{
return streams.store(storeParameters);
}
return Optional.of(location);
}
- public Optional<UserRanking> getUserRanking(String username)
+ public Optional<Ranking> getRanking(String username)
{
return Optional.ofNullable(getStore().get(username));
}
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
import lombok.Data;
@Data
public class Ranking
{
- private Entry[] entries;
+ private WordCounter[] entries;
}
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
import lombok.Data;
@Data
-public class Key
+public class Word
{
private String user;
}
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
import lombok.Data;
@Data
-public class Entry
+public class WordCounter
{
private String word;
private Long count;
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.counter;
import lombok.AllArgsConstructor;
import lombok.Data;
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
@Data
-public class TestUser
+public class TestWord
+
{
String user;
}
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.counter;
import lombok.AllArgsConstructor;
import lombok.Data;
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
@Data
-public class TestEntry
+public class TestWordCounter
{
String word;
long count;
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.counter.TestRanking;
+import de.juplo.kafka.wordcount.counter.TestWord;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.USER_STORE_NAME;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
"juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.wordcount.query.commit-interval=100",
"juplo.wordcount.query.cache-max-bytes=0",
- "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
- "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
+ "juplo.wordcount.query.users-input-topic=" + PopularApplicationIT.TOPIC_USERS,
+ "juplo.wordcount.query.ranking-input-topic=" + PopularApplicationIT.TOPIC_TOP10 })
@AutoConfigureMockMvc
-@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
+@EmbeddedKafka(topics = { PopularApplicationIT.TOPIC_TOP10, PopularApplicationIT.TOPIC_USERS})
@Slf4j
-public class QueryApplicationIT
+public class PopularApplicationIT
{
public static final String TOPIC_TOP10 = "top10";
public static final String TOPIC_USERS = "users";
@Autowired
ObjectMapper objectMapper;
@Autowired
- QueryStreamProcessor streamProcessor;
+ PopularStreamProcessor streamProcessor;
@BeforeAll
Map<String, Object> properties = Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
+ JsonSerializer.TYPE_MAPPINGS, "user:" + TestWord.class.getName() + ",ranking:" + TestRanking.class.getName());
return new KafkaTemplate(producerFactory, properties);
}
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.counter.TestRanking;
+import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.users.TestUserData;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
-import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguration.serializationConfig;
@Slf4j
-public class QueryStreamProcessorTopologyTest
+public class PopularStreamProcessorTopologyTest
{
public static final String TOP10_IN = "TOP10-IN";
public static final String USERS_IN = "USERS-IN";
TopologyTestDriver testDriver;
- TestInputTopic<TestUser, TestRanking> top10In;
+ TestInputTopic<TestWord, TestRanking> top10In;
TestInputTopic<String, TestUserData> userIn;
@BeforeEach
public void setUp()
{
- Topology topology = QueryStreamProcessor.buildTopology(
+ Topology topology = PopularStreamProcessor.buildTopology(
USERS_IN,
TOP10_IN,
Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
top10In = testDriver.createInputTopic(
TOP10_IN,
- jsonSerializer(TestUser.class, true),
+ jsonSerializer(TestWord.class, true),
jsonSerializer(TestRanking.class,false));
userIn = testDriver.createInputTopic(
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
- "user:" + TestUser.class.getName() + "," +
+ "user:" + TestWord.class.getName() + "," +
"ranking:" + TestRanking.class.getName()),
isKey);
return jsonSerializer;
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.popular;
-import de.juplo.kafka.wordcount.top10.TestEntry;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.counter.TestWordCounter;
+import de.juplo.kafka.wordcount.counter.TestRanking;
+import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.users.TestUserData;
import org.apache.kafka.streams.KeyValue;
class TestData
{
- static final TestUser PETER = TestUser.of("peter");
- static final TestUser KLAUS = TestUser.of("klaus");
+ static final TestWord PETER = TestWord.of("peter");
+ static final TestWord KLAUS = TestWord.of("klaus");
- static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
+ static final Stream<KeyValue<TestWord, TestRanking>> getTop10Messages()
{
return Stream.of(TOP10_MESSAGES);
}
.map(kv -> kv.value)
.reduce(null, (left, right) -> right);
- Entry[] entries = Arrays
+ WordCounter[] entries = Arrays
.stream(testRanking.getEntries())
.map(testEntry -> entryOf(testEntry))
- .toArray(size -> new Entry[size]);
+ .toArray(size -> new WordCounter[size]);
return UserRanking.of(
testUserData.getFirstName(),
entries);
}
- private static Entry entryOf(TestEntry testEntry)
+ private static WordCounter entryOf(TestWordCounter testEntry)
{
- Entry entry = new Entry();
- entry.setWord(testEntry.getWord());
- entry.setCount(testEntry.getCount());
- return entry;
+ WordCounter wordCounter = new WordCounter();
+ wordCounter.setWord(testEntry.getWord());
+ wordCounter.setCount(testEntry.getCount());
+ return wordCounter;
}
- private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+ private static KeyValue<TestWord, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
PETER,
TestRanking.of(
- TestEntry.of("Hallo", 1l))),
+ TestWordCounter.of("Hallo", 1l))),
KeyValue.pair( // 1
KLAUS,
TestRanking.of(
- TestEntry.of("Müsch", 1l))),
+ TestWordCounter.of("Müsch", 1l))),
KeyValue.pair( // 2
PETER,
TestRanking.of(
- TestEntry.of("Hallo", 1l),
- TestEntry.of("Welt", 1l))),
+ TestWordCounter.of("Hallo", 1l),
+ TestWordCounter.of("Welt", 1l))),
KeyValue.pair( // 3
KLAUS,
TestRanking.of(
- TestEntry.of("Müsch", 2l))),
+ TestWordCounter.of("Müsch", 2l))),
KeyValue.pair( // 4
KLAUS,
TestRanking.of(
- TestEntry.of("Müsch", 2l),
- TestEntry.of("s", 1l))),
+ TestWordCounter.of("Müsch", 2l),
+ TestWordCounter.of("s", 1l))),
KeyValue.pair( // 5
PETER,
TestRanking.of(
- TestEntry.of("Hallo", 1l),
- TestEntry.of("Welt", 1l),
- TestEntry.of("Boäh", 1l))),
+ TestWordCounter.of("Hallo", 1l),
+ TestWordCounter.of("Welt", 1l),
+ TestWordCounter.of("Boäh", 1l))),
KeyValue.pair( // 6
PETER,
TestRanking.of(
- TestEntry.of("Welt", 2l),
- TestEntry.of("Hallo", 1l),
- TestEntry.of("Boäh", 1l))),
+ TestWordCounter.of("Welt", 2l),
+ TestWordCounter.of("Hallo", 1l),
+ TestWordCounter.of("Boäh", 1l))),
KeyValue.pair( // 7
PETER,
TestRanking.of(
- TestEntry.of("Welt", 2l),
- TestEntry.of("Boäh", 2l),
- TestEntry.of("Hallo", 1l))),
+ TestWordCounter.of("Welt", 2l),
+ TestWordCounter.of("Boäh", 2l),
+ TestWordCounter.of("Hallo", 1l))),
KeyValue.pair( // 8
KLAUS,
TestRanking.of(
- TestEntry.of("Müsch", 2l),
- TestEntry.of("s", 2l))),
+ TestWordCounter.of("Müsch", 2l),
+ TestWordCounter.of("s", 2l))),
KeyValue.pair( // 9
PETER,
TestRanking.of(
- TestEntry.of("Boäh", 3l),
- TestEntry.of("Welt", 2l),
- TestEntry.of("Hallo", 1l))),
+ TestWordCounter.of("Boäh", 3l),
+ TestWordCounter.of("Welt", 2l),
+ TestWordCounter.of("Hallo", 1l))),
KeyValue.pair( // 10
KLAUS,
TestRanking.of(
- TestEntry.of("s", 3l),
- TestEntry.of("Müsch", 2l))),
+ TestWordCounter.of("s", 3l),
+ TestWordCounter.of("Müsch", 2l))),
};
private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]