import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
+import { EventSourceMessage, fetchEventSource } from '@microsoft/fetch-event-source';
import { Observable, Subscriber } from 'rxjs';
import { Chatroom } from './chatroom';
import { Message } from './message';
-export const SSE_RECONNECT_UPPER_LIMIT = 64;
+class RetriableError extends Error { }
+class CanceledError extends Error { }
+class FatalError extends Error { }
@Injectable({
providedIn: 'root'
private backendUri = 'http://localhost:8080/';
- private channelUri: string = 'UNKNOWN';
- private eventSource?: EventSource;
- private reconnectTimeout?: NodeJS.Timeout;
- private reconnectFrequencySec: number = 1;
private channel: Subscriber<Message> = new Subscriber<Message>();
+ private uri: string = "CLOSED";
+ private canceled: boolean = false;
constructor(private http: HttpClient) { }
}
listen(id: String): Observable<Message> {
- this.channelUri = this.backendUri + id + '/listen';
- this.openChannel();
- let observable = new Observable<Message>((observer) => {
- this.channel = observer;
- });
- return observable;
- }
-
- unlisten(): void {
- this.closeChannel();
- this.channelUri = 'UNKNOWN';
- this.channel = new Subscriber<Message>();
- }
-
- // Creates SSE event source, handles SSE events
- private openChannel(): void {
-
- // Close event source if current instance of SSE service has some
- if (this.eventSource) {
- this.closeChannel();
- this.eventSource = undefined;
+ let observable = new Observable<Message>(
+ (observer) => {
+ this.channel = observer;
+ });
+
+ if (this.uri !== 'CLOSED') {
+ console.log('Channel is still open, uncanceling ' + this.uri);
+ this.canceled = false;
+ return observable;
}
- console.log('Opening channel to ' + this.channelUri)
-
- // Open new channel, create new EventSource
- this.eventSource = new EventSource(this.channelUri);
-
- // Process default event
- this.eventSource.onmessage = (event: MessageEvent) => {
- console.log('Received event ' + event.type + ' for ' + this.channelUri);
- this.processEvent(event);
- };
+ let uri: string = this.backendUri + id + '/listen';
+ let service = this;
+
+ fetchEventSource(uri, {
+ async onopen(response) {
+ if (response.ok && response.status === 200) {
+ console.log('Opend channel ' + uri, response);
+ service.uri = uri;
+ service.canceled = false;
+ }
+ else if (
+ response.status >= 400 &&
+ response.status < 500 &&
+ response.status !== 429
+ ) {
+ console.error('Client side error when connecting to channel ' + uri, response);
+ throw new FatalError();
+ }
+ },
+ onmessage(event) {
+ console.debug('Received message on channel: ' + uri);
+ if (service.canceled)
+ throw new CanceledError();
+ service.processEvent(event);
+ },
+ onclose() {
+ console.log('Server closed channel ' + uri);
+ service.uri = "CLOSED";
+ throw new RetriableError();
+ },
+ onerror(error) {
+ console.log('Error on channel ' + uri, error);
+ if (error instanceof CanceledError || error instanceof FatalError) {
+ service.uri = "CLOSED";
+ throw error; // rethrow to stop the operation
+ }
+ else {
+ return 1000; // retry-intervall in ms
+ }
+ },
+ openWhenHidden: true
+ }).then(() => console.debug('Promise fullfilled for ' + uri));
- // Process connection opened
- this.eventSource.onopen = () => {
- console.log('Channel opened to ' + this.channelUri);
- this.reconnectFrequencySec = 1;
- };
+ return observable;
+ }
- // Process error
- this.eventSource.onerror = (error: Event) => {
- console.log('Error on channel ' + this.channelUri + ": "+ error.type);
- this.reconnectChannel();
- };
+ unlisten(): void {
+ console.log('Canceling channel ' + this.uri);
+ this.canceled = true;
}
// Processes custom event types
- private processEvent(messageEvent: MessageEvent): void {
- const parsed = messageEvent.data ? JSON.parse(messageEvent.data) : {};
- switch (messageEvent.type) {
+ private processEvent(message: EventSourceMessage): void {
+ const parsed = message.data ? JSON.parse(message.data) : {};
+ switch (message.event) {
case 'message': {
this.channel.next(parsed);
break;
}
+ case 'error': {
+ // Not implemented server-side yet
+ console.error('Received error-message from server on channel ' + this.uri, message.data);
+ throw new FatalError(message.data);
+ }
// Add others if neccessary
default: {
- console.error('Unhandled event:', messageEvent.type);
+ console.error('Unhandled event:', message.event);
break;
}
}
}
-
- private closeChannel(): void {
- console.log('Closing channel to: ' + this.channelUri);
- this.eventSource?.close();
- }
-
- // Handles reconnect attempts when the connection fails for some reason.
- private reconnectChannel(): void {
- const self = this;
- this.closeChannel();
- clearTimeout(this.reconnectTimeout);
- this.reconnectTimeout = setTimeout(() => {
- self.openChannel();
- self.reconnectFrequencySec *= 2;
- if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) {
- self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT;
- }
- }, this.reconnectFrequencySec * 1000);
- }
}