diff --git a/src/app/module/chat/data/repository/message/message-live-signalr-data-source.service.ts b/src/app/module/chat/data/repository/message/message-live-signalr-data-source.service.ts index 3a787b39a..8a9c0d1f8 100644 --- a/src/app/module/chat/data/repository/message/message-live-signalr-data-source.service.ts +++ b/src/app/module/chat/data/repository/message/message-live-signalr-data-source.service.ts @@ -3,14 +3,16 @@ import { v4 as uuidv4 } from 'uuid' import { MessageUpdateInput } from '../../../../../core/chat/usecase/message/message-update-by-id-use-case.service'; import { MessageReactionInput } from '../../../../../core/chat/usecase/message/message-reaction-by-id-use-case.service'; import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service'; -import { filter, map } from 'rxjs/operators'; +import { filter, map, take } from 'rxjs/operators'; import { SocketMessage } from 'src/app/infra/socket/signalR/signalR'; import { IMessageSocketRepository } from 'src/app/core/chat/repository/message/message-socket-repository'; import { MessageCreateOutPutDataDTO, MessageInputDTO } from '../../../../../core/chat/usecase/message/message-create-use-case.service'; import { MessageMarkAsReadInput } from '../../../../../core/chat/usecase/message/message-mark-as-read-use-case.service'; import { MessageOutPutDataDTO } from 'src/app/core/chat/repository/dto/messageOutputDTO'; import { MessageDeleteInputDTO } from '../../../../../core/chat/usecase/message/message-delete-by-id-live-use-case.service'; -import { BehaviorSubject } from 'rxjs'; +import { BehaviorSubject, Observable } from 'rxjs'; +import { ISignalROutput } from 'src/app/infra/socket/type'; +import { ok } from 'neverthrow'; interface sendDeliverAt { memberId: number, @@ -87,12 +89,20 @@ export class MessageSocketRepositoryService implements IMessageSocketRepository } async sendReadAt(data: MessageMarkAsReadInput) { - const result = await this.socket.sendData({ + this.socket.sendData({ method: 'ReadAt', data: data as any, }) - return result; + + return await this.socket.getData().pipe( + filter((message) => { return data.messageId == message?.data?.id && message.method == "UpdateMessage"}), + map((message) => { + // 🔧 manipulate here + return ok(message); + }), + take(1) + ).toPromise(); } async sendDelete(data: MessageDeleteInputDTO) {