// NOTE(review): the Dexie package is published as lowercase `dexie`; the `'Dexie'`
// specifier below only resolves on case-insensitive file systems. Left unchanged here
// because the casing must be fixed consistently across the project — confirm and align.
import { Injectable } from '@angular/core';
import { liveQuery } from 'Dexie';
import { MessageEntity } from '../../../../../core/chat/entity/message';
import { DexieRepository } from 'src/app/infra/repository/dexie/dexie-repository.service';
import { Observable as DexieObservable, PromiseExtended } from 'Dexie';
import { DexieMessageTable, MessageTable, MessageTableSchema } from 'src/app/infra/database/dexie/instance/chat/schema/message';
import { chatDatabase } from 'src/app/infra/database/dexie/service';
import { IMessageLocalRepository } from 'src/app/core/chat/repository/message/message-local-repository';
import { BehaviorSubject, combineLatest, from, Observable } from 'rxjs';
import { filter, map } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid'

/**
 * Local (Dexie-backed) data source for chat messages.
 *
 * On construction it resets the `sending` flag of every stored message and installs a
 * Dexie `creating` hook that stamps each new record with `$createAt` and `$id`.
 *
 * NOTE(review): several annotations in this class (`DexieRepository`, `BehaviorSubject`,
 * `Observable`, ...) carry no type arguments; they look like they were lost at some
 * point. Restore them from the repository interface — not guessed here.
 */
@Injectable({
  providedIn: 'root'
})
export class MessageLocalDataSourceService extends DexieRepository implements IMessageLocalRepository {

  /** Re-emits every object passed through the `creating` hook; starts with `null`. */
  private creatingSubject : BehaviorSubject = new BehaviorSubject(null);

  /** Last `$createAt` value handed out by the `creating` hook. */
  private lastTimestamp = 0;

  constructor() {
    super(chatDatabase.message, MessageTableSchema, chatDatabase)

    // Fire-and-forget: the method catches and logs its own errors.
    this.setAllSenderToFalse();
    this.onCreatingHook()
  }

  /**
   * Registers the Dexie `creating` hook on the message table.
   *
   * For every object being created the hook:
   *  - assigns `$createAt`, bumping it by one when `Date.now()` has not advanced past
   *    the previously assigned value, so values handed out by this instance are
   *    strictly increasing;
   *  - assigns `$id` from `obj.id` when that is truthy, otherwise a `Local-<uuid>` id;
   *  - pushes the object to `creatingSubject`.
   */
  private onCreatingHook() {
    chatDatabase.message.hook('creating', (primaryKey, obj, transaction) => {
      const now = Date.now();

      // Same millisecond (or clock went backwards): step past the last value instead.
      this.lastTimestamp = now <= this.lastTimestamp ? this.lastTimestamp + 1 : now;
      obj.$createAt = this.lastTimestamp;

      obj.$id = obj.id ? obj.id : 'Local-' + uuidv4();

      this.creatingSubject.next(obj)
    });
  }

  /**
   * Stream of objects seen by the `creating` hook.
   *
   * Backed by a `BehaviorSubject` seeded with `null`, so a new subscriber immediately
   * receives the most recent object, or `null` if nothing has been created yet.
   */
  onCreateObservable() {
    return this.creatingSubject.asObservable()
  }

  /**
   * Sets `sending: false` on every record of the message table inside a single
   * read-write transaction. Failures are logged and swallowed; the returned promise
   * never rejects.
   */
  async setAllSenderToFalse() {
    try {
      await chatDatabase.transaction('rw', chatDatabase.message, async () => {
        await chatDatabase.message.toCollection().modify({ sending: false });
      });
    } catch (error) {
      console.error('Error updating messages:', error);
    }
  }

  /**
   * Loads the messages whose `roomId` equals the given id, sorted by `$createAt`.
   *
   * @param roomId Room whose messages are requested.
   */
  getItems(roomId: string): PromiseExtended {
    return chatDatabase.message.where('roomId').equals(roomId).sortBy('$createAt') as any
  }

  /**
   * Live variant of {@link getItems}: the same query wrapped in Dexie's `liveQuery`.
   *
   * @param roomId Room whose messages are observed.
   */
  getItemsLive(roomId: string): DexieObservable {
    return liveQuery(() => chatDatabase.message.where('roomId').equals(roomId).sortBy('$createAt') as any)
  }

  /**
   * Returns the stored messages whose `id` is not a string and whose `sending` flag
   * loosely equals `false`.
   *
   * @returns The matching messages, or an empty array when the query fails (the error
   *          is logged). Previously a failure resolved to `undefined`.
   */
  async getOfflineMessages () {
    try {
      const allMessages = await chatDatabase.message
        .filter(msg => typeof msg.id !== 'string' && msg.sending == false)
        .toArray();

      return allMessages as MessageEntity[];
    } catch (error) {
      console.error('Error fetching messages:', error);
      return [];
    }
  }

  /**
   * Observes, for each requested room, the first message of the room's collection
   * after `reverse().sortBy('timestamp')`, or `null` when the room has no messages.
   *
   * NOTE(review): the other queries in this class order by `$createAt`, this one by
   * `timestamp` — confirm that is intentional.
   *
   * @param roomIds Rooms to observe.
   * @returns An observable emitting an array of `{ roomId, message }` entries in the
   *          same order as `roomIds`. For an empty `roomIds` it emits a single empty
   *          array and completes.
   */
  getLastMessageForRooms(roomIds: string[]): Observable {
    if (roomIds.length === 0) {
      // combineLatest([]) completes without ever emitting, which would leave
      // subscribers without a value; emit one empty result instead.
      return from([[]]);
    }

    const observables = roomIds.map(roomId =>
      from(liveQuery(async () => {
        const messages = await chatDatabase.message
          .where('roomId')
          .equals(roomId)
          .reverse()
          .sortBy('timestamp')

        // First element of the reversed sort, or null when the room is empty.
        return messages[0] || null;
      })).pipe(
        // Attach the roomId so consumers can tell the entries apart.
        map((message) => ({ roomId, message: message || null }))
      )
    );

    return combineLatest(observables);
  }
}