X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=a6351d0ff52354241c57bf63b3a68c9956682d0a;hb=13f86063f851fc2c4ad6de56c8edb78bff9d0592;hp=3429d405245652e81ba5ab4e6fa03b7d31882c57;hpb=b57c6c7d41ba5bd3ff095450798010aa45751462;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 3429d405..a6351d0f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,10 +1,12 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -30,6 +32,7 @@ public class InfoChannel implements Runnable private final String topic; private final Producer producer; private final Consumer consumer; + private final Duration pollingInterval; private final int numShards; private final String[] shardOwners; private final long[] currentOffset; @@ -38,12 +41,16 @@ public class InfoChannel implements Runnable private final String instanceUri; private boolean running; + @Getter + private volatile boolean loadInProgress = true; public InfoChannel( String topic, Producer producer, Consumer infoChannelConsumer, + Duration pollingInterval, + int numShards, URI instanceUri) { log.debug( @@ -54,9 +61,9 @@ public class InfoChannel implements Runnable this.producer = producer; this.chatRoomInfo = new HashMap<>(); - this.numShards = consumer - .partitionsFor(topic) - .size(); + this.pollingInterval = pollingInterval; + + this.numShards = numShards; this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; @@ -68,13 +75,6 @@ public class InfoChannel implements Runnable } - boolean isLoadInProgress() - { - return IntStream - .range(0, numShards) - .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]); - } - Mono sendChatRoomCreatedEvent( UUID chatRoomId, String name, @@ -189,14 +189,19 @@ public class InfoChannel implements Runnable IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = 0l); + loadInProgress = true; while (running) { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(pollingInterval); log.debug("Fetched {} messages", records.count()); - handleMessages(records); + for (ConsumerRecord record : records) + { + handleMessage(record); + updateNextOffset(record.partition(), record.offset() + 1); + } } catch (WakeupException e) { @@ -208,47 +213,52 @@ public class InfoChannel implements Runnable log.info("Exiting normally"); } - private void handleMessages(ConsumerRecords records) + private void updateNextOffset(int partition, long nextOffset) { - for (ConsumerRecord record : records) - { - switch (record.value().getType()) - { - case EVENT_CHATROOM_CREATED: - EventChatRoomCreated eventChatRoomCreated = - (EventChatRoomCreated) record.value(); - createChatRoom(eventChatRoomCreated.toChatRoomInfo()); - break; - - case EVENT_SHARD_ASSIGNED: - EventShardAssigned eventShardAssigned = - (EventShardAssigned) record.value(); - log.info( - "Shard {} was assigned to {}", - eventShardAssigned.getShard(), - eventShardAssigned.getUri()); - shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri(); - break; - - case EVENT_SHARD_REVOKED: - EventShardRevoked eventShardRevoked = - (EventShardRevoked) record.value(); - log.info( - "Shard {} was revoked from {}", - eventShardRevoked.getShard(), - eventShardRevoked.getUri()); - shardOwners[eventShardRevoked.getShard()] = null; - break; - - default: - log.debug( - "Ignoring message for key={} with offset={}: {}", - record.key(), - record.offset(), - record.value()); - } + this.nextOffset[partition] = nextOffset; + if (loadInProgress) { + loadInProgress = IntStream + .range(0, numShards) + .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]); + } + } - nextOffset[record.partition()] = record.offset() + 1; + private void handleMessage(ConsumerRecord record) + { + switch (record.value().getType()) + { + case EVENT_CHATROOM_CREATED: + EventChatRoomCreated eventChatRoomCreated = + (EventChatRoomCreated) record.value(); + createChatRoom(eventChatRoomCreated.toChatRoomInfo()); + break; + + case EVENT_SHARD_ASSIGNED: + EventShardAssigned eventShardAssigned = + (EventShardAssigned) record.value(); + log.info( + "Shard {} was assigned to {}", + eventShardAssigned.getShard(), + eventShardAssigned.getUri()); + shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri(); + break; + + case EVENT_SHARD_REVOKED: + EventShardRevoked eventShardRevoked = + (EventShardRevoked) record.value(); + log.info( + "Shard {} was revoked from {}", + eventShardRevoked.getShard(), + eventShardRevoked.getUri()); + shardOwners[eventShardRevoked.getShard()] = null; + break; + + default: + log.debug( + "Ignoring message for key={} with offset={}: {}", + record.key(), + record.offset(), + record.value()); } } @@ -282,6 +292,11 @@ public class InfoChannel implements Runnable Mono getChatRoomInfo(UUID id) { + if (loadInProgress) + { + return Mono.error(new LoadInProgressException()); + } + return Mono.fromSupplier(() -> chatRoomInfo.get(id)); }