send direct message

This commit is contained in:
Peter Maquiran
2024-08-20 16:34:47 +01:00
parent 4fb24f7875
commit 59fc19879f
41 changed files with 912 additions and 308 deletions
+1 -3
View File
@@ -43,7 +43,7 @@ export class ChatModule {
this.typingCallback[id].pipe(
switchMap(() => timer(2000)),
).subscribe(() => {
console.log('111111==============')
// console.log('111111==============')
// this.memoryDataSource.dispatch(removeUserTyping({data: {...e} as any}))
this.localDataSource.removeUserTyping(e)
})
@@ -51,8 +51,6 @@ export class ChatModule {
this.typingCallback[id].next()
}
} else {
console.log('e--', e)
}
})
}
@@ -1,6 +1,7 @@
import { MessageTable } from "src/app/module/chat/infra/database/dexie/schema/message";
import { RoomListItemOutPutDTO, RoomListOutPutDTO } from "../../../dto/room/roomListOutputDTO";
export function messageListDetermineChanges(serverList: any[], localList: any[]) {
export function messageListDetermineChanges(serverList: MessageTable[], localList: MessageTable[]) {
localList = localList.filter(e => e.id)
// Convert lists to dictionaries for easier comparison
@@ -34,7 +34,11 @@ export const MessageOutPutDataDTOSchema = z.object({
reaction: z.string(),
sender: z.object({}),
}).array(),
info: z.array(z.object({})),
info: z.array(z.object({
memberId: z.number(),
readAt: z.string().nullable(),
deliverAt: z.string().nullable()
})),
attachments: z.array(z.object({
fileType: z.nativeEnum(MessageAttachmentFileType),
source: z.nativeEnum(MessageAttachmentSource),
@@ -8,38 +8,41 @@ import { chatDatabase } from '../../infra/database/dexie/service';
import { ok } from 'neverthrow';
import { err, Result } from 'neverthrow';
import { MemberListUPdateStatusInputDTO } from '../../domain/use-case/socket/member-list-update-status-use-case.service';
import { MemberTable } from '../../infra/database/dexie/schema/members';
import { MemberTable, MemberTableSchema } from '../../infra/database/dexie/schema/members';
import { from } from 'rxjs';
@Injectable({
providedIn: 'root'
})
export class MemberListLocalRepository extends DexieRepository<RoomTable> {
export class MemberListLocalRepository extends DexieRepository<MemberTable> {
constructor() {
super(chatDatabase.room, RoomTableSchema)
super(chatDatabase.members, MemberTableSchema)
}
// messageDataSource.message.hook('creating', (primKey, obj, trans) => {
// // const newMessage = await trans.table('message').get(primKey);
// this.messageSubject.next(obj);
// // return newMessage
// })
async directMember({roomId, currentUserId}) {
try {
let a = await chatDatabase.members.where('roomId')
.equals(roomId)
.and(message => message.wxUserId !== currentUserId)
.first()
return ok(a)
} catch (e) {
return err(e)
}
}
async addMember(data: MemberTable) {
try {
data.$roomIdUserId = data.roomId + data.wxUserId
const result = await chatDatabase.members.add(data)
return ok(result)
} catch (e) {
return err(false)
}
data.$roomIdUserId = data.roomId + data.wxUserId
return this.insert(data)
}
async updateMemberRole(data: MemberTable) {
try {
const result = await chatDatabase.members.where({
wxUserId:data.wxUserId,
wxUserId: data.wxUserId,
roomId: data.roomId,
}).modify(data);
@@ -100,19 +103,11 @@ export class MemberListLocalRepository extends DexieRepository<RoomTable> {
return liveQuery(() => chatDatabase.members.get($roomIdUserId)) as any;
}
getItemsLive(): Observable<RoomListOutPutDTO[]> {
return liveQuery(() => chatDatabase.room.toArray()) as any;
}
getRoomByIdLive(id: any): Observable<RoomListItemOutPutDTO | undefined> {
return liveQuery(() => chatDatabase.room.get(id)) as any;
}
async getRoomMemberById(roomId: any) {
return await chatDatabase.members.where({roomId}).toArray()
}
getRoomMemberByIdLive(roomId: any) {
return liveQuery(() => chatDatabase.members.where({roomId}).toArray())
return from (liveQuery(() => chatDatabase.members.where({roomId}).toArray()))
}
getRoomMemberNoneAdminByIdLive(roomId: any) {
@@ -19,6 +19,22 @@ interface msgObj {
requestId: string;
}
interface sendDeliverAt {
memberId: number,
messageId:string,
roomId: string,
requestId: string
}
export interface sendReadAt {
memberId: number,
messageId:string,
roomId: string,
requestId: string
}
@Injectable({
providedIn: 'root'
})
@@ -48,6 +64,24 @@ export class MessageSocketRepositoryService {
return result;
}
async sendDeliverAt(data: sendDeliverAt) {
const result = await this.socket.sendData<any>({
method: 'DeliverAt',
data: data as any,
})
return result;
}
async sendReadAt(data: sendReadAt) {
const result = await this.socket.sendData<any>({
method: 'ReadAt',
data: data as any,
})
return result;
}
listenToMessages() {
return this.socket.getMessage()
}
@@ -62,7 +96,7 @@ export class MessageSocketRepositoryService {
return this.socket.getMessageUpdate()
}
reactToMessageSocket(data) {
this.socket.sendData({
method: 'ReactMessage',
@@ -94,6 +128,6 @@ export class MessageSocketRepositoryService {
}
}
@@ -7,6 +7,7 @@ import { ValidateSchema } from 'src/app/services/decorators/validate-schema.deco
import { chatDatabase } from '../../infra/database/dexie/service';
import { RoomTable, RoomTableSchema } from '../../infra/database/dexie/schema/room';
import { DexieRepository } from 'src/app/infra/repository/dexie/dexie-repository.service';
import { from } from 'rxjs';
@Injectable({
providedIn: 'root'
@@ -81,7 +82,7 @@ export class RoomLocalRepository extends DexieRepository<RoomTable> {
getItemsLive(){
return liveQuery(() => chatDatabase.room.toArray());
return from (liveQuery(() => chatDatabase.room.toArray()));
}
getRoomByIdLive(id: any) {
@@ -25,6 +25,7 @@ import { SendLocalMessagesUseCaseService } from './use-case/messages-send-offlin
import { RemoveMemberUseCaseService } from './use-case/member/-use-case.service'
import { AddMemberUseCaseService } from './use-case/member-add-use-case.service'
import { UpdateRoomByIdUseCaseService } from './use-case/room-update-by-id-use-case.service'
import { MessageMarkAsReadUseCaseService } from './use-case/message/message-mark-as-read-use-case.service'
import { GetMessageAttachmentLocallyUseCaseService } from 'src/app/module/chat/domain/use-case/message-get-attachment-localy-use-case.service';
import { GetRoomListUseCaseService } from 'src/app/module/chat/domain/use-case/room-get-list-use-case.service';
import { filter } from 'rxjs/operators';
@@ -36,6 +37,7 @@ import { UserRemoveListInputDTO } from '../data/dto/room/userRemoveListInputDTO'
import { AddMemberToRoomInputDTO } from '../data/dto/room/addMemberToRoomInputDto';
import { RoomUpdateInputDTO } from '../data/dto/room/roomUpdateInputDTO';
import { RoomType } from "src/app/module/chat/domain/entity/group";
import { sendReadAt } from "src/app/module/chat/data/repository/message/message-live-signalr-data-source.service";
export const InstanceId = uuidv4();
@@ -73,7 +75,8 @@ export class ChatServiceService {
private UpdateRoomByIdUseCaseService: UpdateRoomByIdUseCaseService,
private RemoveMemberUseCaseService: RemoveMemberUseCaseService,
private MessageReadAtByIdUseCaseService: MessageReadAtByIdUseCaseService,
private SendLocalMessagesUseCaseService: SendLocalMessagesUseCaseService
private SendLocalMessagesUseCaseService: SendLocalMessagesUseCaseService,
private MessageMarkAsReadUseCaseService: MessageMarkAsReadUseCaseService
) {
this.messageLiveSignalRDataSourceService.getMessageDelete()
.pipe()
@@ -102,7 +105,6 @@ export class ChatServiceService {
})
).subscribe(async (message) => {
if(message?.id) {
console.log('create message')
this.SocketMessageCreateUseCaseService.execute(message)
}
})
@@ -115,7 +117,6 @@ export class ChatServiceService {
}
})
).subscribe(async (message) => {
console.log('123', message)
this.MemberListUpdateStatusUseCaseService.execute(message.data as any)
})
@@ -213,6 +214,10 @@ export class ChatServiceService {
}
sendReadAt(sendReadAt: sendReadAt) {
return this.MessageMarkAsReadUseCaseService.execute(sendReadAt)
}
listenToIncomingMessage(roomId:string) {
return this.ListenMessageByRoomIdNewUseCase.execute({roomId})
}
@@ -14,12 +14,18 @@ export const MessageEntitySchema = z.object({
oneShot: z.boolean(),
sentAt: z.string().optional(),
requireUnlock: z.boolean(),
editedAt: z.string().nullable().optional(),
sender: z.object({
wxUserId: z.number(),
wxFullName: z.string(),
wxeMail: z.string(),
userPhoto: z.string(),
}),
info: z.array(z.object({
memberId: z.number(),
readAt: z.string().nullable(),
deliverAt: z.string().nullable()
})).optional(),
sending: z.boolean().optional(),
attachments: z.array(z.object({
fileType: z.nativeEnum(MessageAttachmentFileType),
@@ -47,6 +53,11 @@ export class MessageEntity implements Message {
oneShot: boolean = false
sentAt: string
requireUnlock: boolean = false
info: {
memberId?: number
readAt?: string,
deliverAt?: string
}[] = []
sender: {
wxUserId: number,
wxFullName: string,
@@ -0,0 +1,9 @@
import { Injectable } from '@angular/core';
@Injectable({
providedIn: 'root'
})
export class RoomExpirationService {
constructor() { }
}
@@ -14,8 +14,12 @@ export class ListenSendMessageUseCase {
execute({roomId}: {roomId: string}) {
console.log('reciee message')
return this.MessageSocketRepositoryService.listenToMessages().pipe(
filter((message) => message?.requestId?.startsWith(InstanceId) && message?.roomId == roomId),
filter((message) => {
console.log(message, roomId)
return message?.requestId?.startsWith(InstanceId) && message?.roomId == roomId
}),
map(message => message)
)
@@ -15,8 +15,9 @@ import { MessageTable } from '../../infra/database/dexie/schema/message';
import { MessageMapper } from '../mapper/messageMapper';
import { SignalRService } from '../../infra/socket/signal-r.service';
import { RoomType } from "src/app/module/chat/domain/entity/group";
import { TracingType, XTracerAsync } from 'src/app/services/monitoring/opentelemetry/tracer';
import { MemberListLocalRepository } from 'src/app/module/chat/data/repository/member-list-local-repository.service'
import { SessionStore } from 'src/app/store/session.service';
const MessageInputUseCaseSchema = z.object({
memberId: z.number(),
roomId: z.string(),
@@ -39,11 +40,13 @@ export class MessageCreateUseCaseService {
private AttachmentLocalRepositoryService: AttachmentLocalDataSource,
private messageLocalDataSourceService: MessageLocalDataSourceService,
private messageLiveSignalRDataSourceService: SignalRService,
private messageSocketRepositoryService: MessageSocketRepositoryService
private messageSocketRepositoryService: MessageSocketRepositoryService,
private MemberListLocalRepository: MemberListLocalRepository
) { }
async execute(message: MessageEntity, messageEnum: RoomType) {
@XTracerAsync({name:'MessageCreateUseCaseService', module:'chat', bugPrint: true})
async execute(message: MessageEntity, messageEnum: RoomType, tracing?: TracingType) {
const validation = zodSafeValidation<MessageEntity>(MessageEntitySchema, message)
@@ -93,15 +96,32 @@ export class MessageCreateUseCaseService {
//====================
message.sending = true
const DTO = MessageMapper.fromDomain(message, message.requestId)
let sendMessageResult: Result<MessageOutPutDataDTO, any>
if(messageEnum == RoomType.Group) {
const DTO = MessageMapper.fromDomain(message, message.requestId)
sendMessageResult = await this.messageLiveSignalRDataSourceService.sendMessage<MessageOutPutDataDTO>(DTO)
} else {
sendMessageResult = await this.messageSocketRepositoryService.sendDirectMessage(DTO)
if(message.receiverId) {
const DTO = MessageMapper.fromDomain(message, message.requestId)
sendMessageResult = await this.messageSocketRepositoryService.sendDirectMessage(DTO)
} else {
const getRoomMembers = await this.MemberListLocalRepository.directMember({
roomId:message.roomId,
currentUserId: SessionStore.user.UserId
})
if(getRoomMembers.isOk()) {
message.receiverId = getRoomMembers.value.wxUserId
const DTO = MessageMapper.fromDomain(message, message.requestId)
sendMessageResult = await this.messageSocketRepositoryService.sendDirectMessage(DTO)
} else {
console.log('not found direct users', getRoomMembers.error)
}
}
}
// return this sendMessageResult
if(sendMessageResult.isOk()) {
@@ -127,7 +147,7 @@ export class MessageCreateUseCaseService {
await this.messageLocalDataSourceService.update(message.$id, {sending: false, $id: message.$id})
return err('no connection')
}
}
} else {
@@ -136,6 +156,11 @@ export class MessageCreateUseCaseService {
zodErrorList: validation.error.errors,
data: message.attachments
})
} else {
Logger.error('failed to send message, validation failed', {
zodErrorList: validation.error.errors,
data: message
})
}
}
@@ -3,8 +3,7 @@ import { z } from 'zod';
import { SafeValidateSchema, ValidateSchema } from 'src/app/services/decorators/validate-schema.decorator';
import { MessageRemoteDataSourceService } from '../../data/repository/message/message-remote-data-source.service';
import { MessageSocketRepositoryService } from '../../data/repository/message/message-live-signalr-data-source.service';
import { XTracerAsync, TracingType } from 'src/app/services/monitoring/opentelemetry/tracer';
export const MessageDeleteInputDTOSchema = z.object({
requestId: z.string().optional(),
roomId: z.string(),
@@ -22,8 +21,11 @@ export class MessageDeleteLiveUseCaseService {
public repository: MessageSocketRepositoryService
) { }
@SafeValidateSchema(MessageDeleteInputDTOSchema, 'MessageDeleteUseCaseService')
async execute(data: MessageDeleteInputDTO) {
@XTracerAsync({name:'MessageDeleteLiveUseCaseService', module:'chat', bugPrint: true})
async execute(data: MessageDeleteInputDTO, tracing?: TracingType) {
tracing.log('MessageDeleteLiveUseCaseService payload', {
data: data
})
return this.repository.sendMessageDelete(data)
}
}
@@ -0,0 +1,18 @@
import { Injectable } from '@angular/core';
import { MessageSocketRepositoryService, sendReadAt } from '../../../data/repository/message/message-live-signalr-data-source.service';
import { XTracerAsync, TracingType } from 'src/app/services/monitoring/opentelemetry/tracer';
@Injectable({
providedIn: 'root'
})
export class MessageMarkAsReadUseCaseService {
constructor(
private MessageSocketRepositoryService: MessageSocketRepositoryService,
) { }
@XTracerAsync({name:'MessageMarkAsReadUseCaseService', module:'chat', bugPrint: true})
async execute(sendReadAt: sendReadAt, tracing?: TracingType) {
return this.MessageSocketRepositoryService.sendReadAt(sendReadAt)
}
}
@@ -4,7 +4,7 @@ import { RoomRemoteDataSourceService } from '../../data/repository/room-remote-r
import { RoomLocalRepository } from '../../data/repository/room-local-repository.service';
import { captureAndReraiseAsync } from 'src/app/services/decorators/captureAndReraiseAsync';
import { isHttpResponse } from 'src/app/services/http.service';
import { CronJobService } from 'src/app/utils/task-scheduler'
@Injectable({
providedIn: 'root'
})
@@ -14,6 +14,7 @@ export class GetRoomListUseCaseService {
private roomRemoteDataSourceService: RoomRemoteDataSourceService,
// private roomMemoryDataSourceService: Store<RoomRemoteDataSourceState>,
private roomLocalDataSourceService: RoomLocalRepository,
private CronJobService: CronJobService
) { }
@captureAndReraiseAsync('RoomRepositoryService/list')
@@ -28,6 +29,12 @@ export class GetRoomListUseCaseService {
for( const roomData of roomsToInsert) {
this.roomLocalDataSourceService.createRoom(roomData.chatRoom)
if(roomData.chatRoom.expirationDate) {
console.log('room expiration date schedule')
this.CronJobService.createCronJob('remove expired room', new Date(roomData.chatRoom.expirationDate), this.execute)
}
}
for( const roomData of roomsToUpdate) {
@@ -13,7 +13,7 @@ export class SocketMessageCreateUseCaseService {
private messageLocalDataSourceService: MessageLocalDataSourceService,
) { }
@XTracerAsync({name:'Socket-Message-Create-UseCase', bugPrint: true})
@XTracerAsync({name:'Socket-Message-Create-UseCase', module:'chat', bugPrint: true})
async execute(input: any, tracing?: TracingType) {
ParamsValidation(MessageOutPutDataDTOSchema, input, tracing)
@@ -15,7 +15,7 @@ export class SocketMessageUpdateUseCaseService {
) { }
@XTracerAsync({name:'Socket-Message-Update-UseCase', bugPrint: true})
@XTracerAsync({name:'Socket-Message-Update-UseCase', bugPrint: true, module:'chat',})
async execute(data: MessageOutPutDataDTO, tracing?: TracingType) {
ParamsValidation(MessageOutPutDataDTOSchema, data, tracing)
@@ -30,11 +30,13 @@ export class SocketMessageUpdateUseCaseService {
if(result.isOk()) {
tracing?.addEvent("Message found")
return await this.messageLocalDataSourceService.update(result.value.$id, incomingMessage)
const updateResult = await this.messageLocalDataSourceService.update(result.value.$id, incomingMessage)
tracing.setAttribute('outcome', 'success')
return updateResult
} else {
tracing?.addEvent("Message not found")
}
tracing.setAttribute('outcome', 'success')
}
}
@@ -3,8 +3,11 @@ import { MessageLocalDataSourceService } from '../../data/repository/message/mes
import { messageListDetermineChanges } from '../../data/async/list/rooms/messageListChangedetector';
import { MessageTable } from '../../infra/database/dexie/schema/message';
import { MessageRemoteDataSourceService } from '../../data/repository/message/message-remote-data-source.service';
import { MessageSocketRepositoryService } from '../../data/repository/message/message-live-signalr-data-source.service';
import { ok } from 'neverthrow';
import { RoomLocalRepository } from '../../data/repository/room-local-repository.service';
import { SessionStore } from 'src/app/store/session.service';
import { Logger } from 'src/app/services/logger/main/service';
@Injectable({
providedIn: 'root'
@@ -14,6 +17,7 @@ export class SyncAllRoomMessagesService {
constructor(
private messageLocalDataSourceService: MessageLocalDataSourceService,
private messageRemoteDataSourceService: MessageRemoteDataSourceService,
private MessageSocketRepositoryService: MessageSocketRepositoryService,
private roomLocalDataSourceService: RoomLocalRepository,
) { }
@@ -35,15 +39,38 @@ export class SyncAllRoomMessagesService {
const { addedItems, changedItems, deletedItems } = messageListDetermineChanges(result.value.data, localResult)
for(const message of changedItems) {
let clone: MessageTable = message
clone.roomId = room.id
this.messageLocalDataSourceService.findOrUpdate(clone)
const me = message.info.find(e => e.memberId == SessionStore.user.UserId)
if(!me) {
this.MessageSocketRepositoryService.sendDeliverAt({
memberId: SessionStore.user.UserId,
messageId: message.id,
roomId: message.roomId,
requestId: "string"
})
}
}
for(const message of addedItems) {
let clone: MessageTable = message
clone.roomId = room.id
const me = message.info.find(e => e.memberId == SessionStore.user.UserId)
if(!me) {
this.MessageSocketRepositoryService.sendDeliverAt({
memberId: SessionStore.user.UserId,
messageId: message.id,
roomId: message.roomId,
requestId: "string"
})
}
}
@@ -54,6 +81,8 @@ export class SyncAllRoomMessagesService {
this.messageLocalDataSourceService.deleteByMessageId(message.id)
}
} else {
Logger.error('failed to get room message '+room.id)
}
}
@@ -16,7 +16,7 @@ export const AttachmentTableSchema = z.object({
docId: z.string().optional(),
mimeType: z.string().optional(),
id: z.string().optional(),
description: z.string().optional(),
description: z.string().optional()
})
export type AttachmentTable = z.infer<typeof AttachmentTableSchema>
@@ -11,6 +11,7 @@ export const MessageTableSchema = z.object({
canEdit: z.boolean(),
oneShot: z.boolean(),
sentAt: z.string().optional(),
editedAt: z.string().nullable().optional(),
requireUnlock: z.boolean(),
sender: z.object({
wxUserId: z.number(),
@@ -26,7 +27,11 @@ export const MessageTableSchema = z.object({
reaction: z.string(),
sender: z.object({}),
}).array().optional(),
info: z.array(z.object({})).optional(),
info: z.array(z.object({
memberId: z.number(),
readAt: z.string().nullable(),
deliverAt: z.string().nullable()
})).optional(),
attachments: z.array(z.object({
fileType: z.nativeEnum(MessageAttachmentFileType),
source: z.nativeEnum(MessageAttachmentSource),
+1 -3
View File
@@ -157,7 +157,7 @@ export class SignalRConnection {
const requestId = uuidv4()
if(this.connectionStateSubject.value == true) {
console.log('send typing', data)
// console.log('send typing', data)
try {
this.hubConnection.invoke("Typing", data)
@@ -276,7 +276,6 @@ export class SignalRConnection {
});
this.hubConnection.on('TypingMessage', (_typing: UserTypingDTO) => {
console.log('Typing', _typing)
this.typingSubject.next(_typing);
this.sendDataSubject.next({
method: 'ReceiveMessage',
@@ -285,7 +284,6 @@ export class SignalRConnection {
});
this.hubConnection.on('AvailableUsers', (data: any) => {
console.log('AvailableUsers', data)
this.typingSubject.next(data);
this.sendDataSubject.next({
method: 'AvailableUsers',