From: Kai Moritz Date: Sat, 14 Jan 2023 20:15:31 +0000 (+0100) Subject: feat: Replaced `EventSource` by @microsoft/fetch-event-source X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=b8277f680de3a25efa3b4ad9f8c6ed361ebcaec3;p=demos%2Fkafka%2Fchat feat: Replaced `EventSource` by @microsoft/fetch-event-source - EventSource lacks the posibility to set a request-header. - The library is a replacement for the browser-side, that uses the Fetch API. - https://www.npmjs.com/package/@microsoft/fetch-event-source --- diff --git a/package-lock.json b/package-lock.json index dd7c54d3..fc4587ba 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "@angular/platform-browser": "^15.0.0", "@angular/platform-browser-dynamic": "^15.0.0", "@angular/router": "^15.0.0", + "@microsoft/fetch-event-source": "^2.0.1", "rxjs": "~7.5.0", "tslib": "^2.3.0", "zone.js": "~0.12.0" @@ -2434,6 +2435,11 @@ "integrity": "sha512-Hcv+nVC0kZnQ3tD9GVu5xSMR4VVYOteQIr/hwFPVEvPdlXqgGEuRjiheChHgdM+JyqdgNcmzZOX/tnl0JOiI7A==", "dev": true }, + "node_modules/@microsoft/fetch-event-source": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz", + "integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==" + }, "node_modules/@ngtools/webpack": { "version": "15.0.4", "resolved": "https://registry.npmjs.org/@ngtools/webpack/-/webpack-15.0.4.tgz", diff --git a/package.json b/package.json index ff549782..c6e26c05 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "@angular/platform-browser": "^15.0.0", "@angular/platform-browser-dynamic": "^15.0.0", "@angular/router": "^15.0.0", + "@microsoft/fetch-event-source": "^2.0.1", "rxjs": "~7.5.0", "tslib": "^2.3.0", "zone.js": "~0.12.0" diff --git a/src/app/chatroom.service.ts b/src/app/chatroom.service.ts index 1099c5e2..406cf942 100644 --- a/src/app/chatroom.service.ts +++ b/src/app/chatroom.service.ts @@ -1,10 +1,13 @@ import { Injectable } from '@angular/core'; import { HttpClient } from '@angular/common/http'; +import { EventSourceMessage, fetchEventSource } from '@microsoft/fetch-event-source'; import { Observable, Subscriber } from 'rxjs'; import { Chatroom } from './chatroom'; import { Message } from './message'; -export const SSE_RECONNECT_UPPER_LIMIT = 64; +class RetriableError extends Error { } +class CanceledError extends Error { } +class FatalError extends Error { } @Injectable({ providedIn: 'root' @@ -13,11 +16,9 @@ export class ChatroomService { private backendUri = 'http://localhost:8080/'; - private channelUri: string = 'UNKNOWN'; - private eventSource?: EventSource; - private reconnectTimeout?: NodeJS.Timeout; - private reconnectFrequencySec: number = 1; private channel: Subscriber = new Subscriber(); + private uri: string = "CLOSED"; + private canceled: boolean = false; constructor(private http: HttpClient) { } @@ -30,85 +31,86 @@ export class ChatroomService { } listen(id: String): Observable { - this.channelUri = this.backendUri + id + '/listen'; - this.openChannel(); - let observable = new Observable((observer) => { - this.channel = observer; - }); - return observable; - } - - unlisten(): void { - this.closeChannel(); - this.channelUri = 'UNKNOWN'; - this.channel = new Subscriber(); - } - - // Creates SSE event source, handles SSE events - private openChannel(): void { - - // Close event source if current instance of SSE service has some - if (this.eventSource) { - this.closeChannel(); - this.eventSource = undefined; + let observable = new Observable( + (observer) => { + this.channel = observer; + }); + + if (this.uri !== 'CLOSED') { + console.log('Channel is still open, uncanceling ' + this.uri); + this.canceled = false; + return observable; } - console.log('Opening channel to ' + this.channelUri) - - // Open new channel, create new EventSource - this.eventSource = new EventSource(this.channelUri); - - // Process default event - this.eventSource.onmessage = (event: MessageEvent) => { - console.log('Received event ' + event.type + ' for ' + this.channelUri); - this.processEvent(event); - }; + let uri: string = this.backendUri + id + '/listen'; + let service = this; + + fetchEventSource(uri, { + async onopen(response) { + if (response.ok && response.status === 200) { + console.log('Opend channel ' + uri, response); + service.uri = uri; + service.canceled = false; + } + else if ( + response.status >= 400 && + response.status < 500 && + response.status !== 429 + ) { + console.error('Client side error when connecting to channel ' + uri, response); + throw new FatalError(); + } + }, + onmessage(event) { + console.debug('Received message on channel: ' + uri); + if (service.canceled) + throw new CanceledError(); + service.processEvent(event); + }, + onclose() { + console.log('Server closed channel ' + uri); + service.uri = "CLOSED"; + throw new RetriableError(); + }, + onerror(error) { + console.log('Error on channel ' + uri, error); + if (error instanceof CanceledError || error instanceof FatalError) { + service.uri = "CLOSED"; + throw error; // rethrow to stop the operation + } + else { + return 1000; // retry-intervall in ms + } + }, + openWhenHidden: true + }).then(() => console.debug('Promise fullfilled for ' + uri)); - // Process connection opened - this.eventSource.onopen = () => { - console.log('Channel opened to ' + this.channelUri); - this.reconnectFrequencySec = 1; - }; + return observable; + } - // Process error - this.eventSource.onerror = (error: Event) => { - console.log('Error on channel ' + this.channelUri + ": "+ error.type); - this.reconnectChannel(); - }; + unlisten(): void { + console.log('Canceling channel ' + this.uri); + this.canceled = true; } // Processes custom event types - private processEvent(messageEvent: MessageEvent): void { - const parsed = messageEvent.data ? JSON.parse(messageEvent.data) : {}; - switch (messageEvent.type) { + private processEvent(message: EventSourceMessage): void { + const parsed = message.data ? JSON.parse(message.data) : {}; + switch (message.event) { case 'message': { this.channel.next(parsed); break; } + case 'error': { + // Not implemented server-side yet + console.error('Received error-message from server on channel ' + this.uri, message.data); + throw new FatalError(message.data); + } // Add others if neccessary default: { - console.error('Unhandled event:', messageEvent.type); + console.error('Unhandled event:', message.event); break; } } } - - private closeChannel(): void { - console.log('Closing channel to: ' + this.channelUri); - this.eventSource?.close(); - } - - // Handles reconnect attempts when the connection fails for some reason. - private reconnectChannel(): void { - const self = this; - this.closeChannel(); - clearTimeout(this.reconnectTimeout); - this.reconnectTimeout = setTimeout(() => { - self.openChannel(); - self.reconnectFrequencySec *= 2; - if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) { - self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT; - } - }, this.reconnectFrequencySec * 1000); - } }