mirror of
https://code.equilibrium.co.ao/ITO/doneit-web.git
synced 2026-04-19 04:57:52 +00:00
receive user typing
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { MessageAsyncService } from 'src/app/module/chat/data/async/socket/message-async.service'
|
||||
import { UserTypingAsyncService } from 'src/app/module/chat/data/async/socket/user-typing-async.service'
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root'
|
||||
@@ -7,6 +8,7 @@ import { MessageAsyncService } from 'src/app/module/chat/data/async/socket/messa
|
||||
export class ChatServiceService {
|
||||
|
||||
constructor(
|
||||
private MessageAsyncService: MessageAsyncService
|
||||
private MessageAsyncService: MessageAsyncService,
|
||||
private UserTypingAsyncService: UserTypingAsyncService
|
||||
) { }
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import { MessageRemoteDataSourceService } from '../../data-source/message/messag
|
||||
import { SignalRService } from '../../../infra/socket/signal-r.service';
|
||||
import { filter } from 'rxjs/operators';
|
||||
import { InstanceId } from '../../repository/message-respository.service';
|
||||
import { SocketStreamReturn } from 'src/app/services/decorators/socket-validate-schema.decorator';
|
||||
import { SafeValidateSchema } from 'src/app/services/decorators/validate-schema.decorator';
|
||||
|
||||
@Injectable({
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { MessageLiveDataSourceService } from '../../data-source/message/message-live-data-source.service';
|
||||
import { RoomLiveDataSourceService } from '../../data-source/room/room-live-data-source.service';
|
||||
import { RoomRemoteDataSourceService } from '../../data-source/room/room-remote-data-source.service';
|
||||
import { roomDataSource, RoomLocalDataSourceService } from '../../data-source/room/rooom-local-data-source.service';
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root'
|
||||
})
|
||||
export class RoomAsyncService {
|
||||
|
||||
constructor(
|
||||
private roomRemoteDataSourceService: RoomRemoteDataSourceService,
|
||||
// private roomMemoryDataSourceService: Store<RoomRemoteDataSourceState>,
|
||||
private roomLocalDataSourceService: RoomLocalDataSourceService,
|
||||
private roomLiveDataSourceService: RoomLiveDataSourceService,
|
||||
private messageLiveDataSourceService: MessageLiveDataSourceService,
|
||||
) {
|
||||
|
||||
|
||||
roomDataSource.typing.hook('creating', (primKey, obj, trans) => {
|
||||
setTimeout(() => {
|
||||
|
||||
}, 1000);
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
incomingTyping() {
|
||||
|
||||
}
|
||||
|
||||
async removeUserTyping() {
|
||||
const result = await this.roomLocalDataSourceService.removeUserTyping()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { UserTypingLiveDataSourceService } from '../../data-source/userTyping/user-typing-live-data-source.service';
|
||||
import { UserTypingLocalDataSourceService } from '../../data-source/userTyping/user-typing-local-data-source.service';
|
||||
import { SignalRService } from '../../../infra/socket/signal-r.service';
|
||||
import { interval, Subject, timer } from 'rxjs';
|
||||
import { switchMap } from 'rxjs/operators';
|
||||
import { addUserTyping, removeUserTyping, TypingState } from '../../data-source/userTyping/user-typing-memory-data-source.service';
|
||||
import { Store } from '@ngrx/store';
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root'
|
||||
})
|
||||
export class UserTypingAsyncService {
|
||||
|
||||
typingCallback: {[key: string]: Subject<any> } = {}
|
||||
|
||||
constructor(
|
||||
private localDataSource: UserTypingLocalDataSourceService,
|
||||
private liveDataSource: UserTypingLiveDataSourceService,
|
||||
private memoryDataSource: Store<TypingState>,
|
||||
private signalR: SignalRService,
|
||||
) {
|
||||
|
||||
this.signalR.getTyping().subscribe(async (e:any) => {
|
||||
if(e?.chatRoomId) {
|
||||
|
||||
console.log('e', e)
|
||||
|
||||
this.memoryDataSource.dispatch(removeUserTyping({data: {...e} as any}))
|
||||
this.memoryDataSource.dispatch(addUserTyping({data: {...e} as any}))
|
||||
//
|
||||
const value = await this.localDataSource.addUserTyping(e);
|
||||
|
||||
const id = e.chatRoomId + '@' + e.userName
|
||||
if(!this.typingCallback[id]) {
|
||||
this.typingCallback[id] = new Subject()
|
||||
this.typingCallback[id].pipe(
|
||||
switchMap(() => timer(2000)),
|
||||
).subscribe(() => {
|
||||
console.log('111111==============')
|
||||
this.memoryDataSource.dispatch(removeUserTyping({data: {...e} as any}))
|
||||
this.localDataSource.removeUserTyping(e)
|
||||
})
|
||||
} else {
|
||||
this.typingCallback[id].next()
|
||||
}
|
||||
|
||||
} else {
|
||||
console.log('e--', e)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -40,18 +40,30 @@ const TableMemberListSchema = z.object({
|
||||
joinAt: z.string()
|
||||
})
|
||||
|
||||
|
||||
export const TypingSchema = z.object({
|
||||
id: z.string().optional(),
|
||||
userId: z.string(),
|
||||
roomId: z.string(),
|
||||
entryDate: z.string()
|
||||
})
|
||||
|
||||
|
||||
export type TableRoom = z.infer<typeof tableSchema>
|
||||
export type TableMemberList = z.infer<typeof TableMemberListSchema>
|
||||
export type TypingList = z.infer<typeof TypingSchema>
|
||||
|
||||
// Database declaration (move this to its own module also)
|
||||
export const roomDataSource = new Dexie('FriendDatabase') as Dexie & {
|
||||
room: EntityTable<TableRoom, 'id'>;
|
||||
memberList: EntityTable<TableMemberList, '$roomIdUserId'>;
|
||||
typing: EntityTable<TableMemberList, '$roomIdUserId'>;
|
||||
};
|
||||
|
||||
roomDataSource.version(1).stores({
|
||||
room: 'id, createdBy, roomName, roomType, expirationDate, lastMessage',
|
||||
memberList: '$roomIdUserId, id, user, joinAt, roomId',
|
||||
TypingList: '++id, userId, roomId, entryDate'
|
||||
});
|
||||
|
||||
@Injectable({
|
||||
@@ -63,6 +75,20 @@ export class RoomLocalDataSourceService {
|
||||
|
||||
constructor() {}
|
||||
|
||||
@ValidateSchema(TypingSchema)
|
||||
async addUserTyping(data: any) {
|
||||
try {
|
||||
const result = await roomDataSource.typing.add(data)
|
||||
return ok(result)
|
||||
} catch (e) {
|
||||
return err(false)
|
||||
}
|
||||
}
|
||||
|
||||
async removeUserTyping() {
|
||||
|
||||
}
|
||||
|
||||
@ValidateSchema(tableSchema)
|
||||
async createRoom(data: TableRoom) {
|
||||
try {
|
||||
|
||||
+18
@@ -0,0 +1,18 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { SignalRService } from '../../../infra/socket/signal-r.service';
|
||||
import { SessionStore } from 'src/app/store/session.service';
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root'
|
||||
})
|
||||
export class UserTypingLiveDataSourceService {
|
||||
|
||||
constructor(
|
||||
private SignalRLiveDataSourceService: SignalRService
|
||||
) { }
|
||||
|
||||
sendTyping(ChatRoomId) {
|
||||
return this.SignalRLiveDataSourceService.sendTyping({ChatRoomId, UserName:SessionStore.user.FullName})
|
||||
}
|
||||
|
||||
}
|
||||
+63
@@ -0,0 +1,63 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { z } from 'zod';
|
||||
import { Dexie, EntityTable, liveQuery, Observable } from 'Dexie';
|
||||
import { err, ok } from 'neverthrow';
|
||||
|
||||
export const TypingSchema = z.object({
|
||||
id: z.string().optional(),
|
||||
userId: z.string().optional(),
|
||||
userName: z.string(),
|
||||
chatRoomId: z.string(),
|
||||
entryDate: z.string()
|
||||
})
|
||||
|
||||
export type TypingList = z.infer<typeof TypingSchema>
|
||||
|
||||
export type UserTypingList = z.infer<typeof TypingSchema>
|
||||
|
||||
|
||||
// Database declaration (move this to its own module also)
|
||||
export const TypingDataSource = new Dexie('UserTyping') as Dexie & {
|
||||
TypingList: EntityTable<TypingList, 'id'>;
|
||||
}
|
||||
|
||||
|
||||
TypingDataSource.version(1).stores({
|
||||
TypingList: 'id, userId, userName, chatRoomId, entryDate'
|
||||
});
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root'
|
||||
})
|
||||
export class UserTypingLocalDataSourceService {
|
||||
|
||||
constructor() { }
|
||||
|
||||
|
||||
async addUserTyping(data: TypingList) {
|
||||
data.id = data.chatRoomId + '@' + data.userName
|
||||
try {
|
||||
const result = await TypingDataSource.TypingList.add(data)
|
||||
return ok(result)
|
||||
} catch (e) {
|
||||
return err(false)
|
||||
}
|
||||
}
|
||||
|
||||
async removeUserTyping(data: TypingList) {
|
||||
|
||||
const id = data.chatRoomId + '@' + data.userName
|
||||
try {
|
||||
const result = await TypingDataSource.TypingList.delete(id)
|
||||
return ok(result)
|
||||
} catch (e) {
|
||||
return err(false)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
getUserTypingLive() {
|
||||
return liveQuery(() => TypingDataSource.TypingList.toArray());
|
||||
}
|
||||
|
||||
}
|
||||
+70
@@ -0,0 +1,70 @@
|
||||
import { createAction, createFeatureSelector, createReducer, createSelector, on, props } from '@ngrx/store';
|
||||
import { TypingList } from './user-typing-local-data-source.service';
|
||||
|
||||
|
||||
export const addUserTyping = createAction(
|
||||
'[Typing] Add User Typing',
|
||||
props<{ data: TypingList }>()
|
||||
);
|
||||
|
||||
export const removeUserTyping = createAction(
|
||||
'[Typing] Remove User Typing',
|
||||
props<{ data: TypingList }>()
|
||||
);
|
||||
|
||||
export const loadUserTyping = createAction('[Typing] Load User Typing');
|
||||
|
||||
export const loadUserTypingSuccess = createAction(
|
||||
'[Typing] Load User Typing Success',
|
||||
props<{ data: TypingList[] }>()
|
||||
);
|
||||
|
||||
export const loadUserTypingFailure = createAction(
|
||||
'[Typing] Load User Typing Failure',
|
||||
props<{ error: any }>()
|
||||
);
|
||||
|
||||
|
||||
export interface TypingState {
|
||||
typingList: TypingList[];
|
||||
error: any;
|
||||
}
|
||||
|
||||
export const initialState: TypingState = {
|
||||
typingList: [],
|
||||
error: null
|
||||
};
|
||||
|
||||
export const typingReducer = createReducer(
|
||||
initialState,
|
||||
on(loadUserTypingSuccess, (state, { data }) => ({
|
||||
...state,
|
||||
typingList: data
|
||||
})),
|
||||
on(loadUserTypingFailure, (state, { error }) => ({
|
||||
...state,
|
||||
error
|
||||
})),
|
||||
on(addUserTyping, (state, { data }) => ({
|
||||
...state,
|
||||
typingList: [...state.typingList, data]
|
||||
})),
|
||||
on(removeUserTyping, (state, { data }) => ({
|
||||
...state,
|
||||
typingList: state.typingList.filter(
|
||||
typing => typing.chatRoomId !== data.chatRoomId || typing.userName !== data.userName
|
||||
)
|
||||
}))
|
||||
);
|
||||
|
||||
export const selectCalendarState = createFeatureSelector<TypingState>('userTyping');
|
||||
|
||||
export const selectAllUserSource = createSelector(
|
||||
selectCalendarState,
|
||||
(state: TypingState) => state.typingList
|
||||
);
|
||||
|
||||
export const selectUserTypingList = () => createSelector(
|
||||
selectAllUserSource,
|
||||
(typingList) => typingList
|
||||
);
|
||||
@@ -50,6 +50,9 @@ export class MessageRepositoryService {
|
||||
|
||||
if(localActionResult.isOk()) {
|
||||
|
||||
(await this.sendTyping(data.roomId)).map((e) => {
|
||||
console.log('map', e)
|
||||
})
|
||||
const sendMessageResult = await this.messageLiveSignalRDataSourceService.sendMessage(data)
|
||||
|
||||
|
||||
@@ -101,4 +104,8 @@ export class MessageRepositoryService {
|
||||
subscribeToNewMessages(roomId: any) {
|
||||
return this.messageLocalDataSourceService.subscribeToNewMessage(roomId)
|
||||
}
|
||||
|
||||
sendTyping(ChatRoomId) {
|
||||
return this.messageLiveSignalRDataSourceService.sendTyping({ChatRoomId, UserName:SessionStore.user.FullName})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,26 +42,7 @@ export class RoomRepositoryService {
|
||||
private roomLocalDataSourceService: RoomLocalDataSourceService,
|
||||
private roomLiveDataSourceService: RoomLiveDataSourceService,
|
||||
private messageLiveDataSourceService: MessageLiveDataSourceService,
|
||||
) {
|
||||
|
||||
// this.messageLiveDataSourceService.socket.messages$.subscribe(({payload, requestId, type}) => {
|
||||
// if(payload.sender == null) {
|
||||
// delete payload.sender
|
||||
// }
|
||||
|
||||
// if(type == 'sendMessage') {
|
||||
// let clone: TableMessage = {
|
||||
// ...payload,
|
||||
// messageId: payload.id,
|
||||
// }
|
||||
|
||||
// this.roomLocalDataSourceService.updateRoom({lastMessage: clone})
|
||||
|
||||
// }
|
||||
|
||||
// })
|
||||
|
||||
}
|
||||
) {}
|
||||
|
||||
@captureAndReraiseAsync('RoomRepositoryService/list')
|
||||
async list() {
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
import { Injectable } from '@angular/core';
|
||||
import { TypingList, UserTypingLocalDataSourceService } from '../data-source/userTyping/user-typing-local-data-source.service';
|
||||
import { UserTypingLiveDataSourceService } from '../data-source/userTyping/user-typing-live-data-source.service';
|
||||
import { SessionStore } from 'src/app/store/session.service';
|
||||
|
||||
@Injectable({
|
||||
providedIn: 'root'
|
||||
})
|
||||
export class UserTypingServiceRepository {
|
||||
|
||||
constructor(
|
||||
private localDataSource: UserTypingLocalDataSourceService,
|
||||
private liveDataSource: UserTypingLiveDataSourceService
|
||||
) { }
|
||||
|
||||
async addUserTyping(ChatRoomId: any) {
|
||||
return await this.liveDataSource.sendTyping(ChatRoomId)
|
||||
}
|
||||
|
||||
async removeUserTyping(data: TypingList) {
|
||||
return await this.localDataSource.removeUserTyping(data)
|
||||
}
|
||||
|
||||
|
||||
getUserTypingLive() {
|
||||
return this.localDataSource.getUserTypingLive()
|
||||
}
|
||||
}
|
||||
@@ -13,10 +13,11 @@ const { App } = Plugins;
|
||||
export class SignalRService {
|
||||
private connection: SignalRConnection;
|
||||
private messageSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
|
||||
private typingSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
|
||||
private connectingSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(null);
|
||||
|
||||
constructor(
|
||||
private platform: Platform,) {
|
||||
private platform: Platform) {
|
||||
// this.startConnection();
|
||||
// this.addMessageListener();
|
||||
|
||||
@@ -58,6 +59,9 @@ export class SignalRService {
|
||||
this.connection.getMessages().subscribe((data) => {
|
||||
this.messageSubject.next(data)
|
||||
})
|
||||
this.connection.getTyping().subscribe((data) => {
|
||||
this.typingSubject.next(data)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
@@ -68,6 +72,10 @@ export class SignalRService {
|
||||
|
||||
}
|
||||
|
||||
getTyping() {
|
||||
return this.typingSubject.asObservable()
|
||||
}
|
||||
|
||||
async sendMessage(data: Object) {
|
||||
return await this.connection.sendMessage(data as any)
|
||||
}
|
||||
@@ -75,4 +83,8 @@ export class SignalRService {
|
||||
newConnection() {
|
||||
this.establishConnection()
|
||||
}
|
||||
|
||||
async sendTyping({ChatRoomId, UserName}) {
|
||||
return await this.connection.typing({ ChatRoomId, UserName})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,11 +3,12 @@ import { BehaviorSubject, Observable } from 'rxjs';
|
||||
import { ok, Result, err } from 'neverthrow';
|
||||
import { SessionStore } from 'src/app/store/session.service';
|
||||
import { filter, first } from 'rxjs/operators';
|
||||
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
export class SignalRConnection {
|
||||
|
||||
private hubConnection: signalR.HubConnection;
|
||||
private messageSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
|
||||
private typingSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
|
||||
private connectionStateSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
|
||||
private disconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
|
||||
private reconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
|
||||
@@ -27,7 +28,6 @@ export class SignalRConnection {
|
||||
.build();
|
||||
|
||||
this.hubConnection = hubConnection
|
||||
this.join()
|
||||
|
||||
hubConnection
|
||||
.start()
|
||||
@@ -70,7 +70,9 @@ export class SignalRConnection {
|
||||
public join() {
|
||||
if(this.connectionStateSubject.value == true) {
|
||||
console.log('join=============')
|
||||
this.hubConnection.invoke("Join", 105, "UserFirefox");
|
||||
|
||||
this.hubConnection.invoke("Join", SessionStore.user.UserId, SessionStore.user.FullName);
|
||||
//this.hubConnection.invoke("Join", 105, "UserFirefox");
|
||||
} else {
|
||||
this.sendLaterSubject.next({method: 'SendMessage', args:["Join", 312, "Daniel"]})
|
||||
}
|
||||
@@ -101,17 +103,55 @@ export class SignalRConnection {
|
||||
})
|
||||
}
|
||||
|
||||
public async typing(data: Object & { ChatRoomId, UserName}):Promise<Result<any, any>> {
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
const requestId = uuidv4()
|
||||
if(this.connectionStateSubject.value == true) {
|
||||
|
||||
try {
|
||||
this.hubConnection.invoke("Typing", {UserName: data.UserName, ChatRoomId: data.ChatRoomId, requestId} as any)
|
||||
|
||||
} catch (error) {}
|
||||
|
||||
this.typingSubject.pipe(
|
||||
filter((message: any) => {
|
||||
return requestId == message?.requestId
|
||||
}),
|
||||
first()
|
||||
).subscribe(value => {
|
||||
resolve(ok(value));
|
||||
});
|
||||
|
||||
} else {
|
||||
this.sendLaterSubject.next({method: 'SendMessage', args: data})
|
||||
return reject(err(false))
|
||||
}
|
||||
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
private addMessageListener(): void {
|
||||
this.hubConnection.on('ReceiveMessage', (message) => {
|
||||
console.log('ReceiveMessage', message)
|
||||
this.messageSubject.next(message);
|
||||
});
|
||||
|
||||
this.hubConnection.on('Typing', (_message) => {
|
||||
console.log('_message', _message)
|
||||
this.typingSubject.next(_message);
|
||||
});
|
||||
}
|
||||
|
||||
public getMessages(): Observable<string> {
|
||||
return this.messageSubject.asObservable()
|
||||
}
|
||||
|
||||
public getTyping(): Observable<string> {
|
||||
return this.typingSubject.asObservable()
|
||||
}
|
||||
|
||||
public getConnectionState(): Observable<boolean> {
|
||||
return this.connectionStateSubject.asObservable();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user