mirror of
https://code.equilibrium.co.ao/ITO/doneit-web.git
synced 2026-04-19 04:57:52 +00:00
add readAt functionality
This commit is contained in:
@@ -19,7 +19,6 @@ export class MessageAsyncService {
|
||||
private messageLocalDataSourceService: MessageLocalDataSourceService
|
||||
) {
|
||||
|
||||
|
||||
this.messageLiveSignalRDataSourceService.getMessage().pipe(
|
||||
filter((message: any) => {
|
||||
return !message?.requestId?.startsWith(InstanceId) && message?.requestId
|
||||
@@ -40,7 +39,6 @@ export class MessageAsyncService {
|
||||
|
||||
this.incomingMessage(incomingMessage)
|
||||
|
||||
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
@@ -21,17 +21,15 @@ export class UserTypingAsyncService {
|
||||
private signalR: SignalRService,
|
||||
) {
|
||||
|
||||
this.signalR.getTyping().subscribe(async (e:any) => {
|
||||
if(e?.chatRoomId) {
|
||||
|
||||
console.log('e', e)
|
||||
this.signalR.getTyping().subscribe(async (e) => {
|
||||
if(e?.roomId) {
|
||||
|
||||
this.memoryDataSource.dispatch(removeUserTyping({data: {...e} as any}))
|
||||
this.memoryDataSource.dispatch(addUserTyping({data: {...e} as any}))
|
||||
//
|
||||
const value = await this.localDataSource.addUserTyping(e);
|
||||
|
||||
const id = e.chatRoomId + '@' + e.userName
|
||||
const id = e.roomId + '@' + e.userName
|
||||
if(!this.typingCallback[id]) {
|
||||
this.typingCallback[id] = new Subject()
|
||||
this.typingCallback[id].pipe(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { Dexie, EntityTable, liveQuery } from 'Dexie';
|
||||
import { err, ok } from 'neverthrow';
|
||||
import { err, ok, Result } from 'neverthrow';
|
||||
import { z } from 'zod';
|
||||
import { from, Observable, Subject } from 'rxjs';
|
||||
import { filter, switchMap } from 'rxjs/operators';
|
||||
@@ -73,6 +73,21 @@ export class MessageLocalDataSourceService {
|
||||
}
|
||||
|
||||
|
||||
async getLastMessageByRoomId(roomId: string): Promise<Result<undefined|TableMessage, any>> {
|
||||
try {
|
||||
const lastMessage = await messageDataSource.message
|
||||
.where('roomId')
|
||||
.equals(roomId)
|
||||
.reverse()
|
||||
.sortBy('id');
|
||||
|
||||
return ok(lastMessage[0]); // Get the last message
|
||||
} catch (error) {
|
||||
return err(error);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
async sendMessage(data: MessageInputDTO) {
|
||||
|
||||
(data as TableMessage).sending = true
|
||||
|
||||
+2
-2
@@ -11,8 +11,8 @@ export class UserTypingLiveDataSourceService {
|
||||
private SignalRLiveDataSourceService: SignalRService
|
||||
) { }
|
||||
|
||||
sendTyping(ChatRoomId) {
|
||||
return this.SignalRLiveDataSourceService.sendTyping({ChatRoomId, UserName:SessionStore.user.FullName})
|
||||
sendTyping(roomId) {
|
||||
return this.SignalRLiveDataSourceService.sendTyping({roomId, UserName:SessionStore.user.FullName})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -26,4 +26,5 @@ export const MessageOutPutDTOSchema = z.object({
|
||||
data: DataSchema
|
||||
});
|
||||
|
||||
export type MessageOutPutDataDTO = z.infer<typeof DataSchema>
|
||||
export type MessageOutPutDTO = z.infer<typeof MessageOutPutDTOSchema>
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
import { z } from "zod"
|
||||
|
||||
export const UserTypingDTOSchema = z.object({
|
||||
requestId: z.string(),
|
||||
roomId: z.string(),
|
||||
userId: z.string(),
|
||||
userName: z.string()
|
||||
})
|
||||
export type UserTypingDTO = z.infer<typeof UserTypingDTOSchema>
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import { SessionStore } from 'src/app/store/session.service';
|
||||
import { SignalRService } from '../../infra/socket/signal-r.service';
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { filter } from 'rxjs/operators';
|
||||
import { err, ok } from 'neverthrow';
|
||||
|
||||
export const InstanceId = uuidv4();
|
||||
|
||||
@@ -50,9 +51,6 @@ export class MessageRepositoryService {
|
||||
|
||||
if(localActionResult.isOk()) {
|
||||
|
||||
(await this.sendTyping(data.roomId)).map((e) => {
|
||||
console.log('map', e)
|
||||
})
|
||||
const sendMessageResult = await this.messageLiveSignalRDataSourceService.sendMessage(data)
|
||||
|
||||
|
||||
@@ -79,6 +77,18 @@ export class MessageRepositoryService {
|
||||
}
|
||||
}
|
||||
|
||||
async sendReadAt({roomId}) {
|
||||
const result = await this.messageLocalDataSourceService.getLastMessageByRoomId(roomId)
|
||||
if(result.isOk()) {
|
||||
if(result.value) {
|
||||
|
||||
return await this.messageLiveSignalRDataSourceService.sendReadAt({roomId, memberId: SessionStore.user.UserId, chatMessageId: result.value.messageId})
|
||||
}
|
||||
return ok(true)
|
||||
}
|
||||
return err(false)
|
||||
}
|
||||
|
||||
async listAllMessagesByRoomId(id: string) {
|
||||
const result = await this.messageRemoteDataSourceService.getMessagesFromRoom(id)
|
||||
|
||||
@@ -105,7 +115,7 @@ export class MessageRepositoryService {
|
||||
return this.messageLocalDataSourceService.subscribeToNewMessage(roomId)
|
||||
}
|
||||
|
||||
sendTyping(ChatRoomId) {
|
||||
return this.messageLiveSignalRDataSourceService.sendTyping({ChatRoomId, UserName:SessionStore.user.FullName})
|
||||
sendTyping(roomId) {
|
||||
return this.messageLiveSignalRDataSourceService.sendTyping({roomId, UserName:SessionStore.user.FullName})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,17 +3,21 @@ import { BehaviorSubject } from 'rxjs';
|
||||
import { Platform } from '@ionic/angular';
|
||||
import { SignalRConnection } from './signalR';
|
||||
import { Plugins } from '@capacitor/core';
|
||||
import { z } from 'zod';
|
||||
import { UserTypingDTO } from '../../data/dto/typing/typingInputDTO';
|
||||
import { MessageOutPutDataDTO } from '../../data/dto/message/messageOutputDTO';
|
||||
|
||||
const { App } = Plugins;
|
||||
|
||||
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root'
|
||||
})
|
||||
export class SignalRService {
|
||||
private connection: SignalRConnection;
|
||||
private messageSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
|
||||
private typingSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
|
||||
private messageSubject: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
|
||||
private typingSubject: BehaviorSubject<UserTypingDTO> = new BehaviorSubject<UserTypingDTO>(null);
|
||||
private connectingSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(null);
|
||||
|
||||
constructor(
|
||||
@@ -73,7 +77,7 @@ export class SignalRService {
|
||||
}
|
||||
|
||||
getTyping() {
|
||||
return this.typingSubject.asObservable()
|
||||
return this.typingSubject.asObservable().pipe()
|
||||
}
|
||||
|
||||
async sendMessage(data: Object) {
|
||||
@@ -84,7 +88,11 @@ export class SignalRService {
|
||||
this.establishConnection()
|
||||
}
|
||||
|
||||
async sendTyping({ChatRoomId, UserName}) {
|
||||
return await this.connection.typing({ ChatRoomId, UserName})
|
||||
async sendTyping({roomId, UserName}) {
|
||||
return await this.connection.typing({ roomId, UserName})
|
||||
}
|
||||
|
||||
async sendReadAt({ roomId, memberId, chatMessageId}) {
|
||||
return await this.connection.sendReadAt({ roomId, memberId, chatMessageId})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,11 +4,15 @@ import { ok, Result, err } from 'neverthrow';
|
||||
import { SessionStore } from 'src/app/store/session.service';
|
||||
import { filter, first } from 'rxjs/operators';
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { UserTypingDTO } from '../../data/dto/typing/typingInputDTO';
|
||||
import { MessageOutPutDataDTO } from '../../data/dto/message/messageOutputDTO';
|
||||
|
||||
export class SignalRConnection {
|
||||
|
||||
private hubConnection: signalR.HubConnection;
|
||||
private messageSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
|
||||
private typingSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
|
||||
private messageSubject: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
|
||||
private typingSubject: BehaviorSubject<UserTypingDTO> = new BehaviorSubject<UserTypingDTO>(null);
|
||||
private readAtSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
|
||||
private connectionStateSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
|
||||
private disconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
|
||||
private reconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
|
||||
@@ -23,6 +27,7 @@ export class SignalRConnection {
|
||||
establishConnection(): Promise<Result<signalR.HubConnection, false>> {
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
console.log('try to connect');
|
||||
const hubConnection = new signalR.HubConnectionBuilder()
|
||||
.withUrl(this.url)
|
||||
.build();
|
||||
@@ -35,8 +40,8 @@ export class SignalRConnection {
|
||||
console.log('Connection started');
|
||||
this.connectionStateSubject.next(true);
|
||||
this.hubConnection = hubConnection
|
||||
this.addMessageListener()
|
||||
this.join()
|
||||
this.addMessageListener()
|
||||
resolve(ok(hubConnection))
|
||||
})
|
||||
.catch(error => {
|
||||
@@ -69,8 +74,8 @@ export class SignalRConnection {
|
||||
|
||||
public join() {
|
||||
if(this.connectionStateSubject.value == true) {
|
||||
console.log('join=============')
|
||||
|
||||
console.log('join=================')
|
||||
this.hubConnection.invoke("Join", SessionStore.user.UserId, SessionStore.user.FullName);
|
||||
//this.hubConnection.invoke("Join", 105, "UserFirefox");
|
||||
} else {
|
||||
@@ -83,7 +88,7 @@ export class SignalRConnection {
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
if(this.connectionStateSubject.value == true) {
|
||||
console.log('sendMessage')
|
||||
console.log('sendMessage', data)
|
||||
this.hubConnection.invoke("SendMessage", data)
|
||||
|
||||
this.messageSubject.pipe(
|
||||
@@ -103,14 +108,14 @@ export class SignalRConnection {
|
||||
})
|
||||
}
|
||||
|
||||
public async typing(data: Object & { ChatRoomId, UserName}):Promise<Result<any, any>> {
|
||||
public async typing(data: Object & { roomId, UserName}):Promise<Result<any, any>> {
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
const requestId = uuidv4()
|
||||
if(this.connectionStateSubject.value == true) {
|
||||
|
||||
try {
|
||||
this.hubConnection.invoke("Typing", {UserName: data.UserName, ChatRoomId: data.ChatRoomId, requestId} as any)
|
||||
this.hubConnection.invoke("Typing", {userName: data.UserName, roomId: data.roomId, requestId} as any)
|
||||
|
||||
} catch (error) {}
|
||||
|
||||
@@ -132,23 +137,60 @@ export class SignalRConnection {
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
public async sendReadAt(data: Object & { roomId, memberId, chatMessageId}):Promise<Result<any, any>> {
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
const requestId = uuidv4()
|
||||
if(this.connectionStateSubject.value == true) {
|
||||
|
||||
try {
|
||||
this.hubConnection.invoke("ReadAt", { roomId: data.roomId, memberId: data.memberId, requestId } as any)
|
||||
|
||||
} catch (error) {}
|
||||
|
||||
this.readAtSubject.pipe(
|
||||
filter((message: any) => {
|
||||
return requestId == message?.requestId
|
||||
}),
|
||||
first()
|
||||
).subscribe(value => {
|
||||
resolve(ok(value));
|
||||
});
|
||||
|
||||
} else {
|
||||
this.sendLaterSubject.next({method: 'SendMessage', args: data})
|
||||
return reject(err(false))
|
||||
}
|
||||
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
private addMessageListener(): void {
|
||||
this.hubConnection.on('ReceiveMessage', (message) => {
|
||||
console.log('listening')
|
||||
this.hubConnection.on('ReceiveMessage', (message: MessageOutPutDataDTO) => {
|
||||
console.log('ReceiveMessage', message)
|
||||
this.messageSubject.next(message);
|
||||
});
|
||||
|
||||
this.hubConnection.on('Typing', (_message) => {
|
||||
console.log('_message', _message)
|
||||
this.typingSubject.next(_message);
|
||||
this.hubConnection.on('Typing', (_typing: UserTypingDTO) => {
|
||||
console.log('Typing', _typing)
|
||||
this.typingSubject.next(_typing);
|
||||
});
|
||||
|
||||
this.hubConnection.on('ReadAt', (_message) => {
|
||||
console.log('ReadAt', _message)
|
||||
this.readAtSubject.next(_message);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
public getMessages(): Observable<string> {
|
||||
public getMessages() {
|
||||
return this.messageSubject.asObservable()
|
||||
}
|
||||
|
||||
public getTyping(): Observable<string> {
|
||||
public getTyping() {
|
||||
return this.typingSubject.asObservable()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user