private final Consumer<String, AbstractMessageTo> consumer;
private final int numShards;
private final long[] startOffset;
- private final long[] currentOffset;
+ private final long[] nextOffset;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
private final DataChannel dataChannel;
.partitionsFor(topic)
.size();
this.startOffset = new long[numShards];
- this.currentOffset = new long[numShards];
+ this.nextOffset = new long[numShards];
IntStream
.range(0, numShards)
- .forEach(partition -> this.currentOffset[partition] = -1l);
- consumer
- .endOffsets(consumer.assignment())
- .entrySet()
- .stream()
- .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+ .forEach(partition -> this.nextOffset[partition] = -1l);
this.dataChannel = dataChannel;
}
{
return IntStream
.range(0, numShards)
- .anyMatch(partition -> currentOffset[partition] < startOffset[partition]);
+ .anyMatch(partition -> nextOffset[partition] < startOffset[partition]);
}
Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
{
running = true;
+ consumer
+ .endOffsets(consumer.assignment())
+ .entrySet()
+ .stream()
+ .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+ IntStream
+ .range(0, numShards)
+ .forEach(partition -> this.nextOffset[partition] = 0l);
+
while (running)
{
try
record.value());
}
- startOffset[record.partition()] = record.offset() + 1;
+ nextOffset[record.partition()] = record.offset() + 1;
}
}
@BeforeAll
public static void sendAndLoadStoredData(@Autowired KafkaTemplate<String, String> messageTemplate)
{
- send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "event_chatroom_created");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+ send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "event_chatroom_created");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+ send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
}
- static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ static void send(
+ KafkaTemplate<String, String> kafkaTemplate,
+ String topic,
+ String key,
+ String value,
+ String typeId)
{
- ProducerRecord<String, String> record = new ProducerRecord<>(DATA_TOPIC, key, value);
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
record.headers().add("__TypeId__", typeId.getBytes());
SendResult<String, String> result = kafkaTemplate.send(record).join();
log.info(