mirror of
https://code.equilibrium.co.ao/ITO/doneit-web.git
synced 2026-04-20 05:16:07 +00:00
add socket connector
This commit is contained in:
+70
-36
@@ -1,53 +1,87 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { Subject, BehaviorSubject, Observable } from 'rxjs';
|
||||
import { filter } from 'rxjs/operators';
|
||||
import { Observable, Subject, BehaviorSubject } from 'rxjs';
|
||||
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
|
||||
import { catchError, retryWhen, tap, delay } from 'rxjs/operators';
|
||||
|
||||
interface WebSocketMessage {
|
||||
type: string;
|
||||
payload: any;
|
||||
}
|
||||
|
||||
interface WebSocketError {
|
||||
type: string;
|
||||
error: any;
|
||||
}
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root'
|
||||
})
|
||||
export class MessageLiveDataSourceService {
|
||||
private socket$: WebSocketSubject<WebSocketMessage>;
|
||||
private messageSubject$: Subject<WebSocketMessage>;
|
||||
private connectionStatus$: BehaviorSubject<boolean>;
|
||||
private reconnectAttempts = 0;
|
||||
private readonly maxReconnectAttempts = 5;
|
||||
|
||||
private messageSubject: Subject<any> = new Subject<any>();
|
||||
private isPaused: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
|
||||
private messageBuffer: any[] = []; // Buffer to store messages when paused
|
||||
private authentication!: string
|
||||
|
||||
constructor() { }
|
||||
|
||||
handleMessage(event: any) {
|
||||
const data: any = event.data || MessageEvent;
|
||||
|
||||
if (this.isPaused.getValue()) {
|
||||
this.messageBuffer.push({}); // Buffer the message if paused
|
||||
} else {
|
||||
this.messageSubject.next({} as any);
|
||||
}
|
||||
constructor() {
|
||||
this.messageSubject$ = new Subject<WebSocketMessage>();
|
||||
this.connectionStatus$ = new BehaviorSubject<boolean>(false);
|
||||
}
|
||||
|
||||
public connect(url: string) {
|
||||
this.socket$ = webSocket<WebSocketMessage>(url);
|
||||
|
||||
pauseBroadcast() {
|
||||
this.isPaused.next(true);
|
||||
this.socket$.pipe(
|
||||
tap({
|
||||
error: () => {
|
||||
this.connectionStatus$.next(false);
|
||||
}
|
||||
}),
|
||||
retryWhen(errors => errors.pipe(
|
||||
tap(() => {
|
||||
this.reconnectAttempts++;
|
||||
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
|
||||
throw new Error('Max reconnect attempts reached');
|
||||
}
|
||||
}),
|
||||
delay(1000)
|
||||
))
|
||||
).subscribe(
|
||||
(message) => {
|
||||
this.messageSubject$.next(message);
|
||||
this.connectionStatus$.next(true);
|
||||
this.reconnectAttempts = 0;
|
||||
},
|
||||
(err) => {
|
||||
console.error('WebSocket connection error:', err);
|
||||
},
|
||||
() => {
|
||||
console.log('WebSocket connection closed');
|
||||
this.connectionStatus$.next(false);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
resumeBroadcast() {
|
||||
this.isPaused.next(false);
|
||||
this.flushMessageBuffer(); // Emit all buffered messages
|
||||
public sendMessage(message: WebSocketMessage): Observable<void> {
|
||||
return new Observable<void>(observer => {
|
||||
this.socket$.next(message);
|
||||
observer.next();
|
||||
observer.complete();
|
||||
}).pipe(
|
||||
catchError(err => {
|
||||
console.error('Send message error:', err);
|
||||
return new Observable<never>(observer => {
|
||||
observer.error({ type: 'SEND_ERROR', error: err });
|
||||
});
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
private flushMessageBuffer() {
|
||||
while (this.messageBuffer.length > 0) {
|
||||
const bufferedMessage = this.messageBuffer.shift();
|
||||
this.messageSubject.next(bufferedMessage);
|
||||
}
|
||||
public get messages$(): Observable<WebSocketMessage> {
|
||||
return this.messageSubject$.asObservable();
|
||||
}
|
||||
|
||||
subscribe(endpoint: any): Observable<any> {
|
||||
return this.messageSubject.pipe(
|
||||
filter(message =>
|
||||
message &&
|
||||
message.endpoint === endpoint &&
|
||||
message.authentication == this.authentication
|
||||
)
|
||||
)
|
||||
public get connectionStatus(): Observable<boolean> {
|
||||
return this.connectionStatus$.asObservable();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user