feat: Replaced `EventSource` by @microsoft/fetch-event-source
authorKai Moritz <kai@juplo.de>
Sat, 14 Jan 2023 20:15:31 +0000 (21:15 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Jan 2023 17:35:23 +0000 (18:35 +0100)
- EventSource lacks the posibility to set a request-header.
- The library is a replacement for the browser-side, that uses the Fetch API.
- https://www.npmjs.com/package/@microsoft/fetch-event-source

package-lock.json
package.json
src/app/chatroom.service.ts

index dd7c54d..fc4587b 100644 (file)
@@ -16,6 +16,7 @@
         "@angular/platform-browser": "^15.0.0",
         "@angular/platform-browser-dynamic": "^15.0.0",
         "@angular/router": "^15.0.0",
+        "@microsoft/fetch-event-source": "^2.0.1",
         "rxjs": "~7.5.0",
         "tslib": "^2.3.0",
         "zone.js": "~0.12.0"
       "integrity": "sha512-Hcv+nVC0kZnQ3tD9GVu5xSMR4VVYOteQIr/hwFPVEvPdlXqgGEuRjiheChHgdM+JyqdgNcmzZOX/tnl0JOiI7A==",
       "dev": true
     },
+    "node_modules/@microsoft/fetch-event-source": {
+      "version": "2.0.1",
+      "resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz",
+      "integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA=="
+    },
     "node_modules/@ngtools/webpack": {
       "version": "15.0.4",
       "resolved": "https://registry.npmjs.org/@ngtools/webpack/-/webpack-15.0.4.tgz",
index ff54978..c6e26c0 100644 (file)
@@ -18,6 +18,7 @@
     "@angular/platform-browser": "^15.0.0",
     "@angular/platform-browser-dynamic": "^15.0.0",
     "@angular/router": "^15.0.0",
+    "@microsoft/fetch-event-source": "^2.0.1",
     "rxjs": "~7.5.0",
     "tslib": "^2.3.0",
     "zone.js": "~0.12.0"
index 1099c5e..406cf94 100644 (file)
@@ -1,10 +1,13 @@
 import { Injectable } from '@angular/core';
 import { HttpClient } from '@angular/common/http';
+import { EventSourceMessage, fetchEventSource } from '@microsoft/fetch-event-source';
 import { Observable, Subscriber } from 'rxjs';
 import { Chatroom } from './chatroom';
 import { Message } from './message';
 
-export const SSE_RECONNECT_UPPER_LIMIT = 64;
+class RetriableError extends Error { }
+class CanceledError extends Error { }
+class FatalError extends Error { }
 
 @Injectable({
   providedIn: 'root'
@@ -13,11 +16,9 @@ export class ChatroomService {
 
   private backendUri = 'http://localhost:8080/';
 
-  private channelUri: string = 'UNKNOWN';
-  private eventSource?: EventSource;
-  private reconnectTimeout?: NodeJS.Timeout;
-  private reconnectFrequencySec: number = 1;
   private channel: Subscriber<Message> = new Subscriber<Message>();
+  private uri: string = "CLOSED";
+  private canceled: boolean = false;
 
   constructor(private http: HttpClient) { }
 
@@ -30,85 +31,86 @@ export class ChatroomService {
   }
 
   listen(id: String): Observable<Message> {
-    this.channelUri = this.backendUri + id + '/listen';
-    this.openChannel();
-    let observable = new Observable<Message>((observer) => {
-      this.channel = observer;
-    });
-    return observable;
-  }
-
-  unlisten(): void {
-    this.closeChannel();
-    this.channelUri = 'UNKNOWN';
-    this.channel = new Subscriber<Message>();
-  }
-
-  // Creates SSE event source, handles SSE events
-  private openChannel(): void {
-
-    // Close event source if current instance of SSE service has some
-    if (this.eventSource) {
-      this.closeChannel();
-      this.eventSource = undefined;
+    let observable = new Observable<Message>(
+      (observer) => {
+        this.channel = observer;
+      });
+
+    if (this.uri !== 'CLOSED') {
+      console.log('Channel is still open, uncanceling ' + this.uri);
+      this.canceled = false;
+      return observable;
     }
 
-    console.log('Opening channel to ' + this.channelUri)
-
-    // Open new channel, create new EventSource
-    this.eventSource = new EventSource(this.channelUri);
-
-    // Process default event
-    this.eventSource.onmessage = (event: MessageEvent) => {
-      console.log('Received event ' + event.type + ' for ' + this.channelUri);
-      this.processEvent(event);
-    };
+    let uri: string = this.backendUri + id + '/listen';
+    let service = this;
+
+    fetchEventSource(uri, {
+      async onopen(response) {
+        if (response.ok && response.status === 200) {
+          console.log('Opend channel ' + uri, response);
+          service.uri = uri;
+          service.canceled = false;
+        }
+        else if (
+          response.status >= 400 &&
+          response.status < 500 &&
+          response.status !== 429
+        ) {
+          console.error('Client side error when connecting to channel ' + uri, response);
+          throw new FatalError();
+        }
+      },
+      onmessage(event) {
+        console.debug('Received message on channel: ' + uri);
+        if (service.canceled)
+          throw new CanceledError();
+        service.processEvent(event);
+      },
+      onclose() {
+        console.log('Server closed channel ' + uri);
+        service.uri = "CLOSED";
+        throw new RetriableError();
+      },
+      onerror(error) {
+        console.log('Error on channel ' + uri, error);
+        if (error instanceof CanceledError || error instanceof FatalError) {
+          service.uri = "CLOSED";
+          throw error; // rethrow to stop the operation
+        }
+        else {
+          return 1000; // retry-intervall in ms
+        }
+      },
+      openWhenHidden: true
+    }).then(() => console.debug('Promise fullfilled for ' + uri));
 
-    // Process connection opened
-    this.eventSource.onopen = () => {
-      console.log('Channel opened to ' + this.channelUri);
-      this.reconnectFrequencySec = 1;
-    };
+    return observable;
+  }
 
-    // Process error
-    this.eventSource.onerror = (error: Event) => {
-      console.log('Error on channel ' + this.channelUri + ": "+ error.type);
-      this.reconnectChannel();
-    };
+  unlisten(): void {
+    console.log('Canceling channel ' + this.uri);
+    this.canceled = true;
   }
 
   // Processes custom event types
-  private processEvent(messageEvent: MessageEvent): void {
-    const parsed = messageEvent.data ? JSON.parse(messageEvent.data) : {};
-    switch (messageEvent.type) {
+  private processEvent(message: EventSourceMessage): void {
+    const parsed = message.data ? JSON.parse(message.data) : {};
+    switch (message.event) {
       case 'message': {
         this.channel.next(parsed);
         break;
       }
+      case 'error': {
+        // Not implemented server-side yet
+        console.error('Received error-message from server on channel ' + this.uri, message.data);
+        throw new FatalError(message.data);
+      }
       // Add others if neccessary
       default: {
-        console.error('Unhandled event:', messageEvent.type);
+        console.error('Unhandled event:', message.event);
         break;
       }
     }
   }
-
-  private closeChannel(): void {
-    console.log('Closing channel to: ' + this.channelUri);
-    this.eventSource?.close();
-  }
-
-  // Handles reconnect attempts when the connection fails for some reason.
-  private reconnectChannel(): void {
-    const self = this;
-    this.closeChannel();
-    clearTimeout(this.reconnectTimeout);
-    this.reconnectTimeout = setTimeout(() => {
-      self.openChannel();
-      self.reconnectFrequencySec *= 2;
-      if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) {
-        self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT;
-      }
-    }, this.reconnectFrequencySec * 1000);
-  }
 }