fix chat messages

This commit is contained in:
Peter Maquiran
2024-06-13 12:11:17 +01:00
parent 856a0cf137
commit 962ab6aa86
16 changed files with 334 additions and 104 deletions
@@ -11,8 +11,6 @@ export function roomMemberListDetermineChanges(____serverRooms: RoomByIdMemberIt
}
})
console.log({PServerRooms, localRooms})
const serverRoomMap = new Map(PServerRooms.map(room => [room.$roomIdUserId, room]));
const localRoomMap = new Map(localRooms.map(room => [room.$roomIdUserId, room]));
@@ -1,87 +1,26 @@
import { Injectable } from '@angular/core';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, retryWhen, tap, delay } from 'rxjs/operators';
interface WebSocketMessage {
type: string;
payload: any;
}
interface WebSocketError {
type: string;
error: any;
}
import { err, ok } from 'neverthrow';
import { WebSocketMessage, WebSocketService } from '../../infra/socket/socket';
@Injectable({
providedIn: 'root'
})
export class MessageLiveDataSourceService {
private socket$: WebSocketSubject<WebSocketMessage>;
private messageSubject$: Subject<WebSocketMessage>;
private connectionStatus$: BehaviorSubject<boolean>;
private reconnectAttempts = 0;
private readonly maxReconnectAttempts = 5;
constructor() {
this.messageSubject$ = new Subject<WebSocketMessage>();
this.connectionStatus$ = new BehaviorSubject<boolean>(false);
constructor(public socket: WebSocketService) {}
async sendMessage(data: WebSocketMessage) {
try {
const result = await this.socket.sendMessage(data).toPromise()
console.log({result})
return ok(result)
} catch (e) {
return err(e)
}
}
public connect(url: string) {
this.socket$ = webSocket<WebSocketMessage>(url);
this.socket$.pipe(
tap({
error: () => {
this.connectionStatus$.next(false);
}
}),
retryWhen(errors => errors.pipe(
tap(() => {
this.reconnectAttempts++;
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
throw new Error('Max reconnect attempts reached');
}
}),
delay(1000)
))
).subscribe(
(message) => {
this.messageSubject$.next(message);
this.connectionStatus$.next(true);
this.reconnectAttempts = 0;
},
(err) => {
console.error('WebSocket connection error:', err);
},
() => {
console.log('WebSocket connection closed');
this.connectionStatus$.next(false);
}
);
}
public sendMessage(message: WebSocketMessage): Observable<void> {
return new Observable<void>(observer => {
this.socket$.next(message);
observer.next();
observer.complete();
}).pipe(
catchError(err => {
console.error('Send message error:', err);
return new Observable<never>(observer => {
observer.error({ type: 'SEND_ERROR', error: err });
});
})
);
}
public get messages$(): Observable<WebSocketMessage> {
return this.messageSubject$.asObservable();
}
public get connectionStatus(): Observable<boolean> {
return this.connectionStatus$.asObservable();
}
}
@@ -2,6 +2,8 @@ import { Injectable } from '@angular/core';
import { Dexie, EntityTable, liveQuery } from 'Dexie';
import { err, ok } from 'neverthrow';
import { z } from 'zod';
import { from, Observable, Subject } from 'rxjs';
import { filter, switchMap } from 'rxjs/operators';
import { MessageInputDTO } from '../../dto/message/messageInputDtO';
@@ -41,9 +43,17 @@ messageDataSource.version(1).stores({
})
export class MessageLocalDataSourceService {
private baseUrl = 'https://gdapi-dev.dyndns.info/stage/api/v2/Chat'; // Your base URL
messageSubject = new Subject();
constructor() {}
constructor() {
messageDataSource.message.hook('creating', (primKey, obj, trans) => {
// const newMessage = await trans.table('message').get(primKey);
console.log(primKey, obj)
this.messageSubject.next(obj);
// return newMessage
})
}
async sendMessage(data: MessageInputDTO) {
@@ -95,8 +105,10 @@ export class MessageLocalDataSourceService {
}
}
getItemsLive(roomId: string) {
return liveQuery(() => messageDataSource.message.where('roomId').equals(roomId).toArray() )
getItemsLive(roomId: string) {
return liveQuery(() =>
messageDataSource.message.where('roomId').equals(roomId).sortBy('id')
)
}
@@ -112,11 +124,18 @@ export class MessageLocalDataSourceService {
}
} catch (e) {
console.log('error')
return err('DB error')
}
}
subscribeToNewMessage(roomId: string): Observable<TableMessage> {
return this.messageSubject.pipe(
filter((message: TableMessage) =>
message.roomId === roomId
)
)
}
}
@@ -0,0 +1,27 @@
import { Injectable } from '@angular/core';
import { WebSocketMessage, WebSocketService } from '../../infra/socket/socket';
import { err, ok } from 'neverthrow';
@Injectable({
providedIn: 'root'
})
export class RoomLiveDataSourceService {
constructor(private socket: WebSocketService) {}
async getRoomById(data: WebSocketMessage) {
try {
const result = await this.socket.sendMessage(data).toPromise()
console.log({result})
return ok(result)
} catch (e) {
return err(e)
}
}
}
@@ -0,0 +1,130 @@
import { Injectable } from '@angular/core';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, retryWhen, tap, delay } from 'rxjs/operators';
import { SessionStore } from 'src/app/store/session.service';
import { v4 as uuidv4 } from 'uuid'
export interface WebSocketMessage {
type: string;
payload: any;
requestId?: string;
}
interface WebSocketError {
type: string;
error: any;
}
@Injectable({
providedIn: 'root'
})
export class WebSocketService {
private socket$: WebSocketSubject<WebSocketMessage>;
private messageSubject$: Subject<WebSocketMessage>;
private connectionStatus$: BehaviorSubject<boolean>;
private reconnectAttempts = 0;
private readonly maxReconnectAttempts = 5;
callback: {[key: string]: Function} = {}
constructor() {
this.messageSubject$ = new Subject<WebSocketMessage>();
this.connectionStatus$ = new BehaviorSubject<boolean>(false);
this.connect('http://5.180.182.151:8080/')
this.messages$.subscribe(({payload, requestId}) => {
if(this.callback[requestId]) {
this.callback[requestId]({payload, requestId})
delete this.callback[requestId]
}
console.log({payload, requestId})
})
}
public connect(url: string) {
this.socket$ = webSocket<WebSocketMessage>(url);
this.socket$.pipe(
tap({
error: () => {
this.connectionStatus$.next(false);
}
}),
retryWhen(errors => errors.pipe(
tap(() => {
this.reconnectAttempts++;
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
throw new Error('Max reconnect attempts reached');
}
}),
delay(1000)
))
).subscribe(
(message) => {
this.messageSubject$.next(message);
if (!this.connectionStatus$.getValue()) {
this.connectionStatus$.next(true);
this.reconnectAttempts = 0;
// Send a message when the connection is established
this.sendMessage(SessionStore.user.UserId as any).subscribe();
}
},
(err) => {
console.error('WebSocket connection error:', err);
},
() => {
console.log('WebSocket connection closed');
this.connectionStatus$.next(false);
}
);
}
public sendMessage(message: WebSocketMessage): Observable<any> {
return new Observable<void>(observer => {
if(typeof message == 'object') {
message.requestId = uuidv4()
this.socket$.next(message);
this.callback[message.requestId] = ({payload, requestId})=> {
observer.next(payload as any);
observer.complete();
}
} else {
this.socket$.next(message);
observer.next({} as any);
observer.complete();
}
}).pipe(
catchError(err => {
console.error('Send message error:', err);
return new Observable<never>(observer => {
observer.error({ type: 'SEND_ERROR', error: err });
});
})
);
}
public get messages$(): Observable<WebSocketMessage> {
return this.messageSubject$.asObservable();
}
public get connectionStatus(): Observable<boolean> {
return this.connectionStatus$.asObservable();
}
}
@@ -14,7 +14,36 @@ export class MessageRepositoryService {
private messageRemoteDataSourceService: MessageRemoteDataSourceService,
private messageLiveDataSourceService: MessageLiveDataSourceService,
private messageLocalDataSourceService: MessageLocalDataSourceService
) {}
) {
this.messageLiveDataSourceService.socket.messages$.subscribe(({payload, requestId}) => {
if(payload.sender == null) {
delete payload.sender
}
if(payload) {
let clone: TableMessage = {
...payload,
messageId: payload.id,
}
delete clone.id
try {
if(clone.sender.wxUserId != SessionStore.user.UserId) {
console.log(this.messageLocalDataSourceService.createMessage(clone))
}
} catch (e) {
console.log('error', {payload})
}
}
})
}
async sendMessage(data: MessageInputDTO) {
@@ -26,6 +55,11 @@ export class MessageRepositoryService {
}
const localActionResult = await this.messageLocalDataSourceService.sendMessage(data)
this.messageLiveDataSourceService.sendMessage({
type: 'sendMessage',
payload: data
})
if(localActionResult.isOk()) {
const sendMessageResult = await this.messageRemoteDataSourceService.sendMessage(data)
@@ -70,4 +104,9 @@ export class MessageRepositoryService {
getItemsLive (roomId: string) {
return this.messageLocalDataSourceService.getItemsLive(roomId)
}
subscribeToNewMessages(roomId: any) {
return this.messageLocalDataSourceService.subscribeToNewMessage(roomId)
}
}
@@ -1,6 +1,6 @@
import { Injectable } from '@angular/core';
import { RoomRemoteDataSourceService } from '../data-source/room/room-remote-data-source.service'
import { RoomInputDTO } from '../dto/room/roomInputDTO';;
import { RoomInputDTO } from '../dto/room/roomInputDTO';
import { addRoom, RoomRemoteDataSourceState } from '../data-source/room/room-memory-data-source';
import { Store } from '@ngrx/store';
import { AddMemberToRoomInputDTO } from '../dto/room/addMemberToRoomInputDto';
@@ -12,6 +12,8 @@ import { roomMemberListDetermineChanges } from '../async/rooms/roomMembersChange
import { captureAndReraiseAsync } from 'src/app/services/decorators/captureAndReraiseAsync';
import { RoomUpdateInputDTO } from '../dto/room/roomUpdateInputDTO';
import { SessionStore } from 'src/app/store/session.service';
import { RoomLiveDataSourceService } from '../data-source/room/room-live-data-source.service';
import { isHttpResponse } from 'src/app/services/http.service';
@Injectable({
providedIn: 'root'
@@ -21,7 +23,8 @@ export class RoomRepositoryService {
constructor(
private roomRemoteDataSourceService: RoomRemoteDataSourceService,
private roomMemoryDataSourceService: Store<RoomRemoteDataSourceState>,
private roomLocalDataSourceService: RoomLocalDataSourceService
private roomLocalDataSourceService: RoomLocalDataSourceService,
private roomLiveDataSourceService: RoomLiveDataSourceService
) { }
@captureAndReraiseAsync('RoomRepositoryService/list')
@@ -89,13 +92,25 @@ export class RoomRepositoryService {
const { membersToInsert, membersToUpdate, membersToDelete } = roomMemberListDetermineChanges(result.value.data.members, localList, id)
for (const user of membersToInsert) {
this.roomLocalDataSourceService.addMember({...user, roomId:id})
await this.roomLocalDataSourceService.addMember({...user, roomId:id})
}
for(const user of membersToDelete) {
this.roomLocalDataSourceService.removeMemberFromRoom(user.$roomIdUserId)
await this.roomLocalDataSourceService.removeMemberFromRoom(user.$roomIdUserId)
}
const __localListRoom = await this.roomLocalDataSourceService.getRoomList()
// this.roomLiveDataSourceService.getRoomById({
// type:'memberList',
// payload: __localListRoom
// })
} else if (isHttpResponse(result.error) ) {
if(result.error.status == 404) {
await this.roomLocalDataSourceService.deleteRoomById(id)
}
// this.httpErrorHandle.httpStatusHandle(result.error)
}
return result
@@ -109,6 +124,11 @@ export class RoomRepositoryService {
return await this.roomLocalDataSourceService.deleteRoomById(id)
} else if (isHttpResponse(result.error)) {
if(result.error.status == 404) {
await this.roomLocalDataSourceService.deleteRoomById(id)
}
// this.httpErrorHandle.httpStatusHandle(result.error)
}
return result
@@ -156,7 +176,12 @@ export class RoomRepositoryService {
const result = await this.roomRemoteDataSourceService.removeMemberFromRoom(data)
if(result.isOk()) {
return this.roomLocalDataSourceService.leaveRoom(data.id)
this.roomLocalDataSourceService.leaveRoom(data.id)
} else if (isHttpResponse(result.error)) {
if(result.error.status == 404) {
await this.roomLocalDataSourceService.deleteRoomById(data.id)
}
// this.httpErrorHandle.httpStatusHandle(result.error)
}
return result
@@ -178,4 +203,5 @@ export class RoomRepositoryService {
getRoomMemberById(roomId: any) {
return this.roomLocalDataSourceService.getRoomMemberById(roomId)
}
}
+8 -1
View File
@@ -1,4 +1,4 @@
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { HttpClient, HttpErrorResponse, HttpResponse } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { ok, err, Result } from 'neverthrow';
@@ -53,3 +53,10 @@ export class HttpService {
}
}
}
export function isHttpResponse(data: any): data is HttpResponse<any> {
return typeof data.status == 'number';
}
@@ -15,13 +15,19 @@
</button>
</div>
</div>
<div class="d-flex" >
<div class="d-flex header-bottom" >
<ion-list *ngIf="roomMembers$ | async as memberList" >
<div class="header-bottom-icon">
<ion-icon *ngIf="ThemeService.currentTheme == 'default' " src="assets/icon/icons-user.svg"></ion-icon>
<ion-icon *ngIf="ThemeService.currentTheme == 'gov' " src="assets/icon/theme/gov/icons-user.svg"></ion-icon>
</div>
<div *ngFor="let user of memberList" > {{ user.user.wxFullName }}, </div>
<ion-list class="header-bottom-contacts" *ngIf="roomMembers$ | async as memberList">
<ng-container *ngFor="let user of memberList; let i = index">
{{ user.user.wxFullName }}<ng-container *ngIf="i < memberList.length - 1">, </ng-container>
</ng-container>
</ion-list>
</div>
</div>
<ion-refresher name="refresher" slot="fixed" (ionRefresh)="doRefresh($event)">
@@ -39,7 +45,7 @@
<div
*ngFor="let message of roomMessage$ | async" class="messages-list-item-wrapper"
[ngClass]="{'my-message': message.sender.wxUserId === sessionStore.user.UserId, 'other-message': message.sender.wxUserId !== sessionStore.user.UserId}">
{{ message.message }}
{{ message.message }} == {{ message.id }}
</div>
</div>
@@ -47,18 +47,15 @@
.header-bottom-icon {
width: rem(30);
font-size: rem(25);
float: left;
padding: 2px;
}
.header-bottom-contacts {
width: 275px;
font-size: rem(15);
color: #797979;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
float: left;
padding: 5px;
margin: 1px;
}
@@ -171,7 +168,7 @@ ion-content {
.other-message {
margin: 10px 75px 10px 20px;
// background: #ebebeb;
background: #ebebeb;
// float: left;
// Styles for incoming messages from other users
justify-content: flex-start;
@@ -41,6 +41,7 @@ import { TableMemberList } from 'src/app/services/Repositorys/chat/data-source/r
import { TableMessage } from 'src/app/services/Repositorys/chat/data-source/message/message-local-data-source.service';
import { ChatPopoverPage } from '../../popover/chat-popover/chat-popover.page';
import { Observable as DexieObservable } from 'Dexie';
import { Subscription } from 'rxjs';
const IMAGE_DIR = 'stored-images';
@Component({
@@ -119,6 +120,8 @@ export class MessagesPage implements OnInit, OnChanges, AfterViewInit, OnDestroy
roomMessage$: DexieObservable<TableMessage[]>
roomMembers$: DexieObservable<TableMemberList[] | undefined>
newMessagesStream!: Subscription
constructor(
public popoverController: PopoverController,
private modalController: ModalController,
@@ -151,6 +154,20 @@ export class MessagesPage implements OnInit, OnChanges, AfterViewInit, OnDestroy
this.roomMembers$ = this.roomRepositoryService.getRoomMemberByIdLive(this.roomId) as any
this.roomRepositoryService.getRoomById(this.roomId)
this.messageRepositoryService.listAllMessagesByRoomId(this.roomId)
this.newMessagesStream?.unsubscribe()
this.newMessagesStream = this.messageRepositoryService.subscribeToNewMessages(this.roomId).subscribe((e) => {
setTimeout(() => {
this.scrollToBottomClicked()
}, 10)
setTimeout(() => {
this.scrollToBottomClicked()
}, 200)
})
}
sendTyping() {}
@@ -424,6 +441,7 @@ export class MessagesPage implements OnInit, OnChanges, AfterViewInit, OnDestroy
}
this.messageRepositoryService.sendMessage(data)
this.textField = ''
}
@@ -10,6 +10,7 @@ import { HttpErrorResponse, HttpResponse } from '@angular/common/http';
import { SessionStore } from 'src/app/store/session.service';
import { GroupContactsPage } from '../../chat/group-messages/group-contacts/group-contacts.page';
import { ZodError } from 'zod';
import { isHttpResponse } from 'src/app/services/http.service';
@Component({
@@ -96,7 +97,10 @@ export class ChatPopoverPage implements OnInit {
if(result.isOk()) {
this.close('leave');
// this.openGroupMessage.emit(this.roomId);
} else if (result.error instanceof HttpResponse) {
} else if (isHttpResponse(result.error)) {
if(result.error.status == 404) {
this.close('leave');
}
// this.httpErrorHandle.httpStatusHandle(result.error)
} else if (result.error instanceof ZodError) {
this.toastService._badRequest("Pedimos desculpa mas não foi possível executar a acção. Por favor, contacte o apoio técnico.")
@@ -118,8 +122,10 @@ export class ChatPopoverPage implements OnInit {
if(result.isOk()) {
this.close('delete');
// this.openGroupMessage.emit(this.roomId);
} else if (result.error instanceof HttpResponse) {
// this.httpErrorHandle.httpStatusHandle(result.error)
} else if (isHttpResponse(result.error)) {
if(result.error.status == 404) {
this.close('close');
}
} else if (result.error instanceof ZodError) {
this.toastService._badRequest("Pedimos desculpa mas não foi possível executar a acção. Por favor, contacte o apoio técnico.")
console.log(result.error.errors)