mirror of
https://code.equilibrium.co.ao/ITO/doneit-web.git
synced 2026-04-18 20:47:54 +00:00
88 lines
2.3 KiB
TypeScript
88 lines
2.3 KiB
TypeScript
import { Injectable } from '@angular/core';
|
|
import { Observable, Subject, BehaviorSubject } from 'rxjs';
|
|
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
|
|
import { catchError, retryWhen, tap, delay } from 'rxjs/operators';
|
|
|
|
interface WebSocketMessage {
|
|
type: string;
|
|
payload: any;
|
|
}
|
|
|
|
interface WebSocketError {
|
|
type: string;
|
|
error: any;
|
|
}
|
|
|
|
@Injectable({
|
|
providedIn: 'root'
|
|
})
|
|
export class MessageLiveDataSourceService {
|
|
private socket$: WebSocketSubject<WebSocketMessage>;
|
|
private messageSubject$: Subject<WebSocketMessage>;
|
|
private connectionStatus$: BehaviorSubject<boolean>;
|
|
private reconnectAttempts = 0;
|
|
private readonly maxReconnectAttempts = 5;
|
|
|
|
constructor() {
|
|
this.messageSubject$ = new Subject<WebSocketMessage>();
|
|
this.connectionStatus$ = new BehaviorSubject<boolean>(false);
|
|
}
|
|
|
|
public connect(url: string) {
|
|
this.socket$ = webSocket<WebSocketMessage>(url);
|
|
|
|
this.socket$.pipe(
|
|
tap({
|
|
error: () => {
|
|
this.connectionStatus$.next(false);
|
|
}
|
|
}),
|
|
retryWhen(errors => errors.pipe(
|
|
tap(() => {
|
|
this.reconnectAttempts++;
|
|
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
|
|
throw new Error('Max reconnect attempts reached');
|
|
}
|
|
}),
|
|
delay(1000)
|
|
))
|
|
).subscribe(
|
|
(message) => {
|
|
this.messageSubject$.next(message);
|
|
this.connectionStatus$.next(true);
|
|
this.reconnectAttempts = 0;
|
|
},
|
|
(err) => {
|
|
console.error('WebSocket connection error:', err);
|
|
},
|
|
() => {
|
|
console.log('WebSocket connection closed');
|
|
this.connectionStatus$.next(false);
|
|
}
|
|
);
|
|
}
|
|
|
|
public sendMessage(message: WebSocketMessage): Observable<void> {
|
|
return new Observable<void>(observer => {
|
|
this.socket$.next(message);
|
|
observer.next();
|
|
observer.complete();
|
|
}).pipe(
|
|
catchError(err => {
|
|
console.error('Send message error:', err);
|
|
return new Observable<never>(observer => {
|
|
observer.error({ type: 'SEND_ERROR', error: err });
|
|
});
|
|
})
|
|
);
|
|
}
|
|
|
|
public get messages$(): Observable<WebSocketMessage> {
|
|
return this.messageSubject$.asObservable();
|
|
}
|
|
|
|
public get connectionStatus(): Observable<boolean> {
|
|
return this.connectionStatus$.asObservable();
|
|
}
|
|
}
|