import { Injectable } from '@angular/core'; import { Subject, BehaviorSubject, Observable } from 'rxjs'; import { filter } from 'rxjs/operators'; @Injectable({ providedIn: 'root' }) export class MessageLiveDataSourceService { private messageSubject: Subject = new Subject(); private isPaused: BehaviorSubject = new BehaviorSubject(false); private messageBuffer: any[] = []; // Buffer to store messages when paused private authentication!: string constructor() { } handleMessage(event: any) { const data: any = event.data || MessageEvent; if (this.isPaused.getValue()) { this.messageBuffer.push({}); // Buffer the message if paused } else { this.messageSubject.next({} as any); } } pauseBroadcast() { this.isPaused.next(true); } resumeBroadcast() { this.isPaused.next(false); this.flushMessageBuffer(); // Emit all buffered messages } private flushMessageBuffer() { while (this.messageBuffer.length > 0) { const bufferedMessage = this.messageBuffer.shift(); this.messageSubject.next(bufferedMessage); } } subscribe(endpoint: any): Observable { return this.messageSubject.pipe( filter(message => message && message.endpoint === endpoint && message.authentication == this.authentication ) ) } }