--- /dev/null
+package de.juplo.kafka.chatroom;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+import java.time.Clock;
+
+
+@SpringBootApplication
+public class ChatroomApplication
+{
+ @Bean
+ public Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(ChatroomApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chatroom.api;
+
+import de.juplo.kafka.chatroom.domain.Chatroom;
+import lombok.RequiredArgsConstructor;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.time.LocalDateTime;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+@RestController
+@RequiredArgsConstructor
+public class ChatroomController
+{
+ private final Map<UUID, Chatroom> chatrooms = new HashMap<>();
+ private final Clock clock;
+
+
+ @PostMapping("create")
+ public Chatroom create(@RequestBody String name)
+ {
+ Chatroom chatroom = new Chatroom(UUID.randomUUID(), name);
+ chatrooms.put(chatroom.getId(), chatroom);
+ return chatroom;
+ }
+
+ @GetMapping("list")
+ public Collection<Chatroom> list()
+ {
+ return chatrooms.values();
+ }
+
+ @GetMapping("get/{chatroomId}")
+ public Chatroom get(@PathVariable UUID chatroomId)
+ {
+ return chatrooms.get(chatroomId);
+ }
+
+ @PutMapping("put/{chatroomId}/{username}/{messageId}")
+ public Mono<MessageTo> put(
+ @PathVariable UUID chatroomId,
+ @PathVariable String username,
+ @PathVariable Long messageId,
+ @RequestBody String text)
+ {
+ Chatroom chatroom = chatrooms.get(chatroomId);
+ return
+ chatroom
+ .addMessage(
+ messageId,
+ LocalDateTime.now(clock),
+ username,
+ text)
+ .switchIfEmpty(chatroom.getMessage(username, messageId))
+ .map(message -> MessageTo.from(message));
+ }
+
+ @GetMapping("get/{chatroomId}/{username}/{messageId}")
+ public Mono<MessageTo> get(
+ @PathVariable UUID chatroomId,
+ @PathVariable String username,
+ @PathVariable Long messageId)
+ {
+ return
+ chatrooms
+ .get(chatroomId)
+ .getMessage(username, messageId)
+ .map(message -> MessageTo.from(message));
+ }
+
+ @GetMapping(
+ path = "listen/{chatroomId}",
+ produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux<MessageTo> listen(@PathVariable UUID chatroomId)
+ {
+ return chatrooms
+ .get(chatroomId)
+ .listen()
+ .log()
+ .map(message -> MessageTo.from(message));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chatroom.api;
+
+import de.juplo.kafka.chatroom.domain.MessageMutationException;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ProblemDetail;
+import org.springframework.web.bind.annotation.ControllerAdvice;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.server.ServerWebExchange;
+import org.springframework.web.util.UriComponentsBuilder;
+
+import java.util.Date;
+
+
+@ControllerAdvice
+public class ChatroomControllerAdvice
+{
+ @Value("${server.context-path:/}")
+ String contextPath;
+
+ @ExceptionHandler(MessageMutationException.class)
+ public final ProblemDetail handleException(
+ MessageMutationException e,
+ ServerWebExchange exchange,
+ UriComponentsBuilder uriComponentsBuilder)
+ {
+ final HttpStatus status = HttpStatus.BAD_REQUEST;
+ ProblemDetail problem = ProblemDetail.forStatus(status);
+
+ problem.setProperty("timestamp", new Date());
+
+ problem.setProperty("requestId", exchange.getRequest().getId());
+
+ problem.setType(uriComponentsBuilder.replacePath(contextPath).path("/problem/message-mutation").build().toUri());
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(status.getReasonPhrase());
+ stringBuilder.append(" - ");
+ stringBuilder.append(e.getMessage());
+ problem.setTitle(stringBuilder.toString());
+
+ stringBuilder.setLength(0);
+ stringBuilder.append("The existing message with user=");
+ stringBuilder.append(e.getExisting().getUser());
+ stringBuilder.append(" and id=");
+ stringBuilder.append(e.getExisting().getId());
+ stringBuilder.append(" cannot be mutated!");
+ problem.setDetail(stringBuilder.toString());
+
+ problem.setProperty("mutatedMessage", e.getMutated());
+
+ problem.setProperty("existingMessage", e.getExisting());
+
+ return problem;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chatroom.api;
+
+import de.juplo.kafka.chatroom.domain.Message;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+
+@Data
+@AllArgsConstructor
+public class MessageTo
+{
+ private Long id;
+ private Long serialNumber;
+ private LocalDateTime timestamp;
+ private String user;
+ private String text;
+
+ public static MessageTo from(Message message)
+ {
+ return
+ new MessageTo(
+ message.getId(),
+ message.getSerialNumber(),
+ message.getTimestamp(),
+ message.getUser(),
+ message.getText());
+ }
+}
--- /dev/null
+package de.juplo.kafka.chatroom.domain;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.stream.Stream;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class Chatroom
+{
+ @Getter
+ private final UUID id;
+ @Getter
+ private final String name;
+ private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
+ private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
+
+ synchronized public Mono<Message> addMessage(
+ Long id,
+ LocalDateTime timestamp,
+ String user,
+ String text)
+ {
+ return persistMessage(id, timestamp, user, text)
+ .doOnNext(message -> sink.tryEmitNext(message).orThrow());
+ }
+
+ private Mono<Message> persistMessage(
+ Long id,
+ LocalDateTime timestamp,
+ String user,
+ String text)
+ {
+ Message message = new Message(id, (long)messages.size(), timestamp, user, text);
+
+ MessageKey key = new MessageKey(user, id);
+ Message existing = messages.get(key);
+ if (existing != null)
+ {
+ log.info("Message with key {} already exists; {}", key, existing);
+ if (!message.equals(existing))
+ throw new MessageMutationException(message, existing);
+ return Mono.empty();
+ }
+
+ messages.put(key, message);
+ return Mono
+ .fromSupplier(() -> message)
+ .log();
+ }
+
+ public Mono<Message> getMessage(String username, Long messageId)
+ {
+ return Mono.fromSupplier(() ->
+ {
+ MessageKey key = MessageKey.of(username, messageId);
+ return messages.get(key);
+ });
+ }
+
+ public Flux<Message> listen()
+ {
+ return sink.asFlux();
+ }
+
+ public Stream<Message> getMessages(long firstMessage)
+ {
+ return messages
+ .values()
+ .stream()
+ .filter(message -> message.getSerialNumber() >= firstMessage);
+ }
+
+
+ @Value(staticConstructor = "of")
+ static class MessageKey
+ {
+ String username;
+ Long messageId;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chatroom.domain;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.time.LocalDateTime;
+
+
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode
+@ToString
+public class Message
+{
+ private final Long id;
+ private final Long serialNumber;
+ private final LocalDateTime timestamp;
+ private final String user;
+ private final String text;
+}
--- /dev/null
+package de.juplo.kafka.chatroom.domain;
+
+import lombok.Getter;
+
+
+public class MessageMutationException extends RuntimeException
+{
+ @Getter
+ private final Message mutated;
+ @Getter
+ private final Message existing;
+
+ public MessageMutationException(Message mutated, Message existing)
+ {
+ super("Messages are imutable!");
+ this.mutated = mutated;
+ this.existing = existing;
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chatroom;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
-
-import java.time.Clock;
-
-
-@SpringBootApplication
-public class ChatroomApplication
-{
- @Bean
- public Clock clock()
- {
- return Clock.systemDefaultZone();
- }
-
-
- public static void main(String[] args)
- {
- SpringApplication.run(ChatroomApplication.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chatroom.api;
-
-import de.juplo.kafka.chatroom.domain.Chatroom;
-import lombok.RequiredArgsConstructor;
-import org.springframework.http.MediaType;
-import org.springframework.web.bind.annotation.*;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.time.LocalDateTime;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-
-@RestController
-@RequiredArgsConstructor
-public class ChatroomController
-{
- private final Map<UUID, Chatroom> chatrooms = new HashMap<>();
- private final Clock clock;
-
-
- @PostMapping("create")
- public Chatroom create(@RequestBody String name)
- {
- Chatroom chatroom = new Chatroom(UUID.randomUUID(), name);
- chatrooms.put(chatroom.getId(), chatroom);
- return chatroom;
- }
-
- @GetMapping("list")
- public Collection<Chatroom> list()
- {
- return chatrooms.values();
- }
-
- @GetMapping("get/{chatroomId}")
- public Chatroom get(@PathVariable UUID chatroomId)
- {
- return chatrooms.get(chatroomId);
- }
-
- @PutMapping("put/{chatroomId}/{username}/{messageId}")
- public Mono<MessageTo> put(
- @PathVariable UUID chatroomId,
- @PathVariable String username,
- @PathVariable Long messageId,
- @RequestBody String text)
- {
- Chatroom chatroom = chatrooms.get(chatroomId);
- return
- chatroom
- .addMessage(
- messageId,
- LocalDateTime.now(clock),
- username,
- text)
- .switchIfEmpty(chatroom.getMessage(username, messageId))
- .map(message -> MessageTo.from(message));
- }
-
- @GetMapping("get/{chatroomId}/{username}/{messageId}")
- public Mono<MessageTo> get(
- @PathVariable UUID chatroomId,
- @PathVariable String username,
- @PathVariable Long messageId)
- {
- return
- chatrooms
- .get(chatroomId)
- .getMessage(username, messageId)
- .map(message -> MessageTo.from(message));
- }
-
- @GetMapping(
- path = "listen/{chatroomId}",
- produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public Flux<MessageTo> listen(@PathVariable UUID chatroomId)
- {
- return chatrooms
- .get(chatroomId)
- .listen()
- .log()
- .map(message -> MessageTo.from(message));
- }
-}
+++ /dev/null
-package de.juplo.kafka.chatroom.api;
-
-import de.juplo.kafka.chatroom.domain.MessageMutationException;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ProblemDetail;
-import org.springframework.web.bind.annotation.ControllerAdvice;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.server.ServerWebExchange;
-import org.springframework.web.util.UriComponentsBuilder;
-
-import java.util.Date;
-
-
-@ControllerAdvice
-public class ChatroomControllerAdvice
-{
- @Value("${server.context-path:/}")
- String contextPath;
-
- @ExceptionHandler(MessageMutationException.class)
- public final ProblemDetail handleException(
- MessageMutationException e,
- ServerWebExchange exchange,
- UriComponentsBuilder uriComponentsBuilder)
- {
- final HttpStatus status = HttpStatus.BAD_REQUEST;
- ProblemDetail problem = ProblemDetail.forStatus(status);
-
- problem.setProperty("timestamp", new Date());
-
- problem.setProperty("requestId", exchange.getRequest().getId());
-
- problem.setType(uriComponentsBuilder.replacePath(contextPath).path("/problem/message-mutation").build().toUri());
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(status.getReasonPhrase());
- stringBuilder.append(" - ");
- stringBuilder.append(e.getMessage());
- problem.setTitle(stringBuilder.toString());
-
- stringBuilder.setLength(0);
- stringBuilder.append("The existing message with user=");
- stringBuilder.append(e.getExisting().getUser());
- stringBuilder.append(" and id=");
- stringBuilder.append(e.getExisting().getId());
- stringBuilder.append(" cannot be mutated!");
- problem.setDetail(stringBuilder.toString());
-
- problem.setProperty("mutatedMessage", e.getMutated());
-
- problem.setProperty("existingMessage", e.getExisting());
-
- return problem;
- }
-}
+++ /dev/null
-package de.juplo.kafka.chatroom.api;
-
-import de.juplo.kafka.chatroom.domain.Message;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.time.LocalDateTime;
-
-
-@Data
-@AllArgsConstructor
-public class MessageTo
-{
- private Long id;
- private Long serialNumber;
- private LocalDateTime timestamp;
- private String user;
- private String text;
-
- public static MessageTo from(Message message)
- {
- return
- new MessageTo(
- message.getId(),
- message.getSerialNumber(),
- message.getTimestamp(),
- message.getUser(),
- message.getText());
- }
-}
+++ /dev/null
-package de.juplo.kafka.chatroom.domain;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Value;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-
-import java.time.LocalDateTime;
-import java.util.*;
-import java.util.stream.Stream;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class Chatroom
-{
- @Getter
- private final UUID id;
- @Getter
- private final String name;
- private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
- private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
-
- synchronized public Mono<Message> addMessage(
- Long id,
- LocalDateTime timestamp,
- String user,
- String text)
- {
- return persistMessage(id, timestamp, user, text)
- .doOnNext(message -> sink.tryEmitNext(message).orThrow());
- }
-
- private Mono<Message> persistMessage(
- Long id,
- LocalDateTime timestamp,
- String user,
- String text)
- {
- Message message = new Message(id, (long)messages.size(), timestamp, user, text);
-
- MessageKey key = new MessageKey(user, id);
- Message existing = messages.get(key);
- if (existing != null)
- {
- log.info("Message with key {} already exists; {}", key, existing);
- if (!message.equals(existing))
- throw new MessageMutationException(message, existing);
- return Mono.empty();
- }
-
- messages.put(key, message);
- return Mono
- .fromSupplier(() -> message)
- .log();
- }
-
- public Mono<Message> getMessage(String username, Long messageId)
- {
- return Mono.fromSupplier(() ->
- {
- MessageKey key = MessageKey.of(username, messageId);
- return messages.get(key);
- });
- }
-
- public Flux<Message> listen()
- {
- return sink.asFlux();
- }
-
- public Stream<Message> getMessages(long firstMessage)
- {
- return messages
- .values()
- .stream()
- .filter(message -> message.getSerialNumber() >= firstMessage);
- }
-
-
- @Value(staticConstructor = "of")
- static class MessageKey
- {
- String username;
- Long messageId;
- }
-}
+++ /dev/null
-package de.juplo.kafka.chatroom.domain;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-import java.time.LocalDateTime;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode
-@ToString
-public class Message
-{
- private final Long id;
- private final Long serialNumber;
- private final LocalDateTime timestamp;
- private final String user;
- private final String text;
-}
+++ /dev/null
-package de.juplo.kafka.chatroom.domain;
-
-import lombok.Getter;
-
-
-public class MessageMutationException extends RuntimeException
-{
- @Getter
- private final Message mutated;
- @Getter
- private final Message existing;
-
- public MessageMutationException(Message mutated, Message existing)
- {
- super("Messages are imutable!");
- this.mutated = mutated;
- this.existing = existing;
- }
-}
--- /dev/null
+package de.juplo.kafka.chatroom;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class ChatroomApplicationTests
+{
+ @Test
+ void contextLoads()
+ {
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chatroom;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class ChatroomApplicationTests
-{
- @Test
- void contextLoads()
- {
- }
-}