diff --git a/package-lock.json b/package-lock.json index e3027d709..b9ff2b0d1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -189,7 +189,7 @@ "videogular2": "^7.0.2", "webpack": "^5.88.2", "wordcloud": "^1.1.2", - "ws": "^7.4.6", + "ws": "^7.5.9", "zod": "^3.22.2", "zone.js": "~0.11.4" }, @@ -43735,9 +43735,9 @@ } }, "node_modules/ws": { - "version": "7.5.5", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.5.tgz", - "integrity": "sha512-BAkMFcAzl8as1G/hArkxOxq3G7pjUqQ3gzYbLL0/5zNkph70e+lCoxBGnm6AW1+/aiNeV4fnKqZ8m4GZewmH2w==", + "version": "7.5.9", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz", + "integrity": "sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==", "engines": { "node": ">=8.3.0" }, @@ -77484,9 +77484,9 @@ } }, "ws": { - "version": "7.5.5", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.5.tgz", - "integrity": "sha512-BAkMFcAzl8as1G/hArkxOxq3G7pjUqQ3gzYbLL0/5zNkph70e+lCoxBGnm6AW1+/aiNeV4fnKqZ8m4GZewmH2w==", + "version": "7.5.9", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz", + "integrity": "sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==", "requires": {} }, "xdg-basedir": { diff --git a/package.json b/package.json index 4f3bef721..e2e11caf3 100644 --- a/package.json +++ b/package.json @@ -205,7 +205,7 @@ "videogular2": "^7.0.2", "webpack": "^5.88.2", "wordcloud": "^1.1.2", - "ws": "^7.4.6", + "ws": "^7.5.9", "zod": "^3.22.2", "zone.js": "~0.11.4" }, diff --git a/socket-server.js b/socket-server.js new file mode 100644 index 000000000..238fa7069 Binary files /dev/null and b/socket-server.js differ diff --git a/src/app/services/Repositorys/chat/data-source/message/message-live-data-source.service.ts b/src/app/services/Repositorys/chat/data-source/message/message-live-data-source.service.ts index dc38f0da9..4987ec5ea 100644 --- a/src/app/services/Repositorys/chat/data-source/message/message-live-data-source.service.ts +++ b/src/app/services/Repositorys/chat/data-source/message/message-live-data-source.service.ts @@ -1,53 +1,87 @@ import { Injectable } from '@angular/core'; -import { Subject, BehaviorSubject, Observable } from 'rxjs'; -import { filter } from 'rxjs/operators'; +import { Observable, Subject, BehaviorSubject } from 'rxjs'; +import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; +import { catchError, retryWhen, tap, delay } from 'rxjs/operators'; + +interface WebSocketMessage { + type: string; + payload: any; +} + +interface WebSocketError { + type: string; + error: any; +} @Injectable({ providedIn: 'root' }) export class MessageLiveDataSourceService { + private socket$: WebSocketSubject; + private messageSubject$: Subject; + private connectionStatus$: BehaviorSubject; + private reconnectAttempts = 0; + private readonly maxReconnectAttempts = 5; - private messageSubject: Subject = new Subject(); - private isPaused: BehaviorSubject = new BehaviorSubject(false); - private messageBuffer: any[] = []; // Buffer to store messages when paused - private authentication!: string - - constructor() { } - - handleMessage(event: any) { - const data: any = event.data || MessageEvent; - - if (this.isPaused.getValue()) { - this.messageBuffer.push({}); // Buffer the message if paused - } else { - this.messageSubject.next({} as any); - } + constructor() { + this.messageSubject$ = new Subject(); + this.connectionStatus$ = new BehaviorSubject(false); } + public connect(url: string) { + this.socket$ = webSocket(url); - pauseBroadcast() { - this.isPaused.next(true); + this.socket$.pipe( + tap({ + error: () => { + this.connectionStatus$.next(false); + } + }), + retryWhen(errors => errors.pipe( + tap(() => { + this.reconnectAttempts++; + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + throw new Error('Max reconnect attempts reached'); + } + }), + delay(1000) + )) + ).subscribe( + (message) => { + this.messageSubject$.next(message); + this.connectionStatus$.next(true); + this.reconnectAttempts = 0; + }, + (err) => { + console.error('WebSocket connection error:', err); + }, + () => { + console.log('WebSocket connection closed'); + this.connectionStatus$.next(false); + } + ); } - resumeBroadcast() { - this.isPaused.next(false); - this.flushMessageBuffer(); // Emit all buffered messages + public sendMessage(message: WebSocketMessage): Observable { + return new Observable(observer => { + this.socket$.next(message); + observer.next(); + observer.complete(); + }).pipe( + catchError(err => { + console.error('Send message error:', err); + return new Observable(observer => { + observer.error({ type: 'SEND_ERROR', error: err }); + }); + }) + ); } - private flushMessageBuffer() { - while (this.messageBuffer.length > 0) { - const bufferedMessage = this.messageBuffer.shift(); - this.messageSubject.next(bufferedMessage); - } + public get messages$(): Observable { + return this.messageSubject$.asObservable(); } - subscribe(endpoint: any): Observable { - return this.messageSubject.pipe( - filter(message => - message && - message.endpoint === endpoint && - message.authentication == this.authentication - ) - ) + public get connectionStatus(): Observable { + return this.connectionStatus$.asObservable(); } -} +} \ No newline at end of file