- Renamed and moved `ChatroomInfo` to `ChatroomTo`
- Switched return-types from `Chatroom` to `ChatroomTo`
@PreDestroy
public void onExit()
{
@PreDestroy
public void onExit()
{
- storageStrategy.writeChatrooms(Flux.fromIterable(chatHome.list()));
+ storageStrategy.writeChatrooms(Flux.fromStream(chatHome.list()));
}
public static void main(String[] args)
}
public static void main(String[] args)
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
+import java.util.stream.Stream;
- public Chatroom create(@RequestBody String name)
+ public ChatroomTo create(@RequestBody String name)
- return chatHome.createChatroom(name);
+ return ChatroomTo.from(chatHome.createChatroom(name));
- public Collection<Chatroom> list()
+ public Stream<ChatroomTo> list()
- return chatHome.list();
+ return chatHome.list().map(chatroom -> ChatroomTo.from(chatroom));
}
@GetMapping("get/{chatroomId}")
}
@GetMapping("get/{chatroomId}")
- public Optional<Chatroom> get(@PathVariable UUID chatroomId)
+ public Optional<ChatroomTo> get(@PathVariable UUID chatroomId)
- return chatHome.getChatroom(chatroomId);
+ return chatHome.getChatroom(chatroomId).map(chatroom -> ChatroomTo.from(chatroom));
}
@PutMapping("put/{chatroomId}/{username}/{messageId}")
}
@PutMapping("put/{chatroomId}/{username}/{messageId}")
--- /dev/null
+package de.juplo.kafka.chat.backend.api;
+
+import de.juplo.kafka.chat.backend.domain.Chatroom;
+import lombok.Data;
+
+import java.util.UUID;
+
+@Data
+public class ChatroomTo
+{
+ private UUID id;
+ private String name;
+
+
+ public static ChatroomTo from(Chatroom chatroom)
+ {
+ ChatroomTo info = new ChatroomTo();
+ info.id = chatroom.getId();
+ info.name = chatroom.getName();
+ return info;
+ }
+}
import lombok.RequiredArgsConstructor;
import java.util.*;
import lombok.RequiredArgsConstructor;
import java.util.*;
+import java.util.stream.Stream;
return Optional.ofNullable(chatrooms.get(id));
}
return Optional.ofNullable(chatrooms.get(id));
}
- public Collection<Chatroom> list()
+ public Stream<Chatroom> list()
- return chatrooms.values();
+ return chatrooms.values().stream();
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.Chatroom;
-import lombok.Data;
-
-import java.util.UUID;
-
-@Data
-public class ChatroomInfo
-{
- private UUID id;
- private String name;
-
-
- public static ChatroomInfo from(Chatroom chatroom)
- {
- ChatroomInfo info = new ChatroomInfo();
- info.id = chatroom.getId();
- info.name = chatroom.getName();
- return info;
- }
-}
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatroomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import de.juplo.kafka.chat.backend.domain.Chatroom;
import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
import de.juplo.kafka.chat.backend.api.MessageTo;
import de.juplo.kafka.chat.backend.domain.Chatroom;
import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.function.Function;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.function.Function;
-import java.util.stream.Collector;
import java.util.stream.Collectors;
import static java.nio.file.StandardOpenOption.CREATE;
import java.util.stream.Collectors;
import static java.nio.file.StandardOpenOption.CREATE;
- ChatroomInfo chatroomInfo = ChatroomInfo.from(chatroom);
- generator.writeObject(chatroomInfo);
- writeMessages(chatroomInfo, chatroom.getMessages());
+ ChatroomTo chatroomTo = ChatroomTo.from(chatroom);
+ generator.writeObject(chatroomTo);
+ writeMessages(chatroomTo, chatroom.getMessages());
}
catch (IOException e)
{
}
catch (IOException e)
{
if (parser.nextToken() != JsonToken.START_ARRAY)
throw new IllegalStateException("Expected content to be an array");
if (parser.nextToken() != JsonToken.START_ARRAY)
throw new IllegalStateException("Expected content to be an array");
- Sinks.Many<ChatroomInfo> many = Sinks.many().unicast().onBackpressureBuffer();
+ Sinks.Many<ChatroomTo> many = Sinks.many().unicast().onBackpressureBuffer();
while (parser.nextToken() != JsonToken.END_ARRAY)
{
many
while (parser.nextToken() != JsonToken.END_ARRAY)
{
many
- .tryEmitNext(mapper.readValue(parser, ChatroomInfo.class))
+ .tryEmitNext(mapper.readValue(parser, ChatroomTo.class))
{
LinkedHashMap<Message.MessageKey, Message> messages =
{
LinkedHashMap<Message.MessageKey, Message> messages =
- readMessages(chatroomInfo)
+ readMessages(chatroomTo)
.collect(Collectors.toMap(
Message::getKey,
Function.identity(),
.collect(Collectors.toMap(
Message::getKey,
Function.identity(),
LinkedHashMap::new))
.block();
InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
LinkedHashMap::new))
.block();
InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
- return chatroomFactory.restoreChatroom(chatroomInfo.getId(), chatroomInfo.getName(), strategy);
+ return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
});
}
catch (NoSuchFileException e)
});
}
catch (NoSuchFileException e)
- public void writeMessages(ChatroomInfo chatroomInfo, Flux<Message> messageFlux)
+ public void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux)
- Path path = chatroomPath(chatroomInfo);
- log.info("Writing messages for {} to {}", chatroomInfo, path);
+ Path path = chatroomPath(chatroomTo);
+ log.info("Writing messages for {} to {}", chatroomTo, path);
try
{
Files.createDirectories(storagePath);
try
{
Files.createDirectories(storagePath);
- public Flux<Message> readMessages(ChatroomInfo chatroomInfo)
+ public Flux<Message> readMessages(ChatroomTo chatroomTo)
- Path path = chatroomPath(chatroomInfo);
- log.info("Reading messages for {} from {}", chatroomInfo, path);
+ Path path = chatroomPath(chatroomTo);
+ log.info("Reading messages for {} from {}", chatroomTo, path);
try
{
JsonParser parser =
try
{
JsonParser parser =
log.info(
"{} does not exist - starting with empty chat for {}",
path,
log.info(
"{} does not exist - starting with empty chat for {}",
path,
return Flux.empty();
}
catch (IOException e)
return Flux.empty();
}
catch (IOException e)
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
- Path chatroomPath(ChatroomInfo chatroomInfo)
+ Path chatroomPath(ChatroomTo chatroomTo)
- return storagePath.resolve(Path.of(chatroomInfo.getId().toString() + ".json"));
+ return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
package de.juplo.kafka.chat.backend.persistence;
package de.juplo.kafka.chat.backend.persistence;
+import de.juplo.kafka.chat.backend.api.ChatroomTo;
import de.juplo.kafka.chat.backend.domain.Chatroom;
import de.juplo.kafka.chat.backend.domain.Message;
import reactor.core.publisher.Flux;
import de.juplo.kafka.chat.backend.domain.Chatroom;
import de.juplo.kafka.chat.backend.domain.Message;
import reactor.core.publisher.Flux;
{
void writeChatrooms(Flux<Chatroom> chatroomFlux);
Flux<Chatroom> readChatrooms();
{
void writeChatrooms(Flux<Chatroom> chatroomFlux);
Flux<Chatroom> readChatrooms();
- void writeMessages(ChatroomInfo chatroomInfo, Flux<Message> messageFlux);
- Flux<Message> readMessages(ChatroomInfo chatroomInfo);
+ void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux);
+ Flux<Message> readMessages(ChatroomTo chatroomTo);