Files
doneit-web/src/app/module/chat/infra/socket/socket.ts
T

130 lines
3.3 KiB
TypeScript
Raw Normal View History

2024-06-13 12:11:17 +01:00
import { Injectable } from '@angular/core';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, retryWhen, tap, delay } from 'rxjs/operators';
import { SessionStore } from 'src/app/store/session.service';
import { v4 as uuidv4 } from 'uuid'
export interface WebSocketMessage {
type: string;
payload: any;
requestId?: string;
}
interface WebSocketError {
type: string;
error: any;
}
@Injectable({
providedIn: 'root'
})
export class WebSocketService {
private socket$: WebSocketSubject<WebSocketMessage>;
private messageSubject$: Subject<WebSocketMessage>;
private connectionStatus$: BehaviorSubject<boolean>;
private reconnectAttempts = 0;
private readonly maxReconnectAttempts = 5;
callback: {[key: string]: Function} = {}
constructor() {
this.messageSubject$ = new Subject<WebSocketMessage>();
this.connectionStatus$ = new BehaviorSubject<boolean>(false);
2024-07-22 13:28:52 +01:00
// this.connect('https://5-180-182-151.cloud-xip.com:85/ws/')
2024-06-13 12:11:17 +01:00
2024-07-22 13:28:52 +01:00
// this.messages$.subscribe(({payload, requestId, type}) => {
// if(this.callback[requestId]) {
// this.callback[requestId]({payload, requestId, type})
// delete this.callback[requestId]
2024-06-13 12:11:17 +01:00
2024-06-14 09:30:14 +01:00
2024-07-22 13:28:52 +01:00
// }
// })
2024-06-13 12:11:17 +01:00
}
public connect(url: string) {
this.socket$ = webSocket<WebSocketMessage>(url);
this.socket$.pipe(
tap({
error: () => {
this.connectionStatus$.next(false);
}
}),
retryWhen(errors => errors.pipe(
tap(() => {
this.reconnectAttempts++;
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
throw new Error('Max reconnect attempts reached');
}
}),
delay(1000)
))
).subscribe(
(message) => {
this.messageSubject$.next(message);
if (!this.connectionStatus$.getValue()) {
this.connectionStatus$.next(true);
this.reconnectAttempts = 0;
// Send a message when the connection is established
this.sendMessage(SessionStore.user.UserId as any).subscribe();
}
},
(err) => {
console.error('WebSocket connection error:', err);
},
() => {
console.log('WebSocket connection closed');
this.connectionStatus$.next(false);
}
);
}
public sendMessage(message: WebSocketMessage): Observable<any> {
return new Observable<void>(observer => {
2024-06-14 09:30:14 +01:00
2024-06-13 12:11:17 +01:00
if(typeof message == 'object') {
message.requestId = uuidv4()
this.socket$.next(message);
this.callback[message.requestId] = ({payload, requestId})=> {
observer.next(payload as any);
observer.complete();
}
} else {
this.socket$.next(message);
observer.next({} as any);
observer.complete();
}
2024-06-14 09:30:14 +01:00
2024-06-13 12:11:17 +01:00
}).pipe(
catchError(err => {
console.error('Send message error:', err);
return new Observable<never>(observer => {
observer.error({ type: 'SEND_ERROR', error: err });
});
})
);
}
public get messages$(): Observable<WebSocketMessage> {
return this.messageSubject$.asObservable();
}
public get connectionStatus(): Observable<boolean> {
return this.connectionStatus$.asObservable();
}
2024-06-14 09:30:14 +01:00
}