diff --git a/src/server/connection.rs b/src/server/connection.rs index 175bb1b9a..af4892eb0 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -776,7 +776,9 @@ impl Connection { } Some((instant, value)) = rx_video.recv() => { if !conn.video_ack_required { - video_service::notify_video_frame_fetched(id, Some(instant.into())); + if let Some(message::Union::VideoFrame(vf)) = &value.union { + video_service::notify_video_frame_fetched(vf.display as usize, id, Some(instant.into())); + } } if let Err(err) = conn.stream.send(&value as &Message).await { conn.on_close(&err.to_string(), false).await; @@ -924,7 +926,7 @@ impl Connection { crate::plugin::EVENT_ON_CONN_CLOSE_SERVER.to_owned(), conn.lr.my_id.clone(), ); - video_service::notify_video_frame_fetched(id, None); + video_service::notify_video_frame_fetched_by_conn_id(id, None); if conn.authorized { password::update_temporary_password(); } @@ -2909,7 +2911,7 @@ impl Connection { self.update_auto_disconnect_timer(); } Some(misc::Union::VideoReceived(_)) => { - video_service::notify_video_frame_fetched( + video_service::notify_video_frame_fetched_by_conn_id( self.inner.id, Some(Instant::now().into()), ); diff --git a/src/server/video_service.rs b/src/server/video_service.rs index db4927239..13a781c28 100644 --- a/src/server/video_service.rs +++ b/src/server/video_service.rs @@ -62,11 +62,18 @@ use std::{ pub const OPTION_REFRESH: &'static str = "refresh"; +type FrameFetchedNotifierSender = UnboundedSender<(i32, Option)>; +type FrameFetchedNotifierReceiver = Arc)>>>; + lazy_static::lazy_static! { - static ref FRAME_FETCHED_NOTIFIER: (UnboundedSender<(i32, Option)>, Arc)>>>) = { - let (tx, rx) = unbounded_channel(); - (tx, Arc::new(TokioMutex::new(rx))) - }; + static ref FRAME_FETCHED_NOTIFIERS: Mutex> = Mutex::new(HashMap::default()); + + // display_idx -> set of conn id. + // Used to record which connections need to be notified when + // 1. A new frame is received from a web client. + // Because web client does not send the display index in message `VideoReceived`. + // 2. The client is closing. + static ref DISPLAY_CONN_IDS: Arc>>> = Default::default(); pub static ref VIDEO_QOS: Arc> = Default::default(); pub static ref IS_UAC_RUNNING: Arc> = Default::default(); pub static ref IS_FOREGROUND_WINDOW_ELEVATED: Arc> = Default::default(); @@ -80,18 +87,45 @@ struct Screenshot { } #[inline] -pub fn notify_video_frame_fetched(conn_id: i32, frame_tm: Option) { - FRAME_FETCHED_NOTIFIER.0.send((conn_id, frame_tm)).ok(); +pub fn notify_video_frame_fetched(display_idx: usize, conn_id: i32, frame_tm: Option) { + if let Some(notifier) = FRAME_FETCHED_NOTIFIERS.lock().unwrap().get(&display_idx) { + notifier.0.send((conn_id, frame_tm)).ok(); + } +} + +#[inline] +pub fn notify_video_frame_fetched_by_conn_id(conn_id: i32, frame_tm: Option) { + let vec_display_idx: Vec = { + let display_conn_ids = DISPLAY_CONN_IDS.lock().unwrap(); + display_conn_ids + .iter() + .filter_map(|(display_idx, conn_ids)| { + if conn_ids.contains(&conn_id) { + Some(*display_idx) + } else { + None + } + }) + .collect() + }; + let notifiers = FRAME_FETCHED_NOTIFIERS.lock().unwrap(); + for display_idx in vec_display_idx { + if let Some(notifier) = notifiers.get(&display_idx) { + notifier.0.send((conn_id, frame_tm)).ok(); + } + } } struct VideoFrameController { + display_idx: usize, cur: Instant, send_conn_ids: HashSet, } impl VideoFrameController { - fn new() -> Self { + fn new(display_idx: usize) -> Self { Self { + display_idx, cur: Instant::now(), send_conn_ids: HashSet::new(), } @@ -105,6 +139,10 @@ impl VideoFrameController { if !conn_ids.is_empty() { self.cur = tm; self.send_conn_ids = conn_ids; + DISPLAY_CONN_IDS + .lock() + .unwrap() + .insert(self.display_idx, self.send_conn_ids.clone()); } } @@ -115,8 +153,20 @@ impl VideoFrameController { } let timeout_dur = Duration::from_millis(timeout_millis as u64); - match tokio::time::timeout(timeout_dur, FRAME_FETCHED_NOTIFIER.1.lock().await.recv()).await - { + let receiver = { + match FRAME_FETCHED_NOTIFIERS + .lock() + .unwrap() + .get(&self.display_idx) + { + Some(notifier) => notifier.1.clone(), + None => { + return; + } + } + }; + let mut receiver_guard = receiver.lock().await; + match tokio::time::timeout(timeout_dur, receiver_guard.recv()).await { Err(_) => { // break if timeout // log::error!("blocking wait frame receiving timeout {}", timeout_millis); @@ -131,6 +181,14 @@ impl VideoFrameController { // this branch would never be reached } } + while !receiver_guard.is_empty() { + if let Some((id, instant)) = receiver_guard.recv().await { + if let Some(tm) = instant { + log::trace!("Channel recv latency: {}", tm.elapsed().as_secs_f32()); + } + fetched_conn_ids.insert(id); + } + } } } @@ -183,6 +241,14 @@ pub fn get_service_name(source: VideoSource, idx: usize) -> String { } pub fn new(source: VideoSource, idx: usize) -> GenericService { + let _ = FRAME_FETCHED_NOTIFIERS + .lock() + .unwrap() + .entry(idx) + .or_insert_with(|| { + let (tx, rx) = unbounded_channel(); + (tx, Arc::new(TokioMutex::new(rx))) + }); let vs = VideoService { sp: GenericService::new(get_service_name(source, idx), true), idx, @@ -464,7 +530,7 @@ fn get_capturer( } fn run(vs: VideoService) -> ResultType<()> { - let mut _raii = Raii::new(vs.sp.name()); + let mut _raii = Raii::new(vs.idx, vs.sp.name()); // Wayland only support one video capturer for now. It is ok to call ensure_inited() here. // // ensure_inited() is needed because clear() may be called. @@ -476,7 +542,7 @@ fn run(vs: VideoService) -> ResultType<()> { let _wayland_call_on_ret = { // Increment active display count when starting let _display_count = super::wayland::increment_active_display_count(); - + SimpleCallOnReturn { b: true, f: Box::new(|| { @@ -563,7 +629,7 @@ fn run(vs: VideoService) -> ResultType<()> { sp.set_option_bool(OPTION_REFRESH, false); } - let mut frame_controller = VideoFrameController::new(); + let mut frame_controller = VideoFrameController::new(display_idx); let start = time::Instant::now(); let mut last_check_displays = time::Instant::now(); @@ -811,6 +877,7 @@ fn run(vs: VideoService) -> ResultType<()> { break; } } + DISPLAY_CONN_IDS.lock().unwrap().remove(&display_idx); let elapsed = now.elapsed(); // may need to enable frame(timeout) @@ -824,15 +891,17 @@ fn run(vs: VideoService) -> ResultType<()> { } struct Raii { + display_idx: usize, name: String, try_vram: bool, } impl Raii { - fn new(name: String) -> Self { + fn new(display_idx: usize, name: String) -> Self { log::info!("new video service: {}", name); VIDEO_QOS.lock().unwrap().new_display(name.clone()); Raii { + display_idx, name, try_vram: true, } @@ -849,6 +918,7 @@ impl Drop for Raii { #[cfg(feature = "vram")] Encoder::update(scrap::codec::EncodingUpdate::Check); VIDEO_QOS.lock().unwrap().remove_display(&self.name); + DISPLAY_CONN_IDS.lock().unwrap().remove(&self.display_idx); } }