remove signal r to infra

This commit is contained in:
Peter Maquiran
2024-08-27 11:00:32 +01:00
parent c59abb7e04
commit 91a7e5f516
15 changed files with 24 additions and 28 deletions
+1 -1
View File
@@ -1,5 +1,5 @@
import { NgModule } from '@angular/core';
import { SignalRService } from 'src/app/module/chat/infra/socket/signal-r.service'
import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service';
import { ChatServiceService } from 'src/app/module/chat/domain/chat-service.service'
import { skip, switchMap } from 'rxjs/operators';
import { SessionStore } from 'src/app/store/session.service';
@@ -1,6 +1,5 @@
import { Injectable } from '@angular/core';
import { err, ok } from 'neverthrow';
import { SignalRService } from '../../../infra/socket/signal-r.service';
import { SessionStore } from 'src/app/store/session.service';
import { MessageDeleteInputDTO } from '../../dto/message/messageDeleteInputDTO';
import { v4 as uuidv4 } from 'uuid'
@@ -9,6 +8,7 @@ import { MessageUpdateInput } from '../../../domain/use-case/message/message-upd
import { MessageOutPutDataDTO } from '../../dto/message/messageOutputDTO';
import { MessageInputDTO } from '../../dto/message/messageInputDtO';
import { MessageReactionInput } from '../../../domain/use-case/message/message-reaction-by-id-use-case.service';
import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service';
interface msgObj {
roomId: string;
@@ -5,7 +5,7 @@ import { ValidateSchema } from 'src/app/services/decorators/validate-schema.deco
import { APIReturn } from 'src/app/services/decorators/api-validate-schema.decorator';
import { MessageOutPutDataDTOSchema, MessageOutPutDTO, MessageOutPutDTOSchema } from '../../dto/message/messageOutputDTO';
import { DataSourceReturn } from 'src/app/services/Repositorys/type';
import { SignalRService } from '../../../infra/socket/signal-r.service';
import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service';
import { MessageUpdateInput } from '../../../domain/use-case/message/message-update-by-id-use-case.service';
import { SessionStore } from 'src/app/store/session.service';
import { MessageDeleteInputDTO } from '../../dto/message/messageDeleteInputDTO';
@@ -15,7 +15,7 @@ import { RoomUpdateOutputDTO } from '../../dto/room/roomUpdateOutputDTO';
import { DataSourceReturn } from 'src/app/services/Repositorys/type';
import { SessionStore } from 'src/app/store/session.service';
import { MemberSetAdminDTO } from '../../../domain/use-case/member/member-admin-use-case.service';
import { SignalRService } from '../../../infra/socket/signal-r.service';
import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service';
import { v4 as uuidv4 } from 'uuid'
@Injectable({
@@ -1,8 +1,8 @@
import { Injectable } from '@angular/core';
import { SignalRService } from '../../../infra/socket/signal-r.service';
import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service';
import { filter, map } from 'rxjs/operators';
import { z } from 'zod';
import { SocketMessage } from '../../../infra/socket/signalR';
import { SocketMessage } from 'src/app/infra/socket/signalR/signalR';
import { RoomInputDTO } from '../../dto/room/roomInputDTO';
import { RoomOutPutDTO } from '../../dto/room/roomOutputDTO';
import { v4 as uuidv4 } from 'uuid'
@@ -1,8 +1,8 @@
import { Injectable } from '@angular/core';
import { SignalRService } from '../../../infra/socket/signal-r.service';
import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service';
import { SessionStore } from 'src/app/store/session.service';
import { filter, map } from 'rxjs/operators';
import { SocketMessage } from '../../../infra/socket/signalR';
import { SocketMessage } from 'src/app/infra/socket/signalR/signalR';
import { UserTypingDTO } from '../../dto/typing/typingInputDTO';
@Injectable({
@@ -5,7 +5,7 @@ import { MessageReactionInput, MessageReactionUseCaseService } from 'src/app/mod
import { MessageUpdateInput, MessageUpdateUseCaseService } from 'src/app/module/chat/domain/use-case/message/message-update-by-id-use-case.service';
import { MemberAdminUseCaseService, MemberSetAdminDTO } from 'src/app/module/chat/domain/use-case/member/member-admin-use-case.service';
import { MessageCreateUseCaseService, MessageEnum } from 'src/app/module/chat/domain/use-case/message/message-create-use-case.service';
import { SignalRService } from '../infra/socket/signal-r.service';
import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service';
import { SocketMessageDeleteUseCaseService } from 'src/app/module/chat/domain/use-case/socket/socket-message-delete-use-case.service';
import { SocketMessageUpdateUseCaseService } from 'src/app/module/chat/domain/use-case/socket/socket-message-update-use-case.service';
import { SocketMessageCreateUseCaseService } from 'src/app/module/chat/domain/use-case/socket/socket-message-create-use-case.service';
@@ -1,6 +1,6 @@
import { Injectable } from '@angular/core';
import { SignalRService } from '../../../infra/socket/signal-r.service';
import { filter } from 'rxjs/operators';
import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service';
@Injectable({
providedIn: 'root'
@@ -4,7 +4,8 @@ import { SessionStore } from 'src/app/store/session.service';
import { MessageLocalDataSourceService } from '../../../data/repository/message/message-local-data-source.service';
import { MessageSocketRepositoryService } from '../../../data/repository/message/message-live-signalr-data-source.service';
import { MessageRemoteDataSourceService } from '../../../data/repository/message/message-remote-data-source.service';
import { SignalRService } from '../../../infra/socket/signal-r.service';
import { SignalRService } from 'src/app/infra/socket/signalR/signal-r.service';
@Injectable({
providedIn: 'root'
@@ -1,5 +1,4 @@
import { Injectable } from '@angular/core';
import { SignalRService } from '../../../infra/socket/signal-r.service';
import { SessionStore } from 'src/app/store/session.service';
import { UserTypingRemoteRepositoryService } from '../../../data/repository/typing/user-typing-live-data-source.service';
@@ -1,157 +0,0 @@
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, Subject, timer } from 'rxjs';
import { Platform } from '@ionic/angular';
import { SignalRConnection, SocketMessage } from './signalR';
import { Plugins } from '@capacitor/core';
import { UserTypingDTO } from '../../data/dto/typing/typingInputDTO';
import { MessageOutPutDataDTO } from '../../data/dto/message/messageOutputDTO';
import { z } from 'zod';
import { filter, map, switchMap } from 'rxjs/operators';
import { Result } from 'neverthrow';
import { HubConnection } from '@microsoft/signalr';
const { App } = Plugins;
const SignalRInputSchema = z.object({
method: z.string(),
data: z.object({
requestId: z.string(),
}).catchall(z.unknown()), // Allows any additional properties with unknown values
})
export type ISignalRInput = z.infer<typeof SignalRInputSchema>;
@Injectable({
providedIn: 'root'
})
export class SignalRService {
private connection!: SignalRConnection;
private connectingSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(null);
private sendDataSubject: BehaviorSubject<{method: string, data: any}> = new BehaviorSubject<{method: string, data: any}>(null);
private deadConnectionBackGround: Subject<any>;
constructor(private platform: Platform) {
this.deadConnectionBackGround = new Subject()
this.deadConnectionBackGround.pipe(
switchMap(() => timer(150000)),
).subscribe(() => {
this.newConnection()
})
try {
if (!this.platform.is('desktop')) {
App.addListener('appStateChange', ({ isActive }) => {
if (isActive) {
// The app is in the foreground.
// console.log('App is in the foreground');
this.deadConnectionBackGround.next()
}
});
} else {
document.addEventListener('visibilitychange', () => {
if (document.visibilityState === 'visible') {
this.deadConnectionBackGround.next()
}
});
}
} catch(error) {}
// this.establishConnection()
}
async establishConnection(): Promise<Result<HubConnection, false>> {
// const connection = new SignalRConnection({url:'https://41e3-41-63-166-54.ngrok-free.app/api/v2/chathub'})
const connection = new SignalRConnection({url:'https://gdapi-dev.dyndns.info/stage/api/v2/chathub'})
const attempConnection = await connection.establishConnection()
if(attempConnection.isOk()) {
this.connection?.closeConnection()
this.connection = connection
this.connection.getData().subscribe((data) => {
this.sendDataSubject.next(data)
this.deadConnectionBackGround.next()
})
this.connection.getConnectionState().subscribe((data) => {
this.connectingSubject.next(data)
})
return attempConnection
} else {
return new Promise((resolve) => {
setTimeout(() => {
resolve(this.establishConnection())
}, 2000)
})
}
}
getMessage() {
return this.getData().pipe(
filter((e) : e is SocketMessage<MessageOutPutDataDTO>=> e?.method == 'ReceiveMessage'
),
map((e)=> e.data)
)
}
getTyping() {
return this.getData().pipe(
filter((e) : e is SocketMessage<UserTypingDTO>=> e?.method == 'TypingMessage'
),
map((e)=> e.data)
)
}
getMessageDelete() {
return this.getData().pipe(
filter((e) : e is SocketMessage<MessageOutPutDataDTO>=> e?.method == 'DeleteMessage'
),
map((e)=> e.data)
)
}
getMessageUpdate() {
return this.getData().pipe(
filter((e) : e is SocketMessage<MessageOutPutDataDTO>=> e?.method == 'UpdateMessage'
),
map((e)=> e.data)
)
}
sendData<T>(input: ISignalRInput) {
return this.connection.sendData<T>(input)
}
join() {
return this.connection.join()
}
getData<T>() {
return this.sendDataSubject.asObservable() as BehaviorSubject<{method: string, data: T}>
}
public getConnectionState(): Observable<boolean> {
return this.connectingSubject.asObservable();
}
newConnection() {
this.establishConnection()
}
}
-208
View File
@@ -1,208 +0,0 @@
import * as signalR from '@microsoft/signalr';
import { BehaviorSubject, Observable, race, timer } from 'rxjs';
import { ok, Result, err } from 'neverthrow';
import { SessionStore } from 'src/app/store/session.service';
import { filter, first } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid'
import { UserTypingDTO } from '../../data/dto/typing/typingInputDTO';
import { MessageOutPutDataDTO } from '../../data/dto/message/messageOutputDTO';
import { MessageDeleteInputDTO } from '../../data/dto/message/messageDeleteInputDTO';
import { MessageReactionInput } from '../../domain/use-case/message/message-reaction-by-id-use-case.service';
import { ISignalRInput } from './signal-r.service';
export interface SocketMessage<T> {
method: string,
data: T
}
export enum EnumSocketError {
catch = 1,
close
}
export class SignalRConnection {
private hubConnection: signalR.HubConnection;
private messageSubject: BehaviorSubject<MessageOutPutDataDTO> = new BehaviorSubject<MessageOutPutDataDTO>(null);
private connectionStateSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private disconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private reconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private sendLaterSubject: BehaviorSubject<Object> = new BehaviorSubject<Object>(false);
private reconnect = true
private sendDataSubject: BehaviorSubject<{method: string, data: any}> = new BehaviorSubject<{method: string, data: any}>(null);
private pendingRequests: Map<string, { resolve: Function; reject: Function }> = new Map();
url: string
private hasConnectOnce = false
constructor({url}) {
this.url = url
}
establishConnection(): Promise<Result<signalR.HubConnection, false>> {
return new Promise((resolve, reject) => {
const hubConnection = new signalR.HubConnectionBuilder()
.withUrl(this.url)
.build();
this.hubConnection = hubConnection
hubConnection
.start()
.then(() => {
this.hubConnection = hubConnection
this.hasConnectOnce = true
console.log('Connection started');
this.connectionStateSubject.next(true);
this.join()
this.addMessageListener()
resolve(ok(hubConnection))
})
.catch(error => {
console.log('Error while starting connection: ' + error);
this.connectionStateSubject.next(false);
if(this.hasConnectOnce) {
setTimeout(()=> {
this.attempReconnect();
}, 2000)
}
resolve(err(false))
});
hubConnection.onclose(() => {
console.log('Connection closed');
this.connectionStateSubject.next(false);
this.disconnectSubject.next(true)
this.pendingRequests.forEach((_, requestId) => {
const data = this.pendingRequests.get(requestId);
if(data) {
const { reject } = data
reject(err({
type: EnumSocketError.close
}));
this.pendingRequests.delete(requestId);
}
});
if(this.reconnect) {
this.attempReconnect();
}
});
})
}
async attempReconnect() {
const attempConnection = await this.establishConnection()
if(attempConnection.isOk()) {
this.reconnectSubject.next(true)
}
}
public join() {
if(this.connectionStateSubject.value == true) {
this.hubConnection.invoke("Join", SessionStore.user.UserId, SessionStore.user.FullName);
//this.hubConnection.invoke("Join", 105, "UserFirefox");
} else {
this.sendLaterSubject.next({method: 'SendMessage', args:["Join", 312, "Daniel"]})
}
}
sendData<T>(input: ISignalRInput): Promise<Result<T, any>> {
return new Promise((resolve, reject) => {
if(this.connectionStateSubject.value == true) {
try {
this.pendingRequests.set(input.data.requestId, { resolve, reject: (data: any) => resolve(data) });
this.hubConnection.invoke(input.method, input.data)
this.sendDataSubject.pipe(
filter((message) => input.data.requestId == message?.data.requestId),
first()
).subscribe(value => {
resolve(ok(value.data as unknown as T))
// console.log('Received valid value:', value);
});
} catch(error) {
resolve(err({
type: EnumSocketError.catch
}))
}
} else {
this.sendLaterSubject.next({method: 'SendMessage', args: input})
return reject(err(false))
}
})
}
private addMessageListener(): void {
const methods = ['ReceiveMessage', 'TypingMessage', 'AvailableUsers',
'ReadAt', 'DeleteMessage', 'UpdateMessage', 'GroupAddedMembers',
'GroupDeletedMembers', 'UserAddGroup']
for(const method of methods) {
this.hubConnection.on(method, (message: any) => {
this.messageSubject.next(message);
this.sendDataSubject.next({
method: method,
data: message
})
});
}
}
public getConnectionState(): Observable<boolean> {
return this.connectionStateSubject.asObservable();
}
public getDisconnectTrigger(): Observable<boolean> {
return this.disconnectSubject.asObservable();
}
public getData() {
return this.sendDataSubject.asObservable()
}
public closeConnection(): void {
this.reconnect = false
if (this.hubConnection) {
this.hubConnection
.stop()
.then(() => {
console.log('Connection closed by user');
this.connectionStateSubject.next(false);
this.pendingRequests.forEach((_, requestId) => {
const data = this.pendingRequests.get(requestId);
if(data) {
const { reject } = data
reject(err({
type: EnumSocketError.close
}));
this.pendingRequests.delete(requestId);
}
});
})
.catch(err => console.log('Error while closing connection: ' + err));
}
}
}
@@ -1,50 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>SignalR Client</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/8.0.0/signalr.min.js"></script>
</head>
<body>
<h1>SignalR Client</h1>
<div id="messages"></div>
<input id="chatbox">
<script>
var msgObj = {
roomId: "882fcb86-4028-4242-bb47-fca0170dcc65",
senderId:105,
message:"message enviada",
messageType:1,
canEdit:true,
oneShot:false,
};
const connection = new signalR.HubConnectionBuilder()
.withAutomaticReconnect()
.withUrl("https://gdapi-dev.dyndns.info/stage/api/v2/chathub")
.build();
connection.start().then(function () {
connection.invoke("Join", 105, "UserFirefox");
document.getElementById("chatbox").addEventListener("keyup", function (event) {
if (event.key === "Enter") {
msgObj.Message = chatbox.value;
connection.invoke("SendMessage", msgObj);
event.target.value = "";
}
});
}).catch(function (err) {
return console.error(err.toString());
});
connection.on("ReceiveMessage", function (message) {
console.log(message);
const messages = document.getElementById("messages");
messages.innerHTML += `<p>${message.message}</p>`;
});
</script>
</body>
</html>
-129
View File
@@ -1,129 +0,0 @@
import { Injectable } from '@angular/core';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, retryWhen, tap, delay } from 'rxjs/operators';
import { SessionStore } from 'src/app/store/session.service';
import { v4 as uuidv4 } from 'uuid'
export interface WebSocketMessage {
type: string;
payload: any;
requestId?: string;
}
interface WebSocketError {
type: string;
error: any;
}
@Injectable({
providedIn: 'root'
})
export class WebSocketService {
private socket$: WebSocketSubject<WebSocketMessage>;
private messageSubject$: Subject<WebSocketMessage>;
private connectionStatus$: BehaviorSubject<boolean>;
private reconnectAttempts = 0;
private readonly maxReconnectAttempts = 5;
callback: {[key: string]: Function} = {}
constructor() {
this.messageSubject$ = new Subject<WebSocketMessage>();
this.connectionStatus$ = new BehaviorSubject<boolean>(false);
// this.connect('https://5-180-182-151.cloud-xip.com:85/ws/')
// this.messages$.subscribe(({payload, requestId, type}) => {
// if(this.callback[requestId]) {
// this.callback[requestId]({payload, requestId, type})
// delete this.callback[requestId]
// }
// })
}
public connect(url: string) {
this.socket$ = webSocket<WebSocketMessage>(url);
this.socket$.pipe(
tap({
error: () => {
this.connectionStatus$.next(false);
}
}),
retryWhen(errors => errors.pipe(
tap(() => {
this.reconnectAttempts++;
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
throw new Error('Max reconnect attempts reached');
}
}),
delay(1000)
))
).subscribe(
(message) => {
this.messageSubject$.next(message);
if (!this.connectionStatus$.getValue()) {
this.connectionStatus$.next(true);
this.reconnectAttempts = 0;
// Send a message when the connection is established
this.sendMessage(SessionStore.user.UserId as any).subscribe();
}
},
(err) => {
console.error('WebSocket connection error:', err);
},
() => {
console.log('WebSocket connection closed');
this.connectionStatus$.next(false);
}
);
}
public sendMessage(message: WebSocketMessage): Observable<any> {
return new Observable<void>(observer => {
if(typeof message == 'object') {
message.requestId = uuidv4()
this.socket$.next(message);
this.callback[message.requestId] = ({payload, requestId})=> {
observer.next(payload as any);
observer.complete();
}
} else {
this.socket$.next(message);
observer.next({} as any);
observer.complete();
}
}).pipe(
catchError(err => {
console.error('Send message error:', err);
return new Observable<never>(observer => {
observer.error({ type: 'SEND_ERROR', error: err });
});
})
);
}
public get messages$(): Observable<WebSocketMessage> {
return this.messageSubject$.asObservable();
}
public get connectionStatus(): Observable<boolean> {
return this.connectionStatus$.asObservable();
}
}