send and receive message

This commit is contained in:
Peter Maquiran
2024-07-11 10:28:21 +01:00
parent 386ca67315
commit b0325a5558
50 changed files with 720 additions and 372 deletions
@@ -0,0 +1,16 @@
import { TestBed } from '@angular/core/testing';
import { SignalRService } from './signal-r.service';
describe('SignalRService', () => {
let service: SignalRService;
beforeEach(() => {
TestBed.configureTestingModule({});
service = TestBed.inject(SignalRService);
});
it('should be created', () => {
expect(service).toBeTruthy();
});
});
@@ -0,0 +1,78 @@
import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';
import { Platform } from '@ionic/angular';
import { SignalRConnection } from './signalR';
import { Plugins } from '@capacitor/core';
const { App } = Plugins;
@Injectable({
providedIn: 'root'
})
export class SignalRService {
private connection: SignalRConnection;
private messageSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
private connectingSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(null);
constructor(
private platform: Platform,) {
// this.startConnection();
// this.addMessageListener();
try {
if (!this.platform.is('desktop')) {
App.addListener('appStateChange', ({ isActive }) => {
if (isActive) {
// The app is in the foreground.
console.log('App is in the foreground');
this.newConnection()
} else {
// The app is in the background.
console.log('App is in the background');
// You can perform actions specific to the background state here.
}
});
}
} catch(error) {}
this.establishConnection()
}
private async establishConnection() {
const connection = new SignalRConnection({url:'https://gdapi-dev.dyndns.info/stage/chathub'})
const attempConnection = await connection.establishConnection()
if(attempConnection.isOk()) {
this.connection?.closeConnection()
this.connection = connection
this.connection.getSendLater().subscribe(data => {
})
this.connection.getMessages().subscribe((data) => {
this.messageSubject.next(data)
})
}
}
getMessage() {
return this.messageSubject.asObservable()
}
async sendMessage(data: Object) {
return await this.connection.sendMessage(data as any)
}
newConnection() {
this.establishConnection()
}
}
+140
View File
@@ -0,0 +1,140 @@
import * as signalR from '@microsoft/signalr';
import { BehaviorSubject, Observable } from 'rxjs';
import { ok, Result, err } from 'neverthrow';
import { SessionStore } from 'src/app/store/session.service';
import { filter, first } from 'rxjs/operators';
export class SignalRConnection {
private hubConnection: signalR.HubConnection;
private messageSubject: BehaviorSubject<string> = new BehaviorSubject<any>(null);
private connectionStateSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private disconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private reconnectSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);
private sendLaterSubject: BehaviorSubject<Object> = new BehaviorSubject<Object>(false);
private reconnect = true
url: string
constructor({url}) {
this.url = url
}
establishConnection(): Promise<Result<signalR.HubConnection, false>> {
return new Promise((resolve, reject) => {
const hubConnection = new signalR.HubConnectionBuilder()
.withUrl(this.url)
.build();
this.hubConnection = hubConnection
this.join()
hubConnection
.start()
.then(() => {
console.log('Connection started');
this.connectionStateSubject.next(true);
this.hubConnection = hubConnection
this.addMessageListener()
this.join()
resolve(ok(hubConnection))
})
.catch(error => {
console.log('Error while starting connection: ' + error);
this.connectionStateSubject.next(false);
reject(err(false))
});
hubConnection.onclose(() => {
console.log('Connection closed');
this.connectionStateSubject.next(false);
this.disconnectSubject.next(true)
if(this.reconnect) {
this.attempReconnect();
}
});
})
}
async attempReconnect() {
const attempConnection = await this.establishConnection()
if(attempConnection.isOk()) {
this.reconnectSubject.next(true)
}
}
public join() {
if(this.connectionStateSubject.value == true) {
console.log('join=============')
this.hubConnection.invoke("Join", 105, "UserFirefox");
} else {
this.sendLaterSubject.next({method: 'SendMessage', args:["Join", 312, "Daniel"]})
}
}
public async sendMessage(data: Object & { requestId}):Promise<Result<any, any>> {
return new Promise((resolve, reject) => {
if(this.connectionStateSubject.value == true) {
console.log('sendMessage')
this.hubConnection.invoke("SendMessage", data)
this.messageSubject.pipe(
filter((message: any) => data.requestId == message?.requestId),
first()
).subscribe(value => {
resolve(ok(value))
console.log('Received valid value:', value);
});
} else {
this.sendLaterSubject.next({method: 'SendMessage', args: data})
return reject(err(false))
}
})
}
private addMessageListener(): void {
this.hubConnection.on('ReceiveMessage', (message) => {
console.log('ReceiveMessage', message)
this.messageSubject.next(message);
});
}
public getMessages(): Observable<string> {
return this.messageSubject.asObservable()
}
public getConnectionState(): Observable<boolean> {
return this.connectionStateSubject.asObservable();
}
public getDisconnectTrigger(): Observable<boolean> {
return this.disconnectSubject.asObservable();
}
public getSendLater() {
return this.sendLaterSubject.asObservable();
}
public closeConnection(): void {
this.reconnect = false
if (this.hubConnection) {
this.hubConnection
.stop()
.then(() => {
console.log('Connection closed by user');
this.connectionStateSubject.next(false);
})
.catch(err => console.log('Error while closing connection: ' + err));
}
}
}
@@ -0,0 +1,50 @@
<!DOCTYPE html>
<html>
<head>
<title>SignalR Client</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/microsoft-signalr/8.0.0/signalr.min.js"></script>
</head>
<body>
<h1>SignalR Client</h1>
<div id="messages"></div>
<input id="chatbox">
<script>
var msgObj = {
roomId: "882fcb86-4028-4242-bb47-fca0170dcc65",
senderId:105,
message:"message enviada",
messageType:1,
canEdit:true,
oneShot:false,
};
const connection = new signalR.HubConnectionBuilder()
.withAutomaticReconnect()
.withUrl("https://gdapi-dev.dyndns.info/stage/chathub")
.build();
connection.start().then(function () {
connection.invoke("Join", 105, "UserFirefox");
document.getElementById("chatbox").addEventListener("keyup", function (event) {
if (event.key === "Enter") {
msgObj.Message = chatbox.value;
connection.invoke("SendMessage", msgObj);
event.target.value = "";
}
});
}).catch(function (err) {
return console.error(err.toString());
});
connection.on("ReceiveMessage", function (message) {
console.log(message);
const messages = document.getElementById("messages");
messages.innerHTML += `<p>${message.message}</p>`;
});
</script>
</body>
</html>
+129
View File
@@ -0,0 +1,129 @@
import { Injectable } from '@angular/core';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, retryWhen, tap, delay } from 'rxjs/operators';
import { SessionStore } from 'src/app/store/session.service';
import { v4 as uuidv4 } from 'uuid'
export interface WebSocketMessage {
type: string;
payload: any;
requestId?: string;
}
interface WebSocketError {
type: string;
error: any;
}
@Injectable({
providedIn: 'root'
})
export class WebSocketService {
private socket$: WebSocketSubject<WebSocketMessage>;
private messageSubject$: Subject<WebSocketMessage>;
private connectionStatus$: BehaviorSubject<boolean>;
private reconnectAttempts = 0;
private readonly maxReconnectAttempts = 5;
callback: {[key: string]: Function} = {}
constructor() {
this.messageSubject$ = new Subject<WebSocketMessage>();
this.connectionStatus$ = new BehaviorSubject<boolean>(false);
this.connect('https://5-180-182-151.cloud-xip.com:85/ws/')
this.messages$.subscribe(({payload, requestId, type}) => {
if(this.callback[requestId]) {
this.callback[requestId]({payload, requestId, type})
delete this.callback[requestId]
}
})
}
public connect(url: string) {
this.socket$ = webSocket<WebSocketMessage>(url);
this.socket$.pipe(
tap({
error: () => {
this.connectionStatus$.next(false);
}
}),
retryWhen(errors => errors.pipe(
tap(() => {
this.reconnectAttempts++;
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
throw new Error('Max reconnect attempts reached');
}
}),
delay(1000)
))
).subscribe(
(message) => {
this.messageSubject$.next(message);
if (!this.connectionStatus$.getValue()) {
this.connectionStatus$.next(true);
this.reconnectAttempts = 0;
// Send a message when the connection is established
this.sendMessage(SessionStore.user.UserId as any).subscribe();
}
},
(err) => {
console.error('WebSocket connection error:', err);
},
() => {
console.log('WebSocket connection closed');
this.connectionStatus$.next(false);
}
);
}
public sendMessage(message: WebSocketMessage): Observable<any> {
return new Observable<void>(observer => {
if(typeof message == 'object') {
message.requestId = uuidv4()
this.socket$.next(message);
this.callback[message.requestId] = ({payload, requestId})=> {
observer.next(payload as any);
observer.complete();
}
} else {
this.socket$.next(message);
observer.next({} as any);
observer.complete();
}
}).pipe(
catchError(err => {
console.error('Send message error:', err);
return new Observable<never>(observer => {
observer.error({ type: 'SEND_ERROR', error: err });
});
})
);
}
public get messages$(): Observable<WebSocketMessage> {
return this.messageSubject$.asObservable();
}
public get connectionStatus(): Observable<boolean> {
return this.connectionStatus$.asObservable();
}
}