* Adapted the app to the new type-mapping `stats` for the incomming keys.
* Refined the class `Key`, that defines the JSON for the incomming key.
** Renamed attribute `user` to `channel`.
** Added attribute `type` of type `String`.
* Refined the class `Entry`, that defines the JSON of an entry in the
ranking, that is defined in the class `Ranking`.
** Renamed attribute `word` to `key`.
* The `QueryStreamProcessor` filters the incomming messages by the field
`type` of the `Key`: all messages are dropped, that are not of type
`COUNTER`.
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>query</artifactId>
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>query</artifactId>
- <version>2.0.1</version>
+ <version>2.1.0</version>
<name>Wordcount-Query</name>
<description>Query stream-processor of the multi-user wordcount-example</description>
<properties>
<name>Wordcount-Query</name>
<description>Query stream-processor of the multi-user wordcount-example</description>
<properties>
@Data
public class Entry
{
@Data
public class Entry
{
+ private String type;
+ private String channel;
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
- "user:" + Key.class.getName() + "," +
+ "stats:" + Key.class.getName() + "," +
"ranking:" + Ranking.class.getName() + "," +
"userranking:" + UserRanking.class.getName());
"ranking:" + Ranking.class.getName() + "," +
"userranking:" + UserRanking.class.getName());
@Slf4j
public class QueryStreamProcessor
{
@Slf4j
public class QueryStreamProcessor
{
+ public static final String STATS_TYPE = "COUNTER";
public static final String USER_STORE_NAME = "users";
public static final String RANKING_STORE_NAME = "rankings";
public static final String USER_STORE_NAME = "users";
public static final String RANKING_STORE_NAME = "rankings";
.withValueSerde(new JsonSerde().copyWithType(User.class)));
KStream<String, Ranking> rankings = builder
.<Key, Ranking>stream(rankingInputTopic)
.withValueSerde(new JsonSerde().copyWithType(User.class)));
KStream<String, Ranking> rankings = builder
.<Key, Ranking>stream(rankingInputTopic)
- .map((key, value) -> new KeyValue<>(key.getUser(), value));
+ .filter((key, value) -> STATS_TYPE.equals(key.getType()))
+ .map((key, value) -> new KeyValue<>(key.getChannel(), value));
rankings
.join(users, (ranking, user) -> UserRanking.of(
rankings
.join(users, (ranking, user) -> UserRanking.of(
Map<String, Object> properties = Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
Map<String, Object> properties = Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
+ JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
return new KafkaTemplate(producerFactory, properties);
}
return new KafkaTemplate(producerFactory, properties);
}
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
- "user:" + TestUser.class.getName() + "," +
+ "stats:" + TestUser.class.getName() + "," +
"ranking:" + TestRanking.class.getName()),
isKey);
return jsonSerializer;
"ranking:" + TestRanking.class.getName()),
isKey);
return jsonSerializer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.function.Function;
import java.util.stream.Stream;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
import static org.assertj.core.api.Assertions.assertThat;
class TestData
{
import static org.assertj.core.api.Assertions.assertThat;
class TestData
{
- static final TestUser PETER = TestUser.of("peter");
- static final TestUser KLAUS = TestUser.of("klaus");
+ static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
+ static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
{
static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
{
static void assertExpectedState(Function<String, UserRanking> function)
{
static void assertExpectedState(Function<String, UserRanking> function)
{
- assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser()));
- assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser()));
+ assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
+ assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
}
private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
}
private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
private static UserRanking getLastMessageFor(String user)
{
return getTop10Messages()
private static UserRanking getLastMessageFor(String user)
{
return getTop10Messages()
- .filter(kv -> kv.key.getUser().equals(user))
+ .filter(kv -> kv.key.getChannel().equals(user))
.map(kv -> kv.value)
.map(testRanking -> userRankingFor(user, testRanking))
.reduce(null, (left, right) -> right);
.map(kv -> kv.value)
.map(testRanking -> userRankingFor(user, testRanking))
.reduce(null, (left, right) -> right);
private static Entry entryOf(TestEntry testEntry)
{
Entry entry = new Entry();
private static Entry entryOf(TestEntry testEntry)
{
Entry entry = new Entry();
- entry.setWord(testEntry.getWord());
+ entry.setKey(testEntry.getKey());
entry.setCounter(testEntry.getCounter());
return entry;
}
entry.setCounter(testEntry.getCounter());
return entry;
}
private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
{
KeyValue.pair(
private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
{
KeyValue.pair(
- PETER.getUser(),
- TestUserData.of(PETER.getUser(), "Peter", "Pan", TestUserData.Sex.MALE)),
+ PETER.getChannel(),
+ TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
- KLAUS.getUser(),
- TestUserData.of(KLAUS.getUser(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+ KLAUS.getChannel(),
+ TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
@Data
public class TestEntry
{
@Data
public class TestEntry
{
@Data
public class TestUser
{
@Data
public class TestUser
{
+ String type;
+ String channel;