From: Kai Moritz Date: Sat, 7 Jan 2023 17:16:06 +0000 (+0100) Subject: feat: Implemented automatic reconnection in chatroom-service X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=6b3f6ba2e5d198432a212131dfc391240b911e2b;p=demos%2Fkafka%2Fchat feat: Implemented automatic reconnection in chatroom-service --- diff --git a/src/app/chatroom.service.ts b/src/app/chatroom.service.ts index f7958fac..11ad225a 100644 --- a/src/app/chatroom.service.ts +++ b/src/app/chatroom.service.ts @@ -1,8 +1,10 @@ import { Injectable } from '@angular/core'; -import { HttpClient } from "@angular/common/http"; -import { Observable } from "rxjs"; -import { Chatroom } from "./chatroom"; -import { Message } from "./message"; +import { HttpClient } from '@angular/common/http'; +import { Observable, Subscriber } from 'rxjs'; +import { Chatroom } from './chatroom'; +import { Message } from './message'; + +export const SSE_RECONNECT_UPPER_LIMIT = 64; @Injectable({ providedIn: 'root' @@ -13,6 +15,12 @@ export class ChatroomService { private uriGet = 'http://localhost:8080/get/'; private uriListen = 'http://localhost:8080/listen/'; + private channelUri: string = 'UNKNOWN'; + private eventSource?: EventSource; + private reconnectTimeout?: NodeJS.Timeout; + private reconnectFrequencySec: number = 1; + private channel: Subscriber = new Subscriber(); + constructor(private http: HttpClient) { } getChatrooms(): Observable { @@ -24,27 +32,79 @@ export class ChatroomService { } listen(id: String): Observable { - return new Observable((observer) => { - let url = this.uriListen + id; - let eventSource = new EventSource(url); - eventSource.onmessage = (event) => { - console.debug('Received event: ', event); - let message: Message = JSON.parse(event.data); - observer.next(message); - }; - eventSource.onerror = (error) => { - // readyState === 0 (closed) means the remote source closed the connection, - // so we can safely treat it as a normal situation. Another way - // of detecting the end of the stream is to insert a special element - // in the stream of events, which the client can identify as the last one. - if(eventSource.readyState === 0) { - console.log('The stream has been closed by the server.'); - eventSource.close(); - observer.complete(); - } else { - observer.error('EventSource error: ' + error); - } - } + this.channelUri = this.uriListen + id; + this.openChannel(); + let observable = new Observable((observer) => { + this.channel = observer; }); + return observable; + } + + // Creates SSE event source, handles SSE events + private openChannel(): void { + + // Close event source if current instance of SSE service has some + if (this.eventSource) { + this.closeChannel(); + this.eventSource = undefined; + } + + console.log('Opening channel to ' + this.channelUri) + + // Open new channel, create new EventSource + this.eventSource = new EventSource(this.channelUri); + + // Process default event + this.eventSource.onmessage = (event: MessageEvent) => { + console.log('Received event ' + event.type + ' for ' + this.channelUri); + this.processEvent(event); + }; + + // Process connection opened + this.eventSource.onopen = () => { + console.log('Channel opened to ' + this.channelUri); + this.reconnectFrequencySec = 1; + }; + + // Process error + this.eventSource.onerror = (error: Event) => { + console.log('Error on channel ' + this.channelUri + ": "+ error.type); + this.reconnectChannel(); + }; + } + + // Processes custom event types + private processEvent(messageEvent: MessageEvent): void { + const parsed = messageEvent.data ? JSON.parse(messageEvent.data) : {}; + switch (messageEvent.type) { + case 'message': { + this.channel.next(parsed); + break; + } + // Add others if neccessary + default: { + console.error('Unhandled event:', messageEvent.type); + break; + } + } + } + + private closeChannel(): void { + console.log('Closing channel to: ' + this.channelUri); + this.eventSource?.close(); + } + + // Handles reconnect attempts when the connection fails for some reason. + private reconnectChannel(): void { + const self = this; + this.closeChannel(); + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = setTimeout(() => { + self.openChannel(); + self.reconnectFrequencySec *= 2; + if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) { + self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT; + } + }, this.reconnectFrequencySec * 1000); } }