package de.juplo.kafka.chat.backend;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import jakarta.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
@Autowired
ChatBackendProperties properties;
@Autowired
- ChatHome chatHome;
+ ChatHomeService chatHomeService;
@Autowired
StorageStrategy storageStrategy;
@PreDestroy
public void onExit()
{
- storageStrategy.write(chatHome);
+ storageStrategy.write(chatHomeService);
}
public static void main(String[] args)
package de.juplo.kafka.chat.backend.api;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class ChatBackendController
{
- private final ChatHome chatHome;
+ private final ChatHomeService chatHomeService;
private final StorageStrategy storageStrategy;
public Mono<ChatRoomInfoTo> create(@RequestBody String name)
{
UUID chatRoomId = UUID.randomUUID();
- return chatHome
+ return chatHomeService
.createChatRoom(chatRoomId, name)
.map(ChatRoomInfoTo::from);
}
@GetMapping("list")
public Flux<ChatRoomInfoTo> list()
{
- return chatHome
+ return chatHomeService
.getChatRoomInfo()
.map(chatroomInfo -> ChatRoomInfoTo.from(chatroomInfo));
}
@GetMapping("{chatRoomId}/list")
public Flux<MessageTo> list(@PathVariable UUID chatRoomId)
{
- return chatHome
+ return chatHomeService
.getChatRoomData(chatRoomId)
.flatMapMany(chatRoomData -> chatRoomData
.getMessages()
@GetMapping("{chatRoomId}")
public Mono<ChatRoomInfoTo> get(@PathVariable UUID chatRoomId)
{
- return chatHome
+ return chatHomeService
.getChatRoomInfo(chatRoomId)
.map(chatRoomInfo -> ChatRoomInfoTo.from(chatRoomInfo));
}
@RequestBody String text)
{
return
- chatHome
+ chatHomeService
.getChatRoomData(chatRoomId)
.flatMap(chatRoomData -> put(chatRoomData, username, messageId, text));
}
@PathVariable Long messageId)
{
return
- chatHome
+ chatHomeService
.getChatRoomData(chatRoomId)
.flatMap(chatRoomData -> get(chatRoomData, username, messageId));
}
@GetMapping(path = "{chatRoomId}/listen")
public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatRoomId)
{
- return chatHome
+ return chatHomeService
.getChatRoomData(chatRoomId)
.flatMapMany(chatRoomData -> listen(chatRoomData));
}
@PostMapping("/store")
public void store()
{
- storageStrategy.write(chatHome);
+ storageStrategy.write(chatHomeService);
}
}
import java.util.UUID;
-public interface ChatHome
+public interface ChatHomeService
{
Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
package de.juplo.kafka.chat.backend.persistence;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
import reactor.core.publisher.Flux;
public interface StorageStrategy
{
- default void write(ChatHome chatHome)
+ default void write(ChatHomeService chatHomeService)
{
writeChatRoomInfo(
- chatHome
+ chatHomeService
.getChatRoomInfo()
.doOnNext(chatRoomInfo ->
writeChatRoomData(
chatRoomInfo.getId(),
- chatHome
+ chatHomeService
.getChatRoomData(chatRoomInfo.getId())
.flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
}
package de.juplo.kafka.chat.backend.persistence.inmemory;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
name = "sharding-strategy",
havingValue = "none",
matchIfMissing = true)
- ChatHome noneShardingChatHome(
+ ChatHomeService noneShardingChatHome(
ChatBackendProperties properties,
StorageStrategy storageStrategy,
Clock clock)
{
- return new SimpleChatHome(
+ return new SimpleChatHomeService(
storageStrategy,
clock,
properties.getChatroomBufferSize());
prefix = "chat.backend.inmemory",
name = "sharding-strategy",
havingValue = "kafkalike")
- ChatHome kafkalikeShardingChatHome(
+ ChatHomeService kafkalikeShardingChatHome(
ChatBackendProperties properties,
StorageStrategy storageStrategy,
Clock clock)
{
int numShards = properties.getInmemory().getNumShards();
- SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+ SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
IntStream
.of(properties.getInmemory().getOwnedShards())
- .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
shard,
storageStrategy,
clock,
properties.getChatroomBufferSize()));
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHome(chatHomes, strategy);
+ return new ShardedChatHomeService(chatHomes, strategy);
}
@ConditionalOnProperty(
@Slf4j
-public class ShardedChatHome implements ChatHome
+public class ShardedChatHomeService implements ChatHomeService
{
- private final SimpleChatHome[] chatHomes;
+ private final SimpleChatHomeService[] chatHomes;
private final Set<Integer> ownedShards;
private final ShardingStrategy shardingStrategy;
- public ShardedChatHome(
- SimpleChatHome[] chatHomes,
+ public ShardedChatHomeService(
+ SimpleChatHomeService[] chatHomes,
ShardingStrategy shardingStrategy)
{
this.chatHomes = chatHomes;
@Slf4j
-public class SimpleChatHome implements ChatHome
+public class SimpleChatHomeService implements ChatHomeService
{
private final Integer shard;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
- public SimpleChatHome(
+ public SimpleChatHomeService(
StorageStrategy storageStrategy,
Clock clock,
int bufferSize)
bufferSize);
}
- public SimpleChatHome(
+ public SimpleChatHomeService(
Integer shard,
StorageStrategy storageStrategy,
Clock clock,
package de.juplo.kafka.chat.backend.persistence.kafka;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
@RequiredArgsConstructor
@Slf4j
-public class KafkaChatHome implements ChatHome
+public class KafkaChatHomeService implements ChatHomeService
{
private final int numPartitions;
private final ChatRoomChannel chatRoomChannel;
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
public class KafkaServicesConfiguration
{
@Bean
- ChatHome kafkaChatHome(
+ ChatHomeService kafkaChatHome(
ChatBackendProperties properties,
ChatRoomChannel chatRoomChannel)
{
- return new KafkaChatHome(
+ return new KafkaChatHomeService(
properties.getKafka().getNumPartitions(),
chatRoomChannel);
}
ChatBackendProperties properties;
@MockBean
- ChatHome chatHome;
+ ChatHomeService chatHomeService;
@MockBean
ChatRoomService chatRoomService;
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHome.getChatRoomInfo(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
Clock.systemDefaultZone(),
chatRoomService,
8);
- when(chatHome.getChatRoomData(eq(chatroomId))).thenReturn(Mono.just(chatRoomData));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenReturn(Mono.just(chatRoomData));
Message existingMessage = new Message(
key,
serialNumberExistingMessage,
Clock.systemDefaultZone(),
chatRoomService,
8);
- when(chatHome.getChatRoomData(any(UUID.class)))
+ when(chatHomeService.getChatRoomData(any(UUID.class)))
.thenReturn(Mono.just(chatRoomData));
when(chatRoomService.getMessage(any(Message.MessageKey.class)))
.thenReturn(Mono.empty());
// Given
UUID chatroomId = UUID.randomUUID();
int shard = 666;
- when(chatHome.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
// Given
UUID chatroomId = UUID.randomUUID();
int shard = 666;
- when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
String username = "foo";
Long messageId = 66l;
int shard = 666;
- when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
String username = "foo";
Long messageId = 66l;
int shard = 666;
- when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
// Given
UUID chatroomId = UUID.randomUUID();
int shard = 666;
- when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHomeService.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
@ExtendWith(SpringExtension.class)
-public abstract class ChatHomeTest
+public abstract class ChatHomeServiceTest
{
@Autowired
- ChatHome chatHome;
+ ChatHomeService chatHomeService;
@Test
// When
Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHome.getChatRoomData(chatRoomId))
+ .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
.log("testGetExistingChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
// When
Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHome.getChatRoomData(chatRoomId))
+ .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
.log("testGetNonExistentChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-public abstract class ChatHomeWithShardsTest extends ChatHomeTest
+public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest
{
public static final int NUM_SHARDS = 10;
public static final int OWNED_SHARD = 2;
// When
Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHome.getChatRoomData(chatRoomId))
+ .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
.log("testGetChatroomForNotOwnedShard")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
package de.juplo.kafka.chat.backend.persistence;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.persistence.inmemory.SimpleChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.SimpleChatHomeService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
{
int bufferSize = 8;
- SimpleChatHome simpleChatHome = new SimpleChatHome(
+ SimpleChatHomeService simpleChatHome = new SimpleChatHomeService(
getStorageStrategy(),
clock,
bufferSize);
@Override
- public ChatHome getChatHome()
+ public ChatHomeService getChatHome()
{
return simpleChatHome;
}
@Slf4j
public abstract class AbstractStorageStrategyIT
{
- protected ChatHome chathome;
+ protected ChatHomeService chathome;
protected abstract StorageStrategy getStorageStrategy();
interface StorageStrategyITConfig
{
- ChatHome getChatHome();
+ ChatHomeService getChatHome();
}
}
package de.juplo.kafka.chat.backend.persistence.inmemory;
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
import java.time.Clock;
import java.util.stream.IntStream;
-public class ShardedChatHomeTest extends ChatHomeWithShardsTest
+public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
{
@TestConfiguration
static class Configuration
{
@Bean
- ShardedChatHome chatHome(
+ ShardedChatHomeService chatHome(
StorageStrategy storageStrategy,
Clock clock)
{
- SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS];
+ SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[NUM_SHARDS];
IntStream
.of(ownedShards())
- .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
shard,
storageStrategy,
clock,
ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
- return new ShardedChatHome(chatHomes, strategy);
+ return new ShardedChatHomeService(chatHomes, strategy);
}
@Bean
package de.juplo.kafka.chat.backend.persistence.inmemory;
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeTest;
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
import org.springframework.boot.test.context.TestConfiguration;
import java.time.Clock;
-public class SimpleChatHomeTest extends ChatHomeTest
+public class SimpleChatHomeServiceTest extends ChatHomeServiceTest
{
@TestConfiguration
static class Configuration
{
@Bean
- SimpleChatHome chatHome(
+ SimpleChatHomeService chatHome(
StorageStrategy storageStrategy,
Clock clock)
{
- return new SimpleChatHome(
+ return new SimpleChatHomeService(
storageStrategy,
clock,
bufferSize());
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeServiceTest.TOPIC;
@SpringBootTest(
classes = {
- KafkaChatHomeTest.KafkaChatHomeTestConfiguration.class,
+ KafkaChatHomeServiceTest.KafkaChatHomeTestConfiguration.class,
KafkaServicesConfiguration.class,
KafkaAutoConfiguration.class,
TaskExecutionAutoConfiguration.class,
})
@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
@Slf4j
-public class KafkaChatHomeTest extends ChatHomeWithShardsTest
+public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
{
final static String TOPIC = "KAFKA_CHAT_HOME_TEST";