import * as signalR from '@microsoft/signalr';
import { BehaviorSubject, Observable } from 'rxjs';
import { ok, Result, err } from 'neverthrow';
import { SessionStore } from 'src/app/store/session.service';
import { filter, first } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import { UserTypingDTO } from '../../data/dto/typing/typingInputDTO';
import { MessageOutPutDataDTO } from '../../data/dto/message/messageOutputDTO';
import { MessageDeleteInputDTO } from '../../data/dto/message/messageDeleteInputDTO';
import { MessageReactionInput } from '../../domain/use-case/message-reaction-use-case.service';
import { ISignalRInput } from './signal-r.service';

/**
 * Payload of a hub event whose DTO is not declared in this file.
 * Kept as `any` so existing consumers that read fields off it keep compiling.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type UntypedHubPayload = any;

/** A hub call that could not be sent because the connection was down. */
export interface PendingHubCall {
  /** Hub method the caller tried to invoke. */
  method: string;
  /** The data the caller passed in, untouched. */
  args: unknown;
}

/** Envelope under which every inbound hub event is republished on `getData()`. */
export interface HubEnvelope {
  method: string;
  data: UntypedHubPayload;
}

/** True when `candidate` is an object whose `requestId` strictly equals `requestId`. */
function hasRequestId(candidate: unknown, requestId: unknown): boolean {
  return (
    typeof candidate === 'object' &&
    candidate !== null &&
    (candidate as { requestId?: unknown }).requestId === requestId
  );
}

/**
 * Wrapper around a single SignalR hub connection.
 *
 * Outgoing calls are correlated with their server reply through a `requestId`:
 * the call is invoked on the hub and the returned promise resolves with the
 * first inbound event carrying the same `requestId`.
 *
 * When the connection is down, outgoing calls are published on `getSendLater()`
 * and the returned promise REJECTS with `err(false)`.
 */
export class SignalRConnection {
  private hubConnection: signalR.HubConnection;

  // Subjects are seeded with null/false, so new subscribers immediately receive
  // that seed (or the latest event) and must tolerate it.
  private readonly messageSubject = new BehaviorSubject<MessageOutPutDataDTO | null>(null);
  private readonly messageDelete = new BehaviorSubject<UntypedHubPayload>(null);
  private readonly messageUPdateSubject = new BehaviorSubject<UntypedHubPayload>(null);
  private readonly typingSubject = new BehaviorSubject<UserTypingDTO | null>(null);
  private readonly readAtSubject = new BehaviorSubject<UntypedHubPayload>(null);
  private readonly connectionStateSubject = new BehaviorSubject<boolean>(false);
  private readonly disconnectSubject = new BehaviorSubject<boolean>(false);
  private readonly reconnectSubject = new BehaviorSubject<boolean>(false);
  private readonly sendLaterSubject = new BehaviorSubject<PendingHubCall | false>(false);
  private readonly sendDataSubject = new BehaviorSubject<HubEnvelope | false>(false);

  /** Cleared by `closeConnection()` so a deliberate stop does not trigger a reconnect. */
  private reconnect = true;

  url: string;

  constructor({ url }: { url: string }) {
    this.url = url;
  }

  /**
   * Builds a new hub connection for `this.url`, starts it, joins as the
   * current session user and registers the inbound event handlers.
   *
   * @returns Resolves with `ok(connection)` once started. Rejects with
   *          `err(false)` when the connection cannot be started.
   */
  establishConnection(): Promise<Result<signalR.HubConnection, false>> {
    return new Promise((resolve, reject) => {
      const hubConnection = new signalR.HubConnectionBuilder()
        .withUrl(this.url)
        .build();
      this.hubConnection = hubConnection;

      // Registered before start() so the handler is in place before the
      // connection can close.
      hubConnection.onclose(() => {
        console.log('Connection closed');
        this.connectionStateSubject.next(false);
        this.disconnectSubject.next(true);
        if (this.reconnect) {
          void this.attempReconnect();
        }
      });

      hubConnection
        .start()
        .then(() => {
          console.log('Connection started');
          this.connectionStateSubject.next(true);
          // Handlers first, so nothing the server sends after Join is missed.
          this.addMessageListener();
          this.join();
          resolve(ok(hubConnection));
        })
        .catch(error => {
          console.log('Error while starting connection: ' + error);
          this.connectionStateSubject.next(false);
          reject(err(false));
        });
    });
  }

  /**
   * Makes one attempt to re-establish the connection and flags
   * `reconnectSubject` on success. A failed attempt is logged and swallowed:
   * this runs from the `onclose` callback, where nothing could catch it.
   */
  async attempReconnect(): Promise<void> {
    try {
      const attempConnection = await this.establishConnection();
      if (attempConnection.isOk()) {
        this.reconnectSubject.next(true);
      }
    } catch (error) {
      // establishConnection() has already logged the underlying cause.
      console.log('Reconnect attempt failed');
    }
  }

  /**
   * Announces the current session user to the hub. When disconnected the
   * call is published on `getSendLater()` instead.
   */
  public join(): void {
    if (this.connectionStateSubject.value) {
      console.log('join=================');
      this.hubConnection
        .invoke('Join', SessionStore.user.UserId, SessionStore.user.FullName)
        .catch(error => console.log('Error while joining: ' + error));
    } else {
      this.sendLaterSubject.next({
        method: 'Join',
        args: [SessionStore.user?.UserId, SessionStore.user?.FullName],
      });
    }
  }

  /**
   * Sends a chat message and waits for the `ReceiveMessage` event that
   * carries the same `requestId`.
   *
   * @param data Message payload; `requestId` is what the reply is matched on.
   * @returns Resolves with `ok(message)`. Rejects with `err(false)` when
   *          disconnected or when the hub invocation fails.
   */
  public async sendMessage(data: Object & { requestId }): Promise<Result<MessageOutPutDataDTO, false>> {
    console.log('sendMessage', data);
    return this.invokeAndAwaitReply<MessageOutPutDataDTO>(
      { method: 'SendMessage', payload: data, queued: data },
      this.messageSubject,
      reply => hasRequestId(reply, data.requestId),
    );
  }

  /**
   * Asks the hub to delete a message and waits for the `DeleteMessage` event
   * that carries the same `requestId`.
   *
   * @returns Resolves with `ok(event)`. Rejects with `err(false)` when
   *          disconnected or when the hub invocation fails.
   */
  public async deleteMessage(data: MessageDeleteInputDTO): Promise<Result<UntypedHubPayload, false>> {
    console.log('delete message', data);
    return this.invokeAndAwaitReply<UntypedHubPayload>(
      { method: 'DeleteMessage', payload: data, queued: data },
      // Delete confirmations are published on messageDelete, not messageSubject.
      this.messageDelete,
      reply => hasRequestId(reply, data.requestId),
    );
  }

  /**
   * Sends a typing notification and waits for the `TypingMessage` event that
   * carries the generated `requestId`.
   *
   * @returns Resolves with `ok(event)`. Rejects with `err(false)` when
   *          disconnected or when the hub invocation fails.
   */
  public async typing(data: Object & { roomId, UserName, userId }): Promise<Result<UserTypingDTO, false>> {
    const requestId = uuidv4();
    console.log('send typing', data);
    return this.invokeAndAwaitReply<UserTypingDTO>(
      // The id must travel with the request, otherwise no reply can match it.
      // NOTE(review): assumes the hub echoes requestId on TypingMessage — confirm.
      { method: 'Typing', payload: { ...data, requestId }, queued: data },
      this.typingSubject,
      reply => hasRequestId(reply, requestId),
    );
  }

  /**
   * Sends a read receipt and waits for the `ReadAt` event that carries the
   * generated `requestId`.
   *
   * @returns Resolves with `ok(event)`. Rejects with `err(false)` when
   *          disconnected or when the hub invocation fails.
   */
  public async sendReadAt(data: Object & { roomId, memberId, chatMessageId }): Promise<Result<UntypedHubPayload, false>> {
    const requestId = uuidv4();
    const payload = {
      roomId: data.roomId,
      memberId: data.memberId,
      requestId,
      messageId: data.chatMessageId,
    };
    return this.invokeAndAwaitReply<UntypedHubPayload>(
      { method: 'ReadAt', payload, queued: data },
      this.readAtSubject,
      reply => hasRequestId(reply, requestId),
    );
  }

  /**
   * Sends a reaction and waits for the `UpdateMessage` event that carries the
   * generated `requestId`.
   *
   * @returns Resolves with `ok(event)`. Rejects with `err(false)` when
   *          disconnected or when the hub invocation fails.
   */
  public async sendReactMessage(data: MessageReactionInput): Promise<Result<UntypedHubPayload, false>> {
    const requestId = uuidv4();
    // NOTE(review): only roomId, memberId and messageId are forwarded; any other
    // field of MessageReactionInput is not sent — confirm this is intended.
    const payload = {
      roomId: data.roomId,
      memberId: data.memberId,
      requestId,
      messageId: data.messageId,
    };
    return this.invokeAndAwaitReply<UntypedHubPayload>(
      { method: 'ReactMessage', payload, queued: data },
      this.messageUPdateSubject,
      reply => hasRequestId(reply, requestId),
    );
  }

  /**
   * Generic send: invokes `input.method` with `input.data` and waits for the
   * first inbound event, of any kind, whose payload carries the same
   * `requestId` as `input.data`.
   *
   * @returns Resolves with `ok(envelope)`, the `{ method, data }` wrapper of
   *          the matching event. Rejects with `err(false)` when disconnected
   *          or when the hub invocation fails.
   */
  sendData(input: ISignalRInput): Promise<Result<HubEnvelope, false>> {
    const requestId = input.data?.requestId;
    return this.invokeAndAwaitReply<HubEnvelope>(
      { method: input.method, payload: input.data, queued: input },
      this.sendDataSubject,
      // sendDataSubject emits envelopes, so the id is on `.data`, not on the envelope.
      envelope => hasRequestId(envelope, undefined) === false
        ? false
        : hasRequestId((envelope as HubEnvelope).data, requestId),
    );
  }

  /**
   * Shared request/reply plumbing.
   *
   * Connected: subscribes for the first reply accepted by `isReply`, then
   * invokes the hub method. If the invocation fails the subscription is torn
   * down and the promise rejects.
   * Disconnected: publishes the call on `sendLaterSubject` and rejects.
   *
   * NOTE(review): there is no timeout, so a reply that never arrives leaves
   * the promise pending and the subscription alive.
   */
  private invokeAndAwaitReply<T>(
    call: { method: string; payload: unknown; queued: unknown },
    replies$: Observable<unknown>,
    isReply: (candidate: unknown) => boolean,
  ): Promise<Result<T, false>> {
    return new Promise((resolve, reject) => {
      if (!this.connectionStateSubject.value) {
        this.sendLaterSubject.next({ method: call.method, args: call.queued });
        reject(err(false));
        return;
      }

      const subscription = replies$
        .pipe(filter(isReply), first())
        .subscribe(reply => resolve(ok(reply as T)));

      const fail = (error: unknown): void => {
        console.log('Error while invoking ' + call.method + ': ' + error);
        subscription.unsubscribe();
        reject(err(false));
      };

      try {
        this.hubConnection.invoke(call.method, call.payload).catch(fail);
      } catch (error) {
        fail(error);
      }
    });
  }

  /**
   * Registers the inbound hub handlers on the current connection. Each event
   * is pushed to its own subject and republished on `sendDataSubject`.
   */
  private addMessageListener(): void {
    console.log('listening');
    this.relay('ReceiveMessage', 'ReceiveMessage', this.messageSubject);
    this.relay('TypingMessage', 'Typing', this.typingSubject);
    this.relay('ReadAt', 'ReadAt', this.readAtSubject);
    this.relay('DeleteMessage', 'DeleteMessage', this.messageDelete);
    this.relay('UpdateMessage', 'UpdateMessage', this.messageUPdateSubject);
  }

  /** Forwards one hub event to `target` and to the generic data stream. */
  private relay(
    event: string,
    logLabel: string,
    target: BehaviorSubject<UntypedHubPayload>,
  ): void {
    this.hubConnection.on(event, (payload: UntypedHubPayload) => {
      console.log(logLabel, payload);
      target.next(payload);
      // NOTE(review): every event is tagged 'ReceiveMessage' regardless of its
      // real name. Kept as-is because getData() subscribers may filter on it.
      this.sendDataSubject.next({ method: 'ReceiveMessage', data: payload });
    });
  }

  /** Stream of `UpdateMessage` events (seeded with `null`). */
  public getMessageUpdateSubject(): Observable<UntypedHubPayload> {
    return this.messageUPdateSubject.asObservable();
  }

  /** Stream of `ReceiveMessage` events (seeded with `null`). */
  public getMessages(): Observable<MessageOutPutDataDTO | null> {
    return this.messageSubject.asObservable();
  }

  /** Stream of `TypingMessage` events (seeded with `null`). */
  public getTyping(): Observable<UserTypingDTO | null> {
    return this.typingSubject.asObservable();
  }

  /** Emits `true` once the connection has started and `false` when it stops. */
  public getConnectionState(): Observable<boolean> {
    return this.connectionStateSubject.asObservable();
  }

  /** Emits `true` each time the connection closes (seeded with `false`). */
  public getDisconnectTrigger(): Observable<boolean> {
    return this.disconnectSubject.asObservable();
  }

  /** Stream of calls that could not be sent while disconnected (seeded with `false`). */
  public getSendLater(): Observable<PendingHubCall | false> {
    return this.sendLaterSubject.asObservable();
  }

  /** Stream of `DeleteMessage` events (seeded with `null`). */
  public getMessageDelete(): Observable<UntypedHubPayload> {
    return this.messageDelete.asObservable();
  }

  /** Stream of every inbound event wrapped as `{ method, data }` (seeded with `false`). */
  public getData(): Observable<HubEnvelope | false> {
    return this.sendDataSubject.asObservable();
  }

  /** Stops the connection and disables automatic reconnection. */
  public closeConnection(): void {
    this.reconnect = false;
    if (this.hubConnection) {
      this.hubConnection
        .stop()
        .then(() => {
          console.log('Connection closed by user');
          this.connectionStateSubject.next(false);
        })
        .catch(error => console.log('Error while closing connection: ' + error));
    }
  }
}