import * as signalR from '@microsoft/signalr';
import { BehaviorSubject, Observable } from 'rxjs';
import { ok, Result, err } from 'neverthrow';
import { SessionStore } from 'src/app/store/session.service';
import { filter, first } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid'
import { UserTypingDTO } from '../../data/dto/typing/typingInputDTO';
import { MessageOutPutDataDTO } from '../../data/dto/message/messageOutputDTO';

/**
 * Wrapper around a SignalR hub connection.
 *
 * Hub events (`ReceiveMessage`, `TypingMessage`, `ReadAt`) are pushed into
 * BehaviorSubjects that are exposed as observables. Outgoing calls
 * (`sendMessage`, `typing`, `sendReadAt`) invoke a hub method and resolve with
 * the first incoming event that carries the same `requestId`.
 *
 * When a call is made while disconnected, the request is published on the
 * "send later" stream and the returned promise rejects with `err(false)`.
 */
export class SignalRConnection {
  private hubConnection: signalR.HubConnection;

  // Every subject starts with a placeholder (null / false) that subscribers
  // receive immediately on subscription.
  private messageSubject: BehaviorSubject<MessageOutPutDataDTO | null> =
    new BehaviorSubject<MessageOutPutDataDTO | null>(null);
  private typingSubject: BehaviorSubject<UserTypingDTO | null> =
    new BehaviorSubject<UserTypingDTO | null>(null);
  // The 'ReadAt' hub payload is untyped in this file, hence `any`.
  private readAtSubject: BehaviorSubject<any> = new BehaviorSubject<any>(null);
  private connectionStateSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
  private disconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
  private reconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
  // Starts as `false`, later carries `{ method, args }` entries, hence `any`.
  private sendLaterSubject: BehaviorSubject<any> = new BehaviorSubject<any>(false);

  /** Cleared by closeConnection() so a deliberate close does not trigger a reconnect. */
  private reconnect = true

  url: string

  constructor({url}) {
    this.url = url
  }

  /**
   * Builds a new hub connection for `this.url` and starts it.
   *
   * On success: marks the connection as up, joins the hub, registers the
   * incoming-event listeners and resolves with `ok(hubConnection)`.
   * On failure: marks the connection as down and REJECTS with `err(false)`.
   *
   * @returns a promise resolving to `ok(connection)`; rejects with `err(false)` on failure.
   */
  establishConnection(): Promise<Result<signalR.HubConnection, false>> {
    return new Promise((resolve, reject) => {
      console.log('try to connect');

      const hubConnection = new signalR.HubConnectionBuilder()
        .withUrl(this.url)
        .build();
      this.hubConnection = hubConnection;

      hubConnection.onclose(() => {
        console.log('Connection closed');
        this.connectionStateSubject.next(false);
        this.disconnectSubject.next(true);
        if (this.reconnect) {
          // attempReconnect handles its own failure, so this cannot reject.
          void this.attempReconnect();
        }
      });

      hubConnection
        .start()
        .then(() => {
          console.log('Connection started');
          this.connectionStateSubject.next(true);
          this.join();
          this.addMessageListener();
          resolve(ok(hubConnection));
        })
        .catch(error => {
          console.log('Error while starting connection: ' + error);
          this.connectionStateSubject.next(false);
          reject(err(false));
        });
    });
  }

  /**
   * Makes a single attempt to re-establish the connection and signals the
   * reconnect subject on success. A failed attempt is logged, not retried.
   */
  async attempReconnect(): Promise<void> {
    try {
      const attempt = await this.establishConnection();
      if (attempt.isOk()) {
        this.reconnectSubject.next(true);
      }
    } catch (error) {
      // establishConnection rejects on failure; without this catch the
      // rejection would surface as an unhandled promise rejection.
      console.log('Reconnect attempt failed');
    }
  }

  /**
   * Invokes the hub's "Join" method with the session user's id and full name.
   * When disconnected, publishes the join request on the "send later" stream.
   */
  public join(): void {
    if (this.connectionStateSubject.value) {
      console.log('join=================');
      this.hubConnection
        .invoke("Join", SessionStore.user.UserId, SessionStore.user.FullName)
        .catch(error => console.log('Error while joining: ' + error));
      return;
    }

    // Queue the real session user instead of a hard-coded debug identity.
    // NOTE(review): the 'SendMessage' method tag is kept as-is because the
    // consumer of the send-later stream is not visible here — confirm.
    this.sendLaterSubject.next({
      method: 'SendMessage',
      args: ["Join", SessionStore.user?.UserId, SessionStore.user?.FullName],
    });
  }

  /**
   * Invokes the hub's "SendMessage" method and waits for the incoming
   * `ReceiveMessage` event whose `requestId` matches `data.requestId`.
   *
   * @param data payload passed to the hub unchanged; `requestId` correlates the reply.
   * @returns resolves with `ok(reply)`; rejects with `err(false)` when
   *          disconnected (after queuing the payload for later) or when the invoke fails.
   */
  public sendMessage(data: Object & { requestId}): Promise<Result<MessageOutPutDataDTO, false>> {
    return new Promise((resolve, reject) => {
      if (!this.connectionStateSubject.value) {
        this.sendLaterSubject.next({method: 'SendMessage', args: data});
        return reject(err(false));
      }

      console.log('sendMessage', data);

      const reply = this.messageSubject.pipe(
        filter((message): message is MessageOutPutDataDTO =>
          SignalRConnection.matchesRequest(message, data.requestId)),
        first()
      ).subscribe(value => {
        resolve(ok(value));
        console.log('Received valid value:', value);
      });

      this.hubConnection.invoke("SendMessage", data).catch(error => {
        // No reply will arrive for a failed invoke: stop waiting and report it.
        reply.unsubscribe();
        console.log('Error while invoking SendMessage: ' + error);
        reject(err(false));
      });
    });
  }

  /**
   * Invokes the hub's "Typing" method with a freshly generated `requestId`
   * and waits for the `TypingMessage` event carrying the same id.
   *
   * @param data room id, user name and user id; the user id is sent as a string.
   * @returns resolves with `ok(event)`; rejects with `err(false)` when
   *          disconnected (after queuing the payload for later) or when the invoke fails.
   */
  public typing(data: Object & { roomId, UserName, userId }): Promise<Result<UserTypingDTO, false>> {
    return new Promise((resolve, reject) => {
      if (!this.connectionStateSubject.value) {
        // NOTE(review): queued under the 'SendMessage' tag although this is a
        // typing notification — confirm how the send-later consumer replays it.
        this.sendLaterSubject.next({method: 'SendMessage', args: data});
        return reject(err(false));
      }

      const requestId = uuidv4();
      console.log('send typing', data);

      const reply = this.typingSubject.pipe(
        filter((message): message is UserTypingDTO =>
          SignalRConnection.matchesRequest(message, requestId)),
        first()
      ).subscribe(value => {
        resolve(ok(value));
      });

      this.hubConnection.invoke("Typing", {
        userName: data.UserName,
        roomId: data.roomId,
        userId: data.userId + '',
        requestId,
      }).catch(error => {
        reply.unsubscribe();
        console.log('Error while invoking Typing: ' + error);
        reject(err(false));
      });
    });
  }

  /**
   * Invokes the hub's "ReadAt" method with a freshly generated `requestId`
   * and waits for the `ReadAt` event carrying the same id.
   *
   * NOTE(review): `chatMessageId` is accepted but not forwarded to the hub —
   * confirm whether the server expects it.
   *
   * @param data room id and member id (plus the currently unused chat message id).
   * @returns resolves with `ok(event)`; rejects with `err(false)` when
   *          disconnected (after queuing the payload for later) or when the invoke fails.
   */
  public sendReadAt(data: Object & { roomId, memberId, chatMessageId}): Promise<Result<any, false>> {
    return new Promise((resolve, reject) => {
      if (!this.connectionStateSubject.value) {
        // NOTE(review): queued under the 'SendMessage' tag although this is a
        // read receipt — confirm how the send-later consumer replays it.
        this.sendLaterSubject.next({method: 'SendMessage', args: data});
        return reject(err(false));
      }

      const requestId = uuidv4();

      const reply = this.readAtSubject.pipe(
        filter(message => SignalRConnection.matchesRequest(message, requestId)),
        first()
      ).subscribe(value => {
        resolve(ok(value));
      });

      this.hubConnection.invoke("ReadAt", {
        roomId: data.roomId,
        memberId: data.memberId,
        requestId,
      }).catch(error => {
        reply.unsubscribe();
        console.log('Error while invoking ReadAt: ' + error);
        reject(err(false));
      });
    });
  }

  /**
   * True when `message` is non-null and its `requestId` equals `requestId`.
   * Loose equality is kept from the original comparison; the null guard stops
   * a placeholder `null` from matching an undefined request id.
   */
  private static matchesRequest(message: unknown, requestId: unknown): boolean {
    return message != null
      && (message as { requestId?: unknown }).requestId == requestId;
  }

  /** Forwards the hub's incoming events into the matching subjects. */
  private addMessageListener(): void {
    console.log('listening');

    this.hubConnection.on('ReceiveMessage', (message: MessageOutPutDataDTO) => {
      console.log('ReceiveMessage', message);
      this.messageSubject.next(message);
    });

    this.hubConnection.on('TypingMessage', (_typing: UserTypingDTO) => {
      console.log('Typing', _typing);
      this.typingSubject.next(_typing);
    });

    this.hubConnection.on('ReadAt', (_message) => {
      console.log('ReadAt', _message);
      this.readAtSubject.next(_message);
    });
  }

  /** Stream of `ReceiveMessage` payloads; emits `null` until the first one arrives. */
  public getMessages(): Observable<MessageOutPutDataDTO | null> {
    return this.messageSubject.asObservable();
  }

  /** Stream of `TypingMessage` payloads; emits `null` until the first one arrives. */
  public getTyping(): Observable<UserTypingDTO | null> {
    return this.typingSubject.asObservable();
  }

  /** Stream of the connection state: `true` once started, `false` when closed or failed. */
  public getConnectionState(): Observable<boolean> {
    return this.connectionStateSubject.asObservable();
  }

  /** Emits `true` each time the underlying connection closes (initial value `false`). */
  public getDisconnectTrigger(): Observable<boolean> {
    return this.disconnectSubject.asObservable();
  }

  /** Stream of `{ method, args }` requests made while disconnected (initial value `false`). */
  public getSendLater(): Observable<any> {
    return this.sendLaterSubject.asObservable();
  }

  /** Disables reconnection and stops the hub connection, if one was created. */
  public closeConnection(): void {
    this.reconnect = false;
    if (!this.hubConnection) {
      return;
    }

    this.hubConnection
      .stop()
      .then(() => {
        console.log('Connection closed by user');
        this.connectionStateSubject.next(false);
      })
      .catch(error => console.log('Error while closing connection: ' + error));
  }
}