add readAt and delete message

This commit is contained in:
Peter Maquiran
2024-07-31 17:23:44 +01:00
parent 128d92fe5e
commit 82e4acbe87
14 changed files with 287 additions and 67 deletions
@@ -6,6 +6,7 @@ import { SignalRService } from '../../../infra/socket/signal-r.service';
import { filter } from 'rxjs/operators';
import { InstanceId } from '../../repository/message-respository.service';
import { SafeValidateSchema } from 'src/app/services/decorators/validate-schema.decorator';
import { MessageOutPutDataDTO } from '../../dto/message/messageOutputDTO';
@Injectable({
providedIn: 'root'
@@ -20,27 +21,67 @@ export class MessageAsyncService {
) {
this.messageLiveSignalRDataSourceService.getMessage().pipe(
filter((message: any) => {
return !message?.requestId?.startsWith(InstanceId) && message?.requestId
filter((message) => {
return !message?.requestId?.startsWith(InstanceId)
})
).subscribe(async (message) => {
console.log('message async ', message)
if(message?.id) {
const id = message.id + ''
delete message.id;
console.log('message async ', message)
const incomingMessage = {
...message,
messageId: id,
sending: false,
roomId:message.chatRoomId
const id = message.id + ''
delete message.id;
const incomingMessage = {
...message,
messageId: id,
sending: false,
roomId:message.chatRoomId
}
this.incomingMessage(incomingMessage)
}
this.incomingMessage(incomingMessage)
})
this.messageLiveSignalRDataSourceService.getMessageUpdate().pipe(
filter((message) => {
return !message?.requestId?.startsWith(InstanceId)
})
).subscribe(async (message) => {
if(message?.id) {
console.log('message async ', message)
const id = message.id + ''
delete message.id;
const incomingMessage = {
...message,
messageId: id,
sending: false,
roomId:message.chatRoomId
}
this.incomingMessage(incomingMessage)
}
})
this.messageLiveSignalRDataSourceService.getMessageDelete()
.pipe()
.subscribe(async (message) => {
if(message.id) {
this.incomingDeleted(message)
}
})
}
@SafeValidateSchema(IncomingMessageSchema, 'socket/incomingMessage')
@@ -54,4 +95,15 @@ export class MessageAsyncService {
console.log(result.error)
}
}
async incomingDeleted(data: MessageOutPutDataDTO) {
const result = await this.messageLocalDataSourceService.deleteByMessageId(data.id)
if(result.isOk()) {
} else {
console.log(result.error)
}
}
}
@@ -88,6 +88,19 @@ export class MessageLocalDataSourceService {
}
}
async deleteByMessageId(messageId: string): Promise<Result<undefined|TableMessage, any>> {
try {
console.log(messageId)
const lastMessage = await messageDataSource.message
.where('messageId')
.equals(messageId).delete()
return ok(lastMessage[0]); // Get the last message
} catch (error) {
return err(error);
}
}
async sendMessage(data: MessageInputDTO) {
@@ -121,6 +134,25 @@ export class MessageLocalDataSourceService {
}
async messageExist({messageId}) {
try {
const existingMessage = await messageDataSource.message
.where('messageId')
.equals(messageId)
.first();
if (existingMessage) {
return ok(true)
} else {
return err(false)
}
} catch (error) {
return err(false);
}
}
async update(data: TableMessage ) {
try {
@@ -0,0 +1,10 @@
import { z } from "zod";
export const MessageDeleteInputDTOSchema = z.object({
requestId: z.string(),
roomId: z.string(),
messageId: z.string(),
senderId: z.number(),
});
export type MessageDeleteInputDTO = z.infer<typeof MessageDeleteInputDTOSchema>
@@ -16,7 +16,8 @@ const DataSchema = z.object({
deliverAt: z.string().datetime().nullable(),
canEdit: z.boolean(),
oneShot: z.boolean(),
requireUnlock: z.boolean()
requireUnlock: z.boolean(),
requestId: z.string()
});
@@ -8,6 +8,7 @@ import { SignalRService } from '../../infra/socket/signal-r.service';
import { v4 as uuidv4 } from 'uuid'
import { filter } from 'rxjs/operators';
import { err, ok } from 'neverthrow';
import { MessageDeleteInputDTO } from '../../domain/use-case/message-delete-live-use-case.service';
export const InstanceId = uuidv4();
@@ -81,12 +82,20 @@ export class MessageRepositoryService {
}
}
sendMessageDelete(data: MessageDeleteInputDTO) {
data['requestId'] = InstanceId +'@'+ uuidv4();
return this.messageLiveSignalRDataSourceService.sendMessageDelete(data)
}
async sendReadAt({roomId}) {
const result = await this.messageLocalDataSourceService.getLastMessageByRoomId(roomId)
if(result.isOk()) {
if(result.value) {
// return await this.messageLiveSignalRDataSourceService.sendReadAt({roomId, memberId: SessionStore.user.UserId, chatMessageId: result.value.messageId})
return await this.messageLiveSignalRDataSourceService.sendReadAt({roomId, memberId: SessionStore.user.UserId, chatMessageId: result.value.messageId})
}
return ok(true)
}
@@ -0,0 +1,23 @@
import { Injectable } from '@angular/core';
import { MessageDeleteLiveUseCaseService, MessageDeleteInputDTO } from 'src/app/module/chat/domain/use-case/message-delete-live-use-case.service'
import { SessionStore } from 'src/app/store/session.service';
@Injectable({
providedIn: 'root'
})
export class ChatServiceService {
constructor(
private MessageDeleteLiveUseCaseService: MessageDeleteLiveUseCaseService
) { }
messageDelete(data: {roomId, messageId}) {
const params = {
...data,
senderId: SessionStore.user.UserId,
}
return this.MessageDeleteLiveUseCaseService.execute(params)
}
}
@@ -0,0 +1,25 @@
import { Injectable } from '@angular/core';
import { z } from 'zod';
import { MessageRepositoryService } from '../../data/repository/message-respository.service';
export const MessageDeleteInputDTOSchema = z.object({
requestId: z.string(),
roomId: z.string(),
messageId: z.string(),
senderId: z.number(),
});
export type MessageDeleteInputDTO = z.infer<typeof MessageDeleteInputDTOSchema>
@Injectable({
providedIn: 'root'
})
export class MessageDeleteLiveUseCaseService {
constructor(
public repository: MessageRepositoryService
) { }
async execute(data: MessageDeleteInputDTO) {
return this.repository.sendMessageDelete(data)
}
}
@@ -6,6 +6,7 @@ import { Plugins } from '@capacitor/core';
import { z } from 'zod';
import { UserTypingDTO } from '../../data/dto/typing/typingInputDTO';
import { MessageOutPutDataDTO } from '../../data/dto/message/messageOutputDTO';
import { MessageDeleteInputDTO } from '../../data/dto/message/messageDeleteInputDTO';
const { App } = Plugins;
@@ -19,6 +20,8 @@ export class SignalRService {
private messageSubject: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
private typingSubject: BehaviorSubject<UserTypingDTO> = new BehaviorSubject<UserTypingDTO>(null);
private connectingSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(null);
private messageDelete: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
private messageUpdateSubject: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
constructor(
private platform: Platform) {
@@ -48,7 +51,8 @@ export class SignalRService {
private async establishConnection() {
const connection = new SignalRConnection({url:'https://41e3-41-63-166-54.ngrok-free.app/api/v2/chathub'})
// const connection = new SignalRConnection({url:'https://41e3-41-63-166-54.ngrok-free.app/api/v2/chathub'})
const connection = new SignalRConnection({url:'https://gdapi-dev.dyndns.info/stage/api/v2/chathub'})
const attempConnection = await connection.establishConnection()
if(attempConnection.isOk()) {
@@ -66,6 +70,14 @@ export class SignalRService {
this.connection.getTyping().subscribe((data) => {
this.typingSubject.next(data)
})
this.connection.getMessageDelete().subscribe((data) => {
this.messageDelete.next(data)
})
this.connection.getMessageUpdateSubject().subscribe((data) => {
this.messageUpdateSubject.next(data)
})
}
}
@@ -80,6 +92,14 @@ export class SignalRService {
return this.typingSubject.asObservable().pipe()
}
getMessageDelete() {
return this.messageDelete.asObservable()
}
getMessageUpdate() {
return this.messageUpdateSubject.asObservable()
}
async sendMessage(data: Object) {
return await this.connection.sendMessage(data as any)
}
@@ -95,4 +115,10 @@ export class SignalRService {
async sendReadAt({ roomId, memberId, chatMessageId}) {
return await this.connection.sendReadAt({ roomId, memberId, chatMessageId})
}
async sendMessageDelete(data: MessageDeleteInputDTO) {
return await this.connection.deleteMessage(data)
}
}
+42 -35
View File
@@ -6,41 +6,14 @@ import { filter, first } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid'
import { UserTypingDTO } from '../../data/dto/typing/typingInputDTO';
import { MessageOutPutDataDTO } from '../../data/dto/message/messageOutputDTO';
var msgObj = {
roomId: "53bc6471-c28e-42d0-aa72-f4e52221f16f",
senderId:312,
message:"message enviada",
messageType:312,
canEdit:true,
oneShot:false,
requestId:"testing"
};
// var deletObj = {
// roomId: "53bc6471-c28e-42d0-aa72-f4e52221f16f",
// senderId:312,
// messageId:"message enviada",
// requestId:"testing"
// };
// var reactObj = {
// roomId: "53bc6471-c28e-42d0-aa72-f4e52221f16f",
// memberId:2,
// messageId:"e7074c10-4f92-458c-adb2-774ec2d42992",
// reaction:"reacted",
// requestId:"testingReaction"
// };
const typingObj = {
roomId: "53bc6471-c28e-42d0-aa72-f4e52221f16f",
userId: 312,
userName:"usertyping",
requestId:"testing"
};
import { MessageDeleteInputDTO } from '../../data/dto/message/messageDeleteInputDTO';
export class SignalRConnection {
private hubConnection: signalR.HubConnection;
private messageSubject: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
private messageDelete: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
private messageUPdateSubject: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
private typingSubject: BehaviorSubject<UserTypingDTO> = new BehaviorSubject<UserTypingDTO>(null);
private readAtSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
private connectionStateSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
@@ -57,7 +30,6 @@ export class SignalRConnection {
establishConnection(): Promise<Result<signalR.HubConnection, false>> {
return new Promise((resolve, reject) => {
console.log('try to connect');
const hubConnection = new signalR.HubConnectionBuilder()
.withUrl(this.url)
.build();
@@ -106,7 +78,7 @@ export class SignalRConnection {
if(this.connectionStateSubject.value == true) {
console.log('join=================')
this.hubConnection.invoke("Join", 312, SessionStore.user.FullName);
this.hubConnection.invoke("Join", SessionStore.user.UserId, SessionStore.user.FullName);
//this.hubConnection.invoke("Join", 105, "UserFirefox");
} else {
this.sendLaterSubject.next({method: 'SendMessage', args:["Join", 312, "Daniel"]})
@@ -119,7 +91,7 @@ export class SignalRConnection {
if(this.connectionStateSubject.value == true) {
console.log('sendMessage', data)
this.hubConnection.invoke("SendMessage", msgObj)
this.hubConnection.invoke("SendMessage", data)
this.messageSubject.pipe(
filter((message: any) => data.requestId == message?.requestId),
@@ -138,6 +110,23 @@ export class SignalRConnection {
})
}
public async deleteMessage(data: MessageDeleteInputDTO) {
return new Promise((resolve, reject) => {
if(this.connectionStateSubject.value == true) {
console.log('delete message', data)
this.hubConnection.invoke("DeleteMessage", data)
this.messageSubject.pipe(
filter((message: any) => data.requestId == message?.requestId),
first()
).subscribe((value) => {
resolve(ok(value))
})
}
})
}
public async typing(data: Object & { roomId, UserName, userId }):Promise<Result<any, any>> {
return new Promise((resolve, reject) => {
@@ -146,7 +135,7 @@ export class SignalRConnection {
console.log('send typing', data)
try {
this.hubConnection.invoke("Typing", typingObj)
this.hubConnection.invoke("Typing", data)
} catch (error) {}
@@ -176,7 +165,7 @@ export class SignalRConnection {
if(this.connectionStateSubject.value == true) {
try {
this.hubConnection.invoke("ReadAt", { roomId: data.roomId, memberId: data.memberId, requestId } as any)
this.hubConnection.invoke("ReadAt", { roomId: data.roomId, memberId: data.memberId, requestId, messageId: data.chatMessageId} as any)
} catch (error) {}
@@ -215,6 +204,20 @@ export class SignalRConnection {
this.readAtSubject.next(_message);
});
this.hubConnection.on('DeleteMessage', (_message) => {
console.log('DeleteMessage', _message)
this.messageDelete.next(_message);
});
this.hubConnection.on('UpdateMessage', (_message) => {
console.log('UpdateMessage', _message)
this.messageUPdateSubject.next(_message);
})
}
public getMessageUpdateSubject() {
return this.messageUPdateSubject.asObservable()
}
public getMessages() {
@@ -237,6 +240,10 @@ export class SignalRConnection {
return this.sendLaterSubject.asObservable();
}
public getMessageDelete() {
return this.messageDelete.asObservable()
}
public closeConnection(): void {
this.reconnect = false
if (this.hubConnection) {