import { Injectable } from '@angular/core';
-import { HttpClient } from "@angular/common/http";
-import { Observable } from "rxjs";
-import { Chatroom } from "./chatroom";
-import { Message } from "./message";
+import { HttpClient } from '@angular/common/http';
+import { Observable, Subscriber } from 'rxjs';
+import { Chatroom } from './chatroom';
+import { Message } from './message';
+
+export const SSE_RECONNECT_UPPER_LIMIT = 64;
@Injectable({
providedIn: 'root'
private uriGet = 'http://localhost:8080/get/';
private uriListen = 'http://localhost:8080/listen/';
+ private channelUri: string = 'UNKNOWN';
+ private eventSource?: EventSource;
+ private reconnectTimeout?: NodeJS.Timeout;
+ private reconnectFrequencySec: number = 1;
+ private channel: Subscriber<Message> = new Subscriber<Message>();
+
constructor(private http: HttpClient) { }
getChatrooms(): Observable<Chatroom[]> {
}
listen(id: String): Observable<Message> {
- return new Observable<Message>((observer) => {
- let url = this.uriListen + id;
- let eventSource = new EventSource(url);
- eventSource.onmessage = (event) => {
- console.debug('Received event: ', event);
- let message: Message = JSON.parse(event.data);
- observer.next(message);
- };
- eventSource.onerror = (error) => {
- // readyState === 0 (closed) means the remote source closed the connection,
- // so we can safely treat it as a normal situation. Another way
- // of detecting the end of the stream is to insert a special element
- // in the stream of events, which the client can identify as the last one.
- if(eventSource.readyState === 0) {
- console.log('The stream has been closed by the server.');
- eventSource.close();
- observer.complete();
- } else {
- observer.error('EventSource error: ' + error);
- }
- }
+ this.channelUri = this.uriListen + id;
+ this.openChannel();
+ let observable = new Observable<Message>((observer) => {
+ this.channel = observer;
});
+ return observable;
+ }
+
+ // Creates SSE event source, handles SSE events
+ private openChannel(): void {
+
+ // Close event source if current instance of SSE service has some
+ if (this.eventSource) {
+ this.closeChannel();
+ this.eventSource = undefined;
+ }
+
+ console.log('Opening channel to ' + this.channelUri)
+
+ // Open new channel, create new EventSource
+ this.eventSource = new EventSource(this.channelUri);
+
+ // Process default event
+ this.eventSource.onmessage = (event: MessageEvent) => {
+ console.log('Received event ' + event.type + ' for ' + this.channelUri);
+ this.processEvent(event);
+ };
+
+ // Process connection opened
+ this.eventSource.onopen = () => {
+ console.log('Channel opened to ' + this.channelUri);
+ this.reconnectFrequencySec = 1;
+ };
+
+ // Process error
+ this.eventSource.onerror = (error: Event) => {
+ console.log('Error on channel ' + this.channelUri + ": "+ error.type);
+ this.reconnectChannel();
+ };
+ }
+
+ // Processes custom event types
+ private processEvent(messageEvent: MessageEvent): void {
+ const parsed = messageEvent.data ? JSON.parse(messageEvent.data) : {};
+ switch (messageEvent.type) {
+ case 'message': {
+ this.channel.next(parsed);
+ break;
+ }
+ // Add others if neccessary
+ default: {
+ console.error('Unhandled event:', messageEvent.type);
+ break;
+ }
+ }
+ }
+
+ private closeChannel(): void {
+ console.log('Closing channel to: ' + this.channelUri);
+ this.eventSource?.close();
+ }
+
+ // Handles reconnect attempts when the connection fails for some reason.
+ private reconnectChannel(): void {
+ const self = this;
+ this.closeChannel();
+ clearTimeout(this.reconnectTimeout);
+ this.reconnectTimeout = setTimeout(() => {
+ self.openChannel();
+ self.reconnectFrequencySec *= 2;
+ if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) {
+ self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT;
+ }
+ }, this.reconnectFrequencySec * 1000);
}
}