Spaces:
Sleeping
Sleeping
| /** | |
| * Consumer client for receiving video streams in RobotHub TransportServer | |
| */ | |
| import { VideoClientCore } from './core.js'; | |
| import type { | |
| WebSocketMessage, | |
| FrameUpdateMessage, | |
| VideoConfigUpdateMessage, | |
| StreamStartedMessage, | |
| StreamStoppedMessage, | |
| RecoveryTriggeredMessage, | |
| StatusUpdateMessage, | |
| StreamStatsMessage, | |
| ClientOptions, | |
| WebRTCStats, | |
| FrameUpdateCallback, | |
| VideoConfigUpdateCallback, | |
| StreamStartedCallback, | |
| StreamStoppedCallback, | |
| RecoveryTriggeredCallback, | |
| StatusUpdateCallback, | |
| StreamStatsCallback, | |
| WebRTCOfferMessage, | |
| WebRTCIceMessage, | |
| } from './types.js'; | |
| export class VideoConsumer extends VideoClientCore { | |
| // Event callbacks | |
| private onFrameUpdateCallback: FrameUpdateCallback | null = null; | |
| private onVideoConfigUpdateCallback: VideoConfigUpdateCallback | null = null; | |
| private onStreamStartedCallback: StreamStartedCallback | null = null; | |
| private onStreamStoppedCallback: StreamStoppedCallback | null = null; | |
| private onRecoveryTriggeredCallback: RecoveryTriggeredCallback | null = null; | |
| private onStatusUpdateCallback: StatusUpdateCallback | null = null; | |
| private onStreamStatsCallback: StreamStatsCallback | null = null; | |
| // ICE candidate queuing for proper timing | |
| private iceCandidateQueue: { candidate: RTCIceCandidate; fromProducer: string }[] = []; | |
| private hasRemoteDescription = false; | |
| constructor(baseUrl: string, options: ClientOptions = {}) { | |
| super(baseUrl, options); | |
| } | |
| // ============= CONSUMER CONNECTION ============= | |
| async connect(workspaceId: string, roomId: string, participantId?: string): Promise<boolean> { | |
| const connected = await this.connectToRoom(workspaceId, roomId, 'consumer', participantId); | |
| if (connected) { | |
| // Create peer connection immediately so we're ready for WebRTC offers | |
| console.info('π§ Creating peer connection for consumer...'); | |
| await this.startReceiving(); | |
| } | |
| return connected; | |
| } | |
| // ============= CONSUMER METHODS ============= | |
| async startReceiving(): Promise<void> { | |
| if (!this.connected) { | |
| throw new Error('Must be connected to start receiving'); | |
| } | |
| // Reset WebRTC state | |
| this.hasRemoteDescription = false; | |
| this.iceCandidateQueue = []; | |
| // Create peer connection for receiving | |
| this.createPeerConnection(); | |
| // Set up to receive remote stream | |
| if (this.peerConnection) { | |
| this.peerConnection.ontrack = (event: RTCTrackEvent) => { | |
| console.info('πΊ Received remote track:', event.track.kind); | |
| this.remoteStream = event.streams[0] || null; | |
| this.emit('remoteStream', this.remoteStream); | |
| this.emit('streamReceived', this.remoteStream); | |
| }; | |
| } | |
| } | |
| async stopReceiving(): Promise<void> { | |
| if (this.peerConnection) { | |
| this.peerConnection.close(); | |
| this.peerConnection = null; | |
| } | |
| this.remoteStream = null; | |
| this.emit('streamStopped'); | |
| } | |
| // ============= WEBRTC NEGOTIATION ============= | |
| async handleWebRTCOffer(message: WebRTCOfferMessage): Promise<void> { | |
| try { | |
| console.info(`π₯ Received WebRTC offer from producer ${message.from_producer}`); | |
| if (!this.peerConnection) { | |
| console.warn('No peer connection available to handle offer'); | |
| return; | |
| } | |
| // Reset state for new offer | |
| this.hasRemoteDescription = false; | |
| this.iceCandidateQueue = []; | |
| // Set remote description (the offer) | |
| await this.setRemoteDescription(message.offer); | |
| this.hasRemoteDescription = true; | |
| // Process any queued ICE candidates now that we have remote description | |
| await this.processQueuedIceCandidates(); | |
| // Create answer | |
| const answer = await this.createAnswer(message.offer); | |
| console.info(`π€ Sending WebRTC answer to producer ${message.from_producer}`); | |
| // Send answer back through server to producer | |
| if (this.workspaceId && this.roomId && this.participantId) { | |
| await this.sendWebRTCSignal(this.workspaceId, this.roomId, this.participantId, { | |
| type: 'answer', | |
| sdp: answer.sdp, | |
| target_producer: message.from_producer, | |
| } as Record<string, unknown>); | |
| } | |
| console.info('β WebRTC negotiation completed from consumer side'); | |
| } catch (error) { | |
| console.error('Failed to handle WebRTC offer:', error); | |
| this.handleError(`Failed to handle WebRTC offer: ${error}`); | |
| } | |
| } | |
| private async handleWebRTCIce(message: WebRTCIceMessage): Promise<void> { | |
| if (!this.peerConnection) { | |
| console.warn('No peer connection available to handle ICE'); | |
| return; | |
| } | |
| try { | |
| console.info(`π₯ Received WebRTC ICE from producer ${message.from_producer}`); | |
| const candidate = new RTCIceCandidate(message.candidate); | |
| if (!this.hasRemoteDescription) { | |
| // Queue ICE candidate until we have remote description | |
| console.info(`π Queuing ICE candidate from ${message.from_producer} (no remote description yet)`); | |
| this.iceCandidateQueue.push({ | |
| candidate, | |
| fromProducer: message.from_producer || 'unknown' | |
| }); | |
| return; | |
| } | |
| // Add ICE candidate to peer connection | |
| await this.peerConnection.addIceCandidate(candidate); | |
| console.info(`β WebRTC ICE handled from producer ${message.from_producer}`); | |
| } catch (error) { | |
| console.error(`Failed to handle WebRTC ICE from ${message.from_producer}:`, error); | |
| this.handleError(`Failed to handle WebRTC ICE: ${error}`); | |
| } | |
| } | |
| private async processQueuedIceCandidates(): Promise<void> { | |
| if (this.iceCandidateQueue.length === 0) { | |
| return; | |
| } | |
| console.info(`π Processing ${this.iceCandidateQueue.length} queued ICE candidates`); | |
| for (const { candidate, fromProducer } of this.iceCandidateQueue) { | |
| try { | |
| if (this.peerConnection) { | |
| await this.peerConnection.addIceCandidate(candidate); | |
| console.info(`β Processed queued ICE candidate from ${fromProducer}`); | |
| } | |
| } catch (error) { | |
| console.error(`Failed to process queued ICE candidate from ${fromProducer}:`, error); | |
| } | |
| } | |
| // Clear the queue | |
| this.iceCandidateQueue = []; | |
| } | |
| // Override to add producer targeting for ICE candidates | |
| override createPeerConnection(): RTCPeerConnection { | |
| const config: RTCConfiguration = { | |
| iceServers: this.webrtcConfig.iceServers || [ | |
| { urls: 'stun:stun.l.google.com:19302' } | |
| ] | |
| }; | |
| this.peerConnection = new RTCPeerConnection(config); | |
| // Connection state changes | |
| this.peerConnection.onconnectionstatechange = () => { | |
| const state = this.peerConnection?.connectionState; | |
| console.info(`π WebRTC connection state: ${state}`); | |
| }; | |
| // ICE connection state | |
| this.peerConnection.oniceconnectionstatechange = () => { | |
| const state = this.peerConnection?.iceConnectionState; | |
| console.info(`π§ ICE connection state: ${state}`); | |
| }; | |
| // ICE candidate handling - send to producer | |
| this.peerConnection.onicecandidate = (event: RTCPeerConnectionIceEvent) => { | |
| if (event.candidate && this.workspaceId && this.roomId && this.participantId) { | |
| // Send ICE candidate to producer | |
| this.sendIceCandidateToProducer(event.candidate); | |
| } | |
| }; | |
| // Handle remote stream | |
| this.peerConnection.ontrack = (event: RTCTrackEvent) => { | |
| console.info('πΊ Received remote track:', event.track.kind); | |
| this.remoteStream = event.streams[0] || null; | |
| this.emit('remoteStream', this.remoteStream); | |
| this.emit('streamReceived', this.remoteStream); | |
| }; | |
| return this.peerConnection; | |
| } | |
| private async sendIceCandidateToProducer(candidate: RTCIceCandidate): Promise<void> { | |
| if (!this.workspaceId || !this.roomId || !this.participantId) return; | |
| try { | |
| // Get room info to find the producer | |
| const roomInfo = await this.getRoomInfo(this.workspaceId, this.roomId); | |
| if (roomInfo.participants.producer) { | |
| await this.sendWebRTCSignal(this.workspaceId, this.roomId, this.participantId, { | |
| type: 'ice', | |
| candidate: candidate.toJSON(), | |
| target_producer: roomInfo.participants.producer, | |
| } as Record<string, unknown>); | |
| } | |
| } catch (error) { | |
| console.error('Failed to send ICE candidate to producer:', error); | |
| } | |
| } | |
| private async handleStreamStarted(message: StreamStartedMessage): Promise<void> { | |
| if (this.onStreamStartedCallback) { | |
| this.onStreamStartedCallback(message.config, message.participant_id); | |
| } | |
| this.emit('streamStarted', message.config, message.participant_id); | |
| console.info(`π Stream started by producer ${message.participant_id}, ready to receive video`); | |
| } | |
| // ============= EVENT CALLBACKS ============= | |
| onFrameUpdate(callback: FrameUpdateCallback): void { | |
| this.onFrameUpdateCallback = callback; | |
| } | |
| onVideoConfigUpdate(callback: VideoConfigUpdateCallback): void { | |
| this.onVideoConfigUpdateCallback = callback; | |
| } | |
| onStreamStarted(callback: StreamStartedCallback): void { | |
| this.onStreamStartedCallback = callback; | |
| } | |
| onStreamStopped(callback: StreamStoppedCallback): void { | |
| this.onStreamStoppedCallback = callback; | |
| } | |
| onRecoveryTriggered(callback: RecoveryTriggeredCallback): void { | |
| this.onRecoveryTriggeredCallback = callback; | |
| } | |
| onStatusUpdate(callback: StatusUpdateCallback): void { | |
| this.onStatusUpdateCallback = callback; | |
| } | |
| onStreamStats(callback: StreamStatsCallback): void { | |
| this.onStreamStatsCallback = callback; | |
| } | |
| // ============= MESSAGE HANDLING ============= | |
| protected override handleRoleSpecificMessage(message: WebSocketMessage): void { | |
| switch (message.type) { | |
| case 'frame_update': | |
| this.handleFrameUpdate(message as FrameUpdateMessage); | |
| break; | |
| case 'video_config_update': | |
| this.handleVideoConfigUpdate(message as VideoConfigUpdateMessage); | |
| break; | |
| case 'stream_started': | |
| this.handleStreamStarted(message as StreamStartedMessage); | |
| break; | |
| case 'stream_stopped': | |
| this.handleStreamStopped(message as StreamStoppedMessage); | |
| break; | |
| case 'recovery_triggered': | |
| this.handleRecoveryTriggered(message as RecoveryTriggeredMessage); | |
| break; | |
| case 'status_update': | |
| this.handleStatusUpdate(message as StatusUpdateMessage); | |
| break; | |
| case 'stream_stats': | |
| this.handleStreamStats(message as StreamStatsMessage); | |
| break; | |
| case 'participant_joined': | |
| console.info(`π₯ Participant joined: ${message.participant_id} as ${message.role}`); | |
| break; | |
| case 'participant_left': | |
| console.info(`π€ Participant left: ${message.participant_id} (${message.role})`); | |
| break; | |
| case 'webrtc_offer': | |
| this.handleWebRTCOffer(message as WebRTCOfferMessage); | |
| break; | |
| case 'webrtc_answer': | |
| console.info('π¨ Received WebRTC answer (consumer should not receive this)'); | |
| break; | |
| case 'webrtc_ice': | |
| this.handleWebRTCIce(message as WebRTCIceMessage); | |
| break; | |
| case 'emergency_stop': | |
| console.warn(`π¨ Emergency stop: ${message.reason || 'Unknown reason'}`); | |
| this.handleError(`Emergency stop: ${message.reason || 'Unknown reason'}`); | |
| break; | |
| case 'error': | |
| console.error(`Server error: ${message.message}`); | |
| this.handleError(message.message); | |
| break; | |
| default: | |
| console.warn(`Unknown message type for consumer: ${message.type}`); | |
| } | |
| } | |
| private handleFrameUpdate(message: FrameUpdateMessage): void { | |
| if (this.onFrameUpdateCallback) { | |
| const frameData = { | |
| data: message.data, | |
| metadata: message.metadata | |
| }; | |
| this.onFrameUpdateCallback(frameData); | |
| } | |
| this.emit('frameUpdate', message.data); | |
| } | |
| private handleVideoConfigUpdate(message: VideoConfigUpdateMessage): void { | |
| if (this.onVideoConfigUpdateCallback) { | |
| this.onVideoConfigUpdateCallback(message.config); | |
| } | |
| this.emit('videoConfigUpdate', message.config); | |
| } | |
| private handleStreamStopped(message: StreamStoppedMessage): void { | |
| if (this.onStreamStoppedCallback) { | |
| this.onStreamStoppedCallback(message.participant_id, message.reason); | |
| } | |
| this.emit('streamStopped', message.participant_id, message.reason); | |
| } | |
| private handleRecoveryTriggered(message: RecoveryTriggeredMessage): void { | |
| if (this.onRecoveryTriggeredCallback) { | |
| this.onRecoveryTriggeredCallback(message.policy, message.reason); | |
| } | |
| this.emit('recoveryTriggered', message.policy, message.reason); | |
| } | |
| private handleStatusUpdate(message: StatusUpdateMessage): void { | |
| if (this.onStatusUpdateCallback) { | |
| this.onStatusUpdateCallback(message.status, message.data); | |
| } | |
| this.emit('statusUpdate', message.status, message.data); | |
| } | |
| private handleStreamStats(message: StreamStatsMessage): void { | |
| if (this.onStreamStatsCallback) { | |
| this.onStreamStatsCallback(message.stats); | |
| } | |
| this.emit('streamStats', message.stats); | |
| } | |
| // ============= UTILITY METHODS ============= | |
| /** | |
| * Create a consumer and automatically connect to a room | |
| */ | |
| static async createAndConnect( | |
| workspaceId: string, | |
| roomId: string, | |
| baseUrl: string, | |
| participantId?: string | |
| ): Promise<VideoConsumer> { | |
| const consumer = new VideoConsumer(baseUrl); | |
| const connected = await consumer.connect(workspaceId, roomId, participantId); | |
| if (!connected) { | |
| throw new Error('Failed to connect as video consumer'); | |
| } | |
| return consumer; | |
| } | |
| /** | |
| * Get the video element for displaying the remote stream | |
| */ | |
| attachToVideoElement(videoElement: HTMLVideoElement): void { | |
| if (this.remoteStream) { | |
| videoElement.srcObject = this.remoteStream; | |
| } | |
| // Listen for future stream updates | |
| this.on('remoteStream', (stream: MediaStream) => { | |
| videoElement.srcObject = stream; | |
| }); | |
| } | |
| /** | |
| * Get current video statistics | |
| */ | |
| async getVideoStats(): Promise<WebRTCStats | null> { | |
| const stats = await this.getStats(); | |
| return stats; | |
| } | |
| } |