package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
private final String instanceUri;
private boolean running;
+ @Getter
+ private volatile boolean loadInProgress = true;
public InfoChannel(
}
- boolean loadInProgress()
- {
- return IntStream
- .range(0, numShards)
- .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
- }
-
Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
UUID chatRoomId,
String name,
});
}
- Mono<RecordMetadata> sendShardAssignedEvent(int shard)
+ void sendShardAssignedEvent(int shard)
{
EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
- return Mono.create(sink ->
- {
- ProducerRecord<String, AbstractMessageTo> record =
- new ProducerRecord<>(
- topic,
- Integer.toString(shard),
- to);
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ Integer.toString(shard),
+ to);
- producer.send(record, ((metadata, exception) ->
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
{
- if (metadata != null)
- {
- log.info("Successfully sent shard assigned event for shard: {}", shard);
- sink.success(metadata);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send shard assigned event for shard {}: {}",
- shard,
- exception);
- sink.error(exception);
- }
- }));
- });
+ log.info("Successfully sent shard assigned event for shard: {}", shard);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send shard assigned event for shard {}: {}",
+ shard,
+ exception);
+ // TODO:
+ // Verhalten im Fehlerfall durchdenken!
+ // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
+ // Consumers veranlassen, so dass die nicht öffentlich Bekannte
+ // Zuständigkeit abgegeben und neu zugeordnet wird?
+ // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
+ }
+ }));
}
- Mono<RecordMetadata> sendShardRevokedEvent(int shard)
+ void sendShardRevokedEvent(int shard)
{
EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
- return Mono.create(sink ->
- {
- ProducerRecord<String, AbstractMessageTo> record =
- new ProducerRecord<>(
- topic,
- Integer.toString(shard),
- to);
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ Integer.toString(shard),
+ to);
- producer.send(record, ((metadata, exception) ->
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
{
- if (metadata != null)
- {
- log.info("Successfully sent shard revoked event for shard: {}", shard);
- sink.success(metadata);
- }
- else
- {
- // On send-failure
- log.error(
- "Could not send shard revoked event for shard {}: {}",
- shard,
- exception);
- sink.error(exception);
- }
- }));
- });
+ log.info("Successfully sent shard revoked event for shard: {}", shard);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send shard revoked event for shard {}: {}",
+ shard,
+ exception);
+ // TODO:
+ // Verhalten im Fehlerfall durchdenken!
+ // Ggf. einfach egal, da die neue zuständige Instanz den
+ // nicht gelöschten Eintrag eh überschreibt?
+ }
+ }));
}
IntStream
.range(0, numShards)
.forEach(partition -> this.nextOffset[partition] = 0l);
+ loadInProgress = true;
while (running)
{
{
ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
log.debug("Fetched {} messages", records.count());
- handleMessages(records);
+ for (ConsumerRecord<String, AbstractMessageTo> record : records)
+ {
+ handleMessage(record);
+ updateNextOffset(record.partition(), record.offset() + 1);
+ }
}
catch (WakeupException e)
{
log.info("Exiting normally");
}
- private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
+ private void updateNextOffset(int partition, long nextOffset)
{
- for (ConsumerRecord<String, AbstractMessageTo> record : records)
- {
- switch (record.value().getType())
- {
- case EVENT_CHATROOM_CREATED:
- EventChatRoomCreated eventChatRoomCreated =
- (EventChatRoomCreated) record.value();
- createChatRoom(eventChatRoomCreated.toChatRoomInfo());
- break;
-
- case EVENT_SHARD_ASSIGNED:
- EventShardAssigned eventShardAssigned =
- (EventShardAssigned) record.value();
- log.info(
- "Shard {} was assigned to {}",
- eventShardAssigned.getShard(),
- eventShardAssigned.getUri());
- shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
- break;
-
- case EVENT_SHARD_REVOKED:
- EventShardRevoked eventShardRevoked =
- (EventShardRevoked) record.value();
- log.info(
- "Shard {} was revoked from {}",
- eventShardRevoked.getShard(),
- eventShardRevoked.getUri());
- shardOwners[eventShardRevoked.getShard()] = null;
- break;
-
- default:
- log.debug(
- "Ignoring message for key={} with offset={}: {}",
- record.key(),
- record.offset(),
- record.value());
- }
+ this.nextOffset[partition] = nextOffset;
+ if (loadInProgress) {
+ loadInProgress = IntStream
+ .range(0, numShards)
+ .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+ }
+ }
- nextOffset[record.partition()] = record.offset() + 1;
+ private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
+ {
+ switch (record.value().getType())
+ {
+ case EVENT_CHATROOM_CREATED:
+ EventChatRoomCreated eventChatRoomCreated =
+ (EventChatRoomCreated) record.value();
+ createChatRoom(eventChatRoomCreated.toChatRoomInfo());
+ break;
+
+ case EVENT_SHARD_ASSIGNED:
+ EventShardAssigned eventShardAssigned =
+ (EventShardAssigned) record.value();
+ log.info(
+ "Shard {} was assigned to {}",
+ eventShardAssigned.getShard(),
+ eventShardAssigned.getUri());
+ shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
+ break;
+
+ case EVENT_SHARD_REVOKED:
+ EventShardRevoked eventShardRevoked =
+ (EventShardRevoked) record.value();
+ log.info(
+ "Shard {} was revoked from {}",
+ eventShardRevoked.getShard(),
+ eventShardRevoked.getUri());
+ shardOwners[eventShardRevoked.getShard()] = null;
+ break;
+
+ default:
+ log.debug(
+ "Ignoring message for key={} with offset={}: {}",
+ record.key(),
+ record.offset(),
+ record.value());
}
}
Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
+ if (loadInProgress)
+ {
+ return Mono.error(new LoadInProgressException());
+ }
+
return Mono.fromSupplier(() -> chatRoomInfo.get(id));
}