import { Injectable } from '@angular/core';
import { Observable, Subject, BehaviorSubject, Subscription } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, retryWhen, tap, delay } from 'rxjs/operators';
import { SessionStore } from 'src/app/store/session.service';
import { v4 as uuidv4 } from 'uuid';

/** Envelope exchanged with the server over the socket. */
export interface WebSocketMessage {
  type: string;
  payload: any;
  /** Correlation id; `sendMessage` assigns it and inbound messages are matched against it. */
  requestId?: string;
}

/** Shape of the error emitted by `sendMessage` when sending fails. */
interface WebSocketError {
  type: string;
  error: unknown;
}

/** Handler invoked with the inbound message that answers a pending request. */
type ResponseCallback = (response: Pick<WebSocketMessage, 'payload' | 'requestId'>) => void;

// NOTE(review): hard-coded, unencrypted endpoint — consider moving it to environment configuration.
const DEFAULT_SOCKET_URL = 'http://5.180.182.151:8080/';

/**
 * Application-wide WebSocket client.
 *
 * Opens a socket on construction, republishes every inbound message on
 * `messages$`, exposes the connection state on `connectionStatus`, and
 * correlates requests with responses through `requestId`.
 */
@Injectable({ providedIn: 'root' })
export class WebSocketService {
  private socket$?: WebSocketSubject<unknown>;
  private socketSubscription?: Subscription;
  private readonly messageSubject$ = new Subject<WebSocketMessage>();
  private readonly connectionStatus$ = new BehaviorSubject<boolean>(false);
  private reconnectAttempts = 0;
  private readonly maxReconnectAttempts = 5;

  /** Pending response handlers, keyed by the `requestId` of the request they belong to. */
  callback: { [key: string]: ResponseCallback } = {};

  constructor() {
    // Route each inbound message carrying a known requestId to its pending handler.
    this.messages$.subscribe((message) => {
      // Inbound frames are not validated, so tolerate null / non-object values.
      const requestId = message ? message.requestId : undefined;
      if (requestId === undefined || requestId === null) {
        return;
      }
      const handler = this.callback[requestId];
      if (handler) {
        handler({ payload: message.payload, requestId });
        delete this.callback[requestId];
      }
    });

    this.connect(DEFAULT_SOCKET_URL);
  }

  /**
   * Opens a socket to `url` and starts listening, replacing any socket opened
   * by a previous call.
   *
   * On a socket error the connection is retried after one second until
   * `maxReconnectAttempts` consecutive errors have been counted; the counter is
   * reset once a message is received. The connection is reported as established
   * when the first message arrives, at which point the current user's id is
   * sent to the server.
   *
   * @param url Endpoint to connect to. `http://` / `https://` are rewritten to
   *   `ws://` / `wss://`.
   */
  public connect(url: string): void {
    // Drop the previous subscription so repeated calls do not leak sockets.
    if (this.socketSubscription) {
      this.socketSubscription.unsubscribe();
      if (this.connectionStatus$.getValue()) {
        this.connectionStatus$.next(false);
      }
    }
    // A fresh connect() gets a fresh retry budget.
    this.reconnectAttempts = 0;

    const socket = webSocket<unknown>(WebSocketService.toWebSocketUrl(url));
    this.socket$ = socket;

    this.socketSubscription = socket
      .pipe(
        tap({
          error: () => {
            this.connectionStatus$.next(false);
          },
        }),
        retryWhen((errors) =>
          errors.pipe(
            tap(() => {
              this.reconnectAttempts++;
              if (this.reconnectAttempts >= this.maxReconnectAttempts) {
                throw new Error('Max reconnect attempts reached');
              }
            }),
            delay(1000),
          ),
        ),
      )
      .subscribe({
        next: (message) => {
          // Frames are forwarded as-is; their shape is not checked here.
          this.messageSubject$.next(message as WebSocketMessage);
          if (!this.connectionStatus$.getValue()) {
            this.connectionStatus$.next(true);
            this.reconnectAttempts = 0;
            // Send a message when the connection is established: the bare user id,
            // not wrapped in a WebSocketMessage. Skipped when no user id is available.
            const user = SessionStore.user;
            const userId = user ? user.UserId : undefined;
            if (userId !== undefined && userId !== null) {
              socket.next(userId);
            }
          }
        },
        error: (err) => {
          console.error('WebSocket connection error:', err);
        },
        complete: () => {
          console.log('WebSocket connection closed');
          this.connectionStatus$.next(false);
        },
      });
  }

  /**
   * Sends `message` and returns an observable of the matching response.
   *
   * For an object message a fresh `requestId` is written onto `message`, and the
   * returned observable emits the `payload` of the first inbound message with
   * that id, then completes. It does not emit if no such message arrives.
   * Unsubscribing before the response arrives discards the pending handler.
   * A non-object value is sent as-is and the observable immediately emits `{}`
   * and completes.
   *
   * @param message Message to send; its `requestId` is overwritten.
   * @returns Observable of the response payload. Failures are emitted as
   *   `{ type: 'SEND_ERROR', error }`.
   */
  public sendMessage<T = any>(message: WebSocketMessage): Observable<T> {
    return new Observable<T>((observer) => {
      const socket = this.socket$;
      if (!socket) {
        observer.error(new Error('WebSocket is not connected'));
        return undefined;
      }

      // typeof null is 'object', so exclude it explicitly.
      if (typeof message !== 'object' || message === null) {
        socket.next(message);
        observer.next({} as T);
        observer.complete();
        return undefined;
      }

      const requestId = uuidv4();
      message.requestId = requestId;
      // Register the handler before sending so a reply can never arrive unobserved.
      this.callback[requestId] = ({ payload }) => {
        observer.next(payload as T);
        observer.complete();
      };
      socket.next(message);

      // Teardown: forget the handler on completion or early unsubscribe.
      return () => {
        delete this.callback[requestId];
      };
    }).pipe(
      catchError((err) => {
        console.error('Send message error:', err);
        const failure: WebSocketError = { type: 'SEND_ERROR', error: err };
        return new Observable<never>((observer) => {
          observer.error(failure);
        });
      }),
    );
  }

  /** Stream of every message received from the socket. */
  public get messages$(): Observable<WebSocketMessage> {
    return this.messageSubject$.asObservable();
  }

  /** Stream of the connection state; emits the current value on subscription. */
  public get connectionStatus(): Observable<boolean> {
    return this.connectionStatus$.asObservable();
  }

  /** Rewrites an `http(s)://` URL to the corresponding `ws(s)://` URL; other URLs pass through. */
  private static toWebSocketUrl(url: string): string {
    return url.replace(/^http(s?):\/\//i, 'ws$1://');
  }
}