import { NgModule } from '@angular/core'; import { SignalRService } from 'src/app/module/chat/infra/socket/signal-r.service' import { ChatServiceService } from 'src/app/module/chat/domain/chat-service.service' import { skip, switchMap } from 'rxjs/operators'; import { SessionStore } from 'src/app/store/session.service'; import { Subject, timer } from 'rxjs'; import { UserTypingLocalRepository } from './data/repository/user-typing-local-data-source.service'; @NgModule({ imports: [], providers: [], declarations: [], schemas: [], entryComponents: [] }) export class ChatModule { typingCallback: {[key: string]: Subject } = {} constructor( private SignalRService: SignalRService, private ChatServiceService: ChatServiceService, private signalR: SignalRService, private localDataSource: UserTypingLocalRepository, ) { this.syncMessage() this.listenToTyping() } async listenToTyping() { this.signalR.getTyping().subscribe(async (e) => { if(e?.roomId) { // this.memoryDataSource.dispatch(removeUserTyping({data: {...e} as any})) // this.memoryDataSource.dispatch(addUserTyping({data: {...e} as any})) // const value = await this.localDataSource.addUserTyping(e); const id = e.roomId + '@' + e.userName if(!this.typingCallback[id]) { this.typingCallback[id] = new Subject() this.typingCallback[id].pipe( switchMap(() => timer(2000)), ).subscribe(() => { // console.log('111111==============') // this.memoryDataSource.dispatch(removeUserTyping({data: {...e} as any})) this.localDataSource.removeUserTyping(e) }) } else { this.typingCallback[id].next() } } }) } async syncMessage() { const connection = this.SignalRService.getConnectionState() connection.pipe( skip(1) // Skip the first value ).subscribe((value)=> { if(value) { // on reconnect this.ChatServiceService.chatSync(); } }); connection.subscribe((value) => { if(value) { // on connect // this.ChatServiceService.sendLocalMessages() } }) // on page reload sync if(!(!SessionStore.user.Inactivity || !SessionStore.exist)) { this.ChatServiceService.start(); } } }