feat: Implemented automatic reconnection in chatroom-service
authorKai Moritz <kai@juplo.de>
Sat, 7 Jan 2023 17:16:06 +0000 (18:16 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 7 Jan 2023 18:06:42 +0000 (19:06 +0100)
src/app/chatroom.service.ts

index f7958fa..11ad225 100644 (file)
@@ -1,8 +1,10 @@
 import { Injectable } from '@angular/core';
-import { HttpClient } from "@angular/common/http";
-import { Observable } from "rxjs";
-import { Chatroom } from "./chatroom";
-import { Message } from "./message";
+import { HttpClient } from '@angular/common/http';
+import { Observable, Subscriber } from 'rxjs';
+import { Chatroom } from './chatroom';
+import { Message } from './message';
+
+export const SSE_RECONNECT_UPPER_LIMIT = 64;
 
 @Injectable({
   providedIn: 'root'
@@ -13,6 +15,12 @@ export class ChatroomService {
   private uriGet = 'http://localhost:8080/get/';
   private uriListen = 'http://localhost:8080/listen/';
 
+  private channelUri: string = 'UNKNOWN';
+  private eventSource?: EventSource;
+  private reconnectTimeout?: NodeJS.Timeout;
+  private reconnectFrequencySec: number = 1;
+  private channel: Subscriber<Message> = new Subscriber<Message>();
+
   constructor(private http: HttpClient) { }
 
   getChatrooms(): Observable<Chatroom[]> {
@@ -24,27 +32,79 @@ export class ChatroomService {
   }
 
   listen(id: String): Observable<Message> {
-    return new Observable<Message>((observer) => {
-      let url = this.uriListen + id;
-      let eventSource = new EventSource(url);
-      eventSource.onmessage = (event) => {
-        console.debug('Received event: ', event);
-        let message: Message = JSON.parse(event.data);
-        observer.next(message);
-      };
-      eventSource.onerror = (error) => {
-        // readyState === 0 (closed) means the remote source closed the connection,
-        // so we can safely treat it as a normal situation. Another way
-        // of detecting the end of the stream is to insert a special element
-        // in the stream of events, which the client can identify as the last one.
-        if(eventSource.readyState === 0) {
-          console.log('The stream has been closed by the server.');
-          eventSource.close();
-          observer.complete();
-        } else {
-          observer.error('EventSource error: ' + error);
-        }
-      }
+    this.channelUri = this.uriListen + id;
+    this.openChannel();
+    let observable = new Observable<Message>((observer) => {
+      this.channel = observer;
     });
+    return observable;
+  }
+
+  // Creates SSE event source, handles SSE events
+  private openChannel(): void {
+
+    // Close event source if current instance of SSE service has some
+    if (this.eventSource) {
+      this.closeChannel();
+      this.eventSource = undefined;
+    }
+
+    console.log('Opening channel to ' + this.channelUri)
+
+    // Open new channel, create new EventSource
+    this.eventSource = new EventSource(this.channelUri);
+
+    // Process default event
+    this.eventSource.onmessage = (event: MessageEvent) => {
+      console.log('Received event ' + event.type + ' for ' + this.channelUri);
+      this.processEvent(event);
+    };
+
+    // Process connection opened
+    this.eventSource.onopen = () => {
+      console.log('Channel opened to ' + this.channelUri);
+      this.reconnectFrequencySec = 1;
+    };
+
+    // Process error
+    this.eventSource.onerror = (error: Event) => {
+      console.log('Error on channel ' + this.channelUri + ": "+ error.type);
+      this.reconnectChannel();
+    };
+  }
+
+  // Processes custom event types
+  private processEvent(messageEvent: MessageEvent): void {
+    const parsed = messageEvent.data ? JSON.parse(messageEvent.data) : {};
+    switch (messageEvent.type) {
+      case 'message': {
+        this.channel.next(parsed);
+        break;
+      }
+      // Add others if neccessary
+      default: {
+        console.error('Unhandled event:', messageEvent.type);
+        break;
+      }
+    }
+  }
+
+  private closeChannel(): void {
+    console.log('Closing channel to: ' + this.channelUri);
+    this.eventSource?.close();
+  }
+
+  // Handles reconnect attempts when the connection fails for some reason.
+  private reconnectChannel(): void {
+    const self = this;
+    this.closeChannel();
+    clearTimeout(this.reconnectTimeout);
+    this.reconnectTimeout = setTimeout(() => {
+      self.openChannel();
+      self.reconnectFrequencySec *= 2;
+      if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) {
+        self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT;
+      }
+    }, this.reconnectFrequencySec * 1000);
   }
 }