Files
doneit-web/src/app/infra/socket/signalR/signalR.ts
T
2024-08-27 11:08:52 +01:00

205 lines
5.8 KiB
TypeScript

import * as signalR from '@microsoft/signalr';
import { BehaviorSubject, Observable, race, timer } from 'rxjs';
import { ok, Result, err } from 'neverthrow';
import { SessionStore } from 'src/app/store/session.service';
import { filter, first } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid'
import { MessageOutPutDataDTO } from 'src/app/module/chat/data/dto/message/messageOutputDTO';
import { ISignalRInput } from '../type';
export interface SocketMessage<T> {
method: string,
data: T
}
export enum EnumSocketError {
catch = 1,
close
}
export class SignalRConnection {
private hubConnection: signalR.HubConnection;
private messageSubject: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
private connectionStateSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private disconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private reconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private sendLaterSubject: BehaviorSubject<Object> = new BehaviorSubject<Object>(false);
private reconnect = true
private sendDataSubject: BehaviorSubject<{method: string, data: any}> = new BehaviorSubject<{method: string, data: any}>(null);
private pendingRequests: Map<string, { resolve: Function; reject: Function }> = new Map();
url: string
private hasConnectOnce = false
constructor({url}) {
this.url = url
}
establishConnection(): Promise<Result<signalR.HubConnection, false>> {
return new Promise((resolve, reject) => {
const hubConnection = new signalR.HubConnectionBuilder()
.withUrl(this.url)
.build();
this.hubConnection = hubConnection
hubConnection
.start()
.then(() => {
this.hubConnection = hubConnection
this.hasConnectOnce = true
console.log('Connection started');
this.connectionStateSubject.next(true);
this.join()
this.addMessageListener()
resolve(ok(hubConnection))
})
.catch(error => {
console.log('Error while starting connection: ' + error);
this.connectionStateSubject.next(false);
if(this.hasConnectOnce) {
setTimeout(()=> {
this.attempReconnect();
}, 2000)
}
resolve(err(false))
});
hubConnection.onclose(() => {
console.log('Connection closed');
this.connectionStateSubject.next(false);
this.disconnectSubject.next(true)
this.pendingRequests.forEach((_, requestId) => {
const data = this.pendingRequests.get(requestId);
if(data) {
const { reject } = data
reject(err({
type: EnumSocketError.close
}));
this.pendingRequests.delete(requestId);
}
});
if(this.reconnect) {
this.attempReconnect();
}
});
})
}
async attempReconnect() {
const attempConnection = await this.establishConnection()
if(attempConnection.isOk()) {
this.reconnectSubject.next(true)
}
}
public join() {
if(this.connectionStateSubject.value == true) {
this.hubConnection.invoke("Join", SessionStore.user.UserId, SessionStore.user.FullName);
//this.hubConnection.invoke("Join", 105, "UserFirefox");
} else {
this.sendLaterSubject.next({method: 'SendMessage', args:["Join", 312, "Daniel"]})
}
}
sendData<T>(input: ISignalRInput): Promise<Result<T, any>> {
return new Promise((resolve, reject) => {
if(this.connectionStateSubject.value == true) {
try {
this.pendingRequests.set(input.data.requestId, { resolve, reject: (data: any) => resolve(data) });
this.hubConnection.invoke(input.method, input.data)
this.sendDataSubject.pipe(
filter((message) => input.data.requestId == message?.data.requestId),
first()
).subscribe(value => {
resolve(ok(value.data as unknown as T))
// console.log('Received valid value:', value);
});
} catch(error) {
resolve(err({
type: EnumSocketError.catch
}))
}
} else {
this.sendLaterSubject.next({method: 'SendMessage', args: input})
return reject(err(false))
}
})
}
private addMessageListener(): void {
const methods = ['ReceiveMessage', 'TypingMessage', 'AvailableUsers',
'ReadAt', 'DeleteMessage', 'UpdateMessage', 'GroupAddedMembers',
'GroupDeletedMembers', 'UserAddGroup']
for(const method of methods) {
this.hubConnection.on(method, (message: any) => {
this.messageSubject.next(message);
this.sendDataSubject.next({
method: method,
data: message
})
});
}
}
public getConnectionState(): Observable<boolean> {
return this.connectionStateSubject.asObservable();
}
public getDisconnectTrigger(): Observable<boolean> {
return this.disconnectSubject.asObservable();
}
public getData() {
return this.sendDataSubject.asObservable()
}
public closeConnection(): void {
this.reconnect = false
if (this.hubConnection) {
this.hubConnection
.stop()
.then(() => {
console.log('Connection closed by user');
this.connectionStateSubject.next(false);
this.pendingRequests.forEach((_, requestId) => {
const data = this.pendingRequests.get(requestId);
if(data) {
const { reject } = data
reject(err({
type: EnumSocketError.close
}));
this.pendingRequests.delete(requestId);
}
});
})
.catch(err => console.log('Error while closing connection: ' + err));
}
}
}