X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=3fe15c44a61410f17a8b2723c92e77fb2e104cce;hb=016799a742995f203ec29ab77210763a8a9c377b;hp=2b986296db80e8062f01896c8e2d6e7853e8fb40;hpb=6045ac97a24bef487d0ba09d02a5dc49c0a25af4;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 2b986296..3fe15c44 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -68,7 +68,7 @@ public class InfoChannel implements Runnable } - boolean loadInProgress() + boolean isLoadInProgress() { return IntStream .range(0, numShards) @@ -111,68 +111,68 @@ public class InfoChannel implements Runnable }); } - Mono sendShardAssignedEvent(int shard) + void sendShardAssignedEvent(int shard) { EventShardAssigned to = EventShardAssigned.of(shard, instanceUri); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - Integer.toString(shard), - to); + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); - producer.send(record, ((metadata, exception) -> + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) { - if (metadata != null) - { - log.info("Successfully sent shard assigned event for shard: {}", shard); - sink.success(metadata); - } - else - { - // On send-failure - log.error( - "Could not send shard assigned event for shard {}: {}", - shard, - exception); - sink.error(exception); - } - })); - }); + log.info("Successfully sent shard assigned event for shard: {}", shard); + } + else + { + // On send-failure + log.error( + "Could not send shard assigned event for shard {}: {}", + shard, + exception); + // TODO: + // Verhalten im Fehlerfall durchdenken! + // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des + // Consumers veranlassen, so dass die nicht öffentlich Bekannte + // Zuständigkeit abgegeben und neu zugeordnet wird? + // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions! + } + })); } - Mono sendShardRevokedEvent(int shard) + void sendShardRevokedEvent(int shard) { EventShardRevoked to = EventShardRevoked.of(shard, instanceUri); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - Integer.toString(shard), - to); + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); - producer.send(record, ((metadata, exception) -> + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) { - if (metadata != null) - { - log.info("Successfully sent shard revoked event for shard: {}", shard); - sink.success(metadata); - } - else - { - // On send-failure - log.error( - "Could not send shard revoked event for shard {}: {}", - shard, - exception); - sink.error(exception); - } - })); - }); + log.info("Successfully sent shard revoked event for shard: {}", shard); + } + else + { + // On send-failure + log.error( + "Could not send shard revoked event for shard {}: {}", + shard, + exception); + // TODO: + // Verhalten im Fehlerfall durchdenken! + // Ggf. einfach egal, da die neue zuständige Instanz den + // nicht gelöschten Eintrag eh überschreibt? + } + })); } @@ -196,7 +196,11 @@ public class InfoChannel implements Runnable { ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); log.debug("Fetched {} messages", records.count()); - handleMessages(records); + for (ConsumerRecord record : records) + { + handleMessage(record); + updateNextOffset(record.partition(), record.offset() + 1); + } } catch (WakeupException e) { @@ -208,47 +212,47 @@ public class InfoChannel implements Runnable log.info("Exiting normally"); } - private void handleMessages(ConsumerRecords records) + private void updateNextOffset(int partition, long nextOffset) { - for (ConsumerRecord record : records) - { - switch (record.value().getType()) - { - case EVENT_CHATROOM_CREATED: - EventChatRoomCreated eventChatRoomCreated = - (EventChatRoomCreated) record.value(); - createChatRoom(eventChatRoomCreated.toChatRoomInfo()); - break; - - case EVENT_SHARD_ASSIGNED: - EventShardAssigned eventShardAssigned = - (EventShardAssigned) record.value(); - log.info( - "Shard {} was assigned to {}", - eventShardAssigned.getShard(), - eventShardAssigned.getUri()); - shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri(); - break; - - case EVENT_SHARD_REVOKED: - EventShardRevoked eventShardRevoked = - (EventShardRevoked) record.value(); - log.info( - "Shard {} was revoked from {}", - eventShardRevoked.getShard(), - eventShardRevoked.getUri()); - shardOwners[eventShardRevoked.getShard()] = null; - break; - - default: - log.debug( - "Ignoring message for key={} with offset={}: {}", - record.key(), - record.offset(), - record.value()); - } + this.nextOffset[partition] = nextOffset; + } - nextOffset[record.partition()] = record.offset() + 1; + private void handleMessage(ConsumerRecord record) + { + switch (record.value().getType()) + { + case EVENT_CHATROOM_CREATED: + EventChatRoomCreated eventChatRoomCreated = + (EventChatRoomCreated) record.value(); + createChatRoom(eventChatRoomCreated.toChatRoomInfo()); + break; + + case EVENT_SHARD_ASSIGNED: + EventShardAssigned eventShardAssigned = + (EventShardAssigned) record.value(); + log.info( + "Shard {} was assigned to {}", + eventShardAssigned.getShard(), + eventShardAssigned.getUri()); + shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri(); + break; + + case EVENT_SHARD_REVOKED: + EventShardRevoked eventShardRevoked = + (EventShardRevoked) record.value(); + log.info( + "Shard {} was revoked from {}", + eventShardRevoked.getShard(), + eventShardRevoked.getUri()); + shardOwners[eventShardRevoked.getShard()] = null; + break; + + default: + log.debug( + "Ignoring message for key={} with offset={}: {}", + record.key(), + record.offset(), + record.value()); } }