From: Kai Moritz Date: Sun, 20 Aug 2023 09:25:03 +0000 (+0200) Subject: WIP X-Git-Tag: rebase--2023-08-20~3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2abd44c45ad59b50060f31852fcb4c2184137b3d;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index 6b9669ff..694b67f4 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -44,13 +45,19 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @MockBean KafkaServicesApplicationRunner kafkaServicesApplicationRunner; - @BeforeAll - public static void prepareChatRoomChannelConsumer( + public static void sendAndLoadStoredData(, + @Autowired KafkaTemplate messageTemplate, @Autowired Consumer chatRoomChannelConsumer, @Autowired ThreadPoolTaskExecutor taskExecutor, - @Autowired ChatRoomChannel chatRoomChannel) + @Autowired ChatRoomChannel chatRoomChannel) { + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); chatRoomChannelConsumer.assign(assignedPartitions); chatRoomChannel.onPartitionsAssigned(assignedPartitions); @@ -63,27 +70,16 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT }); } - @BeforeAll - public static void sendStoredData( - @Autowired ShardingStrategy shardingStrategy, - @Autowired KafkaTemplate messageTemplate) - { - UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - int shard = shardingStrategy.selectShard(chatRoomId); - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - } - static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) { ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); record.headers().add("__TypeId__", typeId.getBytes()); - kafkaTemplate - .send(record) - .thenAccept(result -> log.info("Sent {}={} to partition {}", key, value, result.getRecordMetadata().partition())); + SendResult result = kafkaTemplate.send(record).join(); + log.info( + "Sent {}={} to {}", + key, + value, + new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()))); } @AfterAll