import { Injectable } from '@angular/core';
import { Dexie, EntityTable, liveQuery } from 'dexie';
import { err, ok, Result } from 'neverthrow';
import { z } from 'zod';
import { from, Observable, Subject } from 'rxjs';
import { filter } from 'rxjs/operators';
import { MessageInputDTO } from '../../dto/message/messageInputDtO';
import { MessageEntity } from '../../../domain/entity/message';

/** Shape of a row stored in the local `message` table. */
const tableSchema = z.object({
  /** Auto-incremented local primary key (see the `++$id` store definition below). */
  $id: z.number().optional(),
  /** Message identifier; optional on a stored row. */
  id: z.string().optional(),
  roomId: z.string().uuid(),
  message: z.string(),
  messageType: z.number(),
  canEdit: z.boolean(),
  oneShot: z.boolean(),
  sentAt: z.string().optional(),
  requireUnlock: z.boolean(),
  sender: z.object({
    wxUserId: z.number(),
    wxFullName: z.string(),
    wxeMail: z.string(),
    userPhoto: z.string(),
  }),
  sending: z.boolean().optional(),
  reaction: z
    .object({
      id: z.string(),
      reactedAt: z.string(),
      reaction: z.string(),
      sender: z.object({}),
    })
    .array(),
});

/** Shape of an incoming message payload (uses `messageId` where the table row uses `id`). */
export const IncomingMessageSchema = z.object({
  messageId: z.string().optional(),
  roomId: z.string().uuid(),
  message: z.string(),
  messageType: z.number(),
  canEdit: z.boolean(),
  oneShot: z.boolean(),
  sentAt: z.string().optional(),
  requireUnlock: z.boolean(),
  sender: z.object({
    wxUserId: z.number(),
    wxFullName: z.string(),
    wxeMail: z.string(),
    userPhoto: z.string(),
  }),
  sending: z.boolean().optional(),
});

/** Row type of the local `message` table, derived from {@link tableSchema}. */
export type TableMessage = z.infer<typeof tableSchema>;

// Database declaration (move this to its own module also)
export const messageDataSource = new Dexie('chat-message') as Dexie & {
  message: EntityTable<TableMessage, '$id'>;
};

messageDataSource.version(1).stores({
  message: '++$id, id, roomId, message, messageType, canEdit, oneShot, requireUnlock, messageId, info',
});

messageDataSource.message.mapToClass(MessageEntity);

/**
 * Local (IndexedDB via Dexie) data source for chat messages.
 *
 * Most methods report failure through a neverthrow `Result` instead of throwing.
 */
@Injectable({
  providedIn: 'root',
})
export class MessageLocalDataSourceService {
  /**
   * Emits after a message is added through {@link sendMessage} or {@link createMessage}.
   * Only `roomId` is populated by this class, hence the `Partial` element type.
   */
  messageSubject = new Subject<Partial<TableMessage>>();

  constructor() {
    // messageDataSource.message.hook('creating', (primKey, obj, trans) => {
    //   // const newMessage = await trans.table('message').get(primKey);
    //   this.messageSubject.next(obj);
    //   // return newMessage
    // })
  }

  /**
   * Sets `sending` to `false` on every stored message inside one read-write transaction.
   * Failures are logged and swallowed (best effort).
   */
  async setAllSenderToFalse() {
    try {
      await messageDataSource.transaction('rw', messageDataSource.message, async () => {
        // Perform the update operation within the transaction
        await messageDataSource.message.toCollection().modify({ sending: false });
      });
      console.log('All messages updated successfully.');
    } catch (error) {
      console.error('Error updating messages:', error);
    }
  }

  /**
   * Returns the first message of a room after sorting the room's messages by `id`
   * with the collection reversed.
   *
   * NOTE(review): ordering is by the `id` field, not by `$id` or `sentAt`; confirm that
   * `id` really is monotonically sortable, otherwise this is not the latest message.
   *
   * @param roomId Room whose messages are inspected.
   * @returns `ok(message)` (or `ok(undefined)` when the room has no messages), `err(error)` on failure.
   */
  async getLastMessageByRoomId(roomId: string): Promise<Result<TableMessage | undefined, unknown>> {
    try {
      console.log({roomId});
      const sortedMessages = await messageDataSource.message
        .where('roomId')
        .equals(roomId)
        .reverse()
        .sortBy('id');

      return ok(sortedMessages[0]);
    } catch (error) {
      return err(error);
    }
  }

  /**
   * Deletes every stored message whose `id` equals the given value.
   *
   * @param id Value of the `id` field to match.
   * @returns `ok(count)` with the number of deleted rows, `err(error)` on failure.
   */
  async deleteByMessageId(id: string): Promise<Result<number, unknown>> {
    try {
      console.log(id);
      // Collection.delete() resolves to the number of deleted rows, not to an array.
      const deletedCount = await messageDataSource.message
        .where('id')
        .equals(id)
        .delete();

      return ok(deletedCount);
    } catch (error) {
      return err(error);
    }
  }

  /**
   * Stores an outgoing message flagged as `sending` and notifies subscribers of its room.
   *
   * The `sending` flag is written onto the passed-in object itself.
   *
   * @param data Message to store.
   * @returns `ok(key)` with the key returned by the table's `add`, `err(false)` on failure.
   */
  async sendMessage(data: MessageInputDTO) {
    (data as TableMessage).sending = true;

    try {
      const result = await messageDataSource.message.add(data);
      this.messageSubject.next({roomId: data.roomId});
      return ok(result as number);
    } catch (e) {
      return err(false);
    }
  }

  /** Placeholder; currently does nothing. */
  incomingSocketMessage() {

  }

  // @ValidateSchema(tableSchema)
  /**
   * Stores a message as-is and notifies subscribers of its room.
   *
   * @param data Message to store.
   * @returns `ok(key)` with the key returned by the table's `add`, `err(false)` on failure.
   */
  async createMessage(data: MessageInputDTO) {
    try {
      const result = await messageDataSource.message.add(data);
      this.messageSubject.next({roomId: data.roomId});
      return ok(result);
    } catch (e) {
      return err(false);
    }
  }

  /**
   * Looks up the first stored message with the given `id`.
   *
   * @returns `ok(message)` when found, `err(false)` when missing or on failure.
   */
  async messageExist({id}) {
    try {
      console.log({id});
      const existingMessage = await messageDataSource.message
        .where('id')
        .equals(id)
        .first();

      if (existingMessage) {
        return ok(existingMessage);
      } else {
        return err(false);
      }
    } catch (error) {
      return err(false);
    }
  }

  /**
   * Updates the row whose primary key is `data.$id` with the fields of `data`.
   *
   * @param data Row carrying its local `$id`.
   * @returns `ok(result)` with the value resolved by the table's `update`,
   *          `err(false)` when `$id` is missing or on failure.
   */
  async update(data: TableMessage) {
    try {
      console.log('update images 22222');
      if (data.$id === undefined) {
        // Without a primary key the update cannot target a row.
        return err(false);
      }
      const result = await messageDataSource.message.update(data.$id, data);
      return ok(result);
    } catch (e) {
      return err(false);
    }
  }

  // not used
  /**
   * Updates every row whose `id` field equals `data.id` with the fields of `data`.
   *
   * The lookup goes through the `id` index because `Table.update` matches on the
   * primary key (`$id`), which a message `id` never is.
   *
   * @param data Row carrying the `id` to match.
   * @returns `ok(count)` with the number of modified rows,
   *          `err(false)` when `id` is missing or on failure.
   */
  async updateByMessageId(data: TableMessage) {
    try {
      if (data.id === undefined) {
        return err(false);
      }
      // Leave the local primary key out of the change set so it is never rewritten.
      const { $id: _localKey, ...changes } = data;
      const result = await messageDataSource.message
        .where('id')
        .equals(data.id)
        .modify(changes);
      return ok(result);
    } catch (e) {
      return err(false);
    }
  }

  /**
   * Updates the stored message with the same `id` (merging `data` over it),
   * or creates a new row when none is found.
   *
   * @param data Message to merge or insert.
   */
  async findOrUpdate(data: TableMessage) {
    if (data.id !== undefined) {
      const findResult = await this.findMessageById(data.id);

      if (findResult.isOk()) {
        return this.update({...findResult.value, ...data});
      }
    }

    return this.createMessage(data);
  }

  /** Returns a promise of all stored messages of the given room. */
  getItems(roomId: string) {
    return messageDataSource.message.where('roomId').equals(roomId).toArray();
  }

  /** Returns a Dexie live query over the room's messages sorted by `$id`. */
  getItemsLive(roomId: string) {
    return liveQuery(() => messageDataSource.message.where('roomId').equals(roomId).sortBy('$id'));
  }

  /**
   * Looks up the first stored message with the given `id`.
   *
   * @returns `ok(message)` when found, `err('not found')` when missing, `err('DB error')` on failure.
   */
  async findMessageById(id: string) {
    try {
      const found = await messageDataSource.message.where('id').equals(id).first();

      if (found) {
        return ok(found);
      } else {
        return err('not found');
      }
    } catch (e) {
      return err('DB error');
    }
  }

  /**
   * Stream of {@link messageSubject} events restricted to one room.
   *
   * @param roomId Room to listen to.
   */
  subscribeToNewMessage(roomId: string): Observable<Partial<TableMessage>> {
    return this.messageSubject.pipe(
      filter((message) => message.roomId === roomId)
    );
  }

  /**
   * Returns the stored messages that have no string `id` and whose `sending` flag is `false`.
   *
   * @returns The matching messages; an empty array when the query fails (the error is logged).
   */
  async getOfflineMessages(): Promise<MessageEntity[]> {
    try {
      const allMessages = await messageDataSource.message
        .filter(msg => typeof msg.id !== 'string' && msg.sending === false)
        .toArray();

      return allMessages as MessageEntity[];
    } catch (error) {
      console.error('Error fetching messages:', error);
      // Keep the declared return type honest instead of implicitly returning undefined.
      return [];
    }
  }
}