import { Injectable } from '@angular/core';
import { Observable, Subject, BehaviorSubject, Subscription } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, retryWhen, tap, delay } from 'rxjs/operators';

/** Envelope for every frame sent to or received from the socket. */
interface WebSocketMessage {
  type: string;
  payload: any;
}

/** Error value emitted by `sendMessage` when a frame cannot be handed to the socket. */
interface WebSocketError {
  type: string;
  error: any;
}

/**
 * Application-wide WebSocket data source.
 *
 * Wraps an RxJS `WebSocketSubject`, re-publishes inbound frames on `messages$`,
 * exposes the open/closed state on `connectionStatus`, and retries a failed
 * connection with a fixed one-second delay.
 */
@Injectable({
  providedIn: 'root'
})
export class MessageLiveDataSourceService {
  /** Current socket, or `undefined` until `connect()` has been called. */
  private socket$: WebSocketSubject<WebSocketMessage> | undefined;
  /** Subscription that keeps the current socket open; released on reconnect/disconnect. */
  private socketSubscription: Subscription | undefined;
  private messageSubject$: Subject<WebSocketMessage>;
  private connectionStatus$: BehaviorSubject<boolean>;
  /** Consecutive socket errors since the last successfully received message. */
  private reconnectAttempts = 0;
  private readonly maxReconnectAttempts = 5;

  constructor() {
    this.messageSubject$ = new Subject<WebSocketMessage>();
    this.connectionStatus$ = new BehaviorSubject<boolean>(false);
  }

  /**
   * Opens a WebSocket to `url` and starts forwarding its frames to `messages$`.
   *
   * Any connection opened by an earlier call is closed first, so calling this
   * method repeatedly never leaves more than one live socket behind.
   *
   * On a socket error the connection is retried after one second. The stream
   * gives up with "Max reconnect attempts reached" once the number of
   * consecutive errors reaches `maxReconnectAttempts`; the counter is reset
   * whenever a message is received.
   *
   * @param url WebSocket endpoint (`ws://` or `wss://`).
   */
  public connect(url: string): void {
    // Drop the previous socket (if any) so its subscription does not leak and
    // its frames do not keep flowing into messageSubject$.
    this.disconnect();
    this.reconnectAttempts = 0;

    const socket = webSocket<WebSocketMessage>({
      url,
      // Report "connected" as soon as the socket opens (including after a
      // retry), rather than waiting for the first inbound frame.
      openObserver: {
        next: () => {
          this.connectionStatus$.next(true);
        }
      }
    });
    this.socket$ = socket;

    this.socketSubscription = socket
      .pipe(
        // Placed before retryWhen so every failure, including retried ones,
        // flips the status to "disconnected".
        tap({
          error: () => {
            this.connectionStatus$.next(false);
          }
        }),
        retryWhen(errors =>
          errors.pipe(
            tap(() => {
              this.reconnectAttempts++;
              if (this.reconnectAttempts >= this.maxReconnectAttempts) {
                // Throwing inside the notifier terminates the retry loop and
                // surfaces this error to the subscriber below.
                throw new Error('Max reconnect attempts reached');
              }
            }),
            delay(1000)
          )
        )
      )
      .subscribe({
        next: (message: WebSocketMessage) => {
          this.messageSubject$.next(message);
          // A received frame proves the link is healthy again.
          this.reconnectAttempts = 0;
        },
        error: (err: unknown) => {
          console.error('WebSocket connection error:', err);
        },
        complete: () => {
          console.log('WebSocket connection closed');
          this.connectionStatus$.next(false);
        }
      });
  }

  /**
   * Closes the current connection, if any, and reports the status as
   * disconnected. Safe to call when no connection exists.
   */
  public disconnect(): void {
    if (this.socketSubscription) {
      // Unsubscribing the only subscriber makes WebSocketSubject close the socket.
      this.socketSubscription.unsubscribe();
      this.socketSubscription = undefined;
    }
    this.socket$ = undefined;
    if (this.connectionStatus$.value) {
      this.connectionStatus$.next(false);
    }
  }

  /**
   * Hands `message` to the socket.
   *
   * The returned observable is cold: nothing is sent until it is subscribed.
   * It emits once and completes when the message has been passed to the
   * socket subject, and errors with `{ type: 'SEND_ERROR', error }` if that
   * fails or if `connect()` has not been called.
   *
   * @param message Frame to send.
   * @returns Observable that signals hand-off of the message.
   */
  public sendMessage(message: WebSocketMessage): Observable<void> {
    return new Observable<void>(observer => {
      const socket = this.socket$;
      if (!socket) {
        observer.error(new Error('WebSocket is not connected; call connect() first'));
        return;
      }
      socket.next(message);
      observer.next();
      observer.complete();
    }).pipe(
      catchError((err: unknown) => {
        console.error('Send message error:', err);
        const sendError: WebSocketError = { type: 'SEND_ERROR', error: err };
        return new Observable<never>(observer => {
          observer.error(sendError);
        });
      })
    );
  }

  /** Stream of every frame received from the socket. */
  public get messages$(): Observable<WebSocketMessage> {
    return this.messageSubject$.asObservable();
  }

  /** Emits `true` while the socket is open and `false` otherwise; replays the latest value. */
  public get connectionStatus(): Observable<boolean> {
    return this.connectionStatus$.asObservable();
  }
}