From 856a0cf137a786d4b120f992d0c9af54962ac442 Mon Sep 17 00:00:00 2001 From: Peter Maquiran Date: Wed, 12 Jun 2024 15:20:07 +0100 Subject: [PATCH] add socket connector --- package-lock.json | 14 +-- package.json | 2 +- socket-server.js | Bin 0 -> 2214 bytes .../message-live-data-source.service.ts | 106 ++++++++++++------ 4 files changed, 78 insertions(+), 44 deletions(-) create mode 100644 socket-server.js diff --git a/package-lock.json b/package-lock.json index e3027d709..b9ff2b0d1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -189,7 +189,7 @@ "videogular2": "^7.0.2", "webpack": "^5.88.2", "wordcloud": "^1.1.2", - "ws": "^7.4.6", + "ws": "^7.5.9", "zod": "^3.22.2", "zone.js": "~0.11.4" }, @@ -43735,9 +43735,9 @@ } }, "node_modules/ws": { - "version": "7.5.5", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.5.tgz", - "integrity": "sha512-BAkMFcAzl8as1G/hArkxOxq3G7pjUqQ3gzYbLL0/5zNkph70e+lCoxBGnm6AW1+/aiNeV4fnKqZ8m4GZewmH2w==", + "version": "7.5.9", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz", + "integrity": "sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==", "engines": { "node": ">=8.3.0" }, @@ -77484,9 +77484,9 @@ } }, "ws": { - "version": "7.5.5", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.5.tgz", - "integrity": "sha512-BAkMFcAzl8as1G/hArkxOxq3G7pjUqQ3gzYbLL0/5zNkph70e+lCoxBGnm6AW1+/aiNeV4fnKqZ8m4GZewmH2w==", + "version": "7.5.9", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz", + "integrity": "sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==", "requires": {} }, "xdg-basedir": { diff --git a/package.json b/package.json index 4f3bef721..e2e11caf3 100644 --- a/package.json +++ b/package.json @@ -205,7 +205,7 @@ "videogular2": "^7.0.2", "webpack": "^5.88.2", "wordcloud": "^1.1.2", - "ws": "^7.4.6", + "ws": "^7.5.9", "zod": "^3.22.2", "zone.js": "~0.11.4" }, diff --git a/socket-server.js b/socket-server.js new file mode 100644 index 0000000000000000000000000000000000000000..238fa7069fa2612456f06e224703ee714cb80b29 GIT binary patch literal 2214 zcmcguO>fgc5S=p;|6y-25;7bSs2o7OAkiW?a8B*CSPgOT8ltG`UkBcsVY46E^3emb z9PN&0=i|+r+5Gt4$tWA~GK-UsvXTq=07)m;SW|X_cf$IMe3e00jVxq`6AO7HFXW*- z&^NW?p+Ctt{WS|4tanupE&N68MsA?BMFbPPKgar+Jk{69ZRRia(P|Ei%c7vS4(A7z zFXS=o_#&&9klkqH4yzFTC~KT;@jJ?u(Bf;X2go*xMs9y4eSrdPJ;CGT6*3rQ35^cF zhVK&c&#>k3EMb3z{R}OK-5L+$h~7N*JNcx(so@ZVp_pog*h9|#kW-FOdd4HeOZc7S z6m~oKXfn^NS&k3$8gRxHnk{UsV565k>NsOHYgQ+)K;|4Xs+Dtj2ft^iW?BJy;FHGL z0w*Hh!|NN^KgTJ`W_TSSztWiY_)6Byh+zwTvSy**nWI&Ii^%rC)+%BeXV4`lQ;#K@ zR$=2_V0(!4_JDNGOnx^GoGP}g?}g%gja6QM*4P*@Yo+sO*ju!gayN24w`6(dvU(lw z=K6ae(%~JncY7dNvs&U1>~UHU=Si#9%Uf7VnNB{$Dn`wwz$DlY*+)EUJr!^(GcWs! zQ!exftC}MnyTU^jcVkoYmfkt!Jm=ele-7+Do7hg#; + private messageSubject$: Subject; + private connectionStatus$: BehaviorSubject; + private reconnectAttempts = 0; + private readonly maxReconnectAttempts = 5; - private messageSubject: Subject = new Subject(); - private isPaused: BehaviorSubject = new BehaviorSubject(false); - private messageBuffer: any[] = []; // Buffer to store messages when paused - private authentication!: string - - constructor() { } - - handleMessage(event: any) { - const data: any = event.data || MessageEvent; - - if (this.isPaused.getValue()) { - this.messageBuffer.push({}); // Buffer the message if paused - } else { - this.messageSubject.next({} as any); - } + constructor() { + this.messageSubject$ = new Subject(); + this.connectionStatus$ = new BehaviorSubject(false); } + public connect(url: string) { + this.socket$ = webSocket(url); - pauseBroadcast() { - this.isPaused.next(true); + this.socket$.pipe( + tap({ + error: () => { + this.connectionStatus$.next(false); + } + }), + retryWhen(errors => errors.pipe( + tap(() => { + this.reconnectAttempts++; + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + throw new Error('Max reconnect attempts reached'); + } + }), + delay(1000) + )) + ).subscribe( + (message) => { + this.messageSubject$.next(message); + this.connectionStatus$.next(true); + this.reconnectAttempts = 0; + }, + (err) => { + console.error('WebSocket connection error:', err); + }, + () => { + console.log('WebSocket connection closed'); + this.connectionStatus$.next(false); + } + ); } - resumeBroadcast() { - this.isPaused.next(false); - this.flushMessageBuffer(); // Emit all buffered messages + public sendMessage(message: WebSocketMessage): Observable { + return new Observable(observer => { + this.socket$.next(message); + observer.next(); + observer.complete(); + }).pipe( + catchError(err => { + console.error('Send message error:', err); + return new Observable(observer => { + observer.error({ type: 'SEND_ERROR', error: err }); + }); + }) + ); } - private flushMessageBuffer() { - while (this.messageBuffer.length > 0) { - const bufferedMessage = this.messageBuffer.shift(); - this.messageSubject.next(bufferedMessage); - } + public get messages$(): Observable { + return this.messageSubject$.asObservable(); } - subscribe(endpoint: any): Observable { - return this.messageSubject.pipe( - filter(message => - message && - message.endpoint === endpoint && - message.authentication == this.authentication - ) - ) + public get connectionStatus(): Observable { + return this.connectionStatus$.asObservable(); } -} +} \ No newline at end of file