fix: video service, wait timeout (#13208)

Use multiple frame fetched notifiers.

Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
fufesou
2025-10-22 01:19:08 -04:00
committed by GitHub
parent a77752c4cb
commit ed39cc3038
2 changed files with 88 additions and 16 deletions

View File

@@ -776,7 +776,9 @@ impl Connection {
}
Some((instant, value)) = rx_video.recv() => {
if !conn.video_ack_required {
video_service::notify_video_frame_fetched(id, Some(instant.into()));
if let Some(message::Union::VideoFrame(vf)) = &value.union {
video_service::notify_video_frame_fetched(vf.display as usize, id, Some(instant.into()));
}
}
if let Err(err) = conn.stream.send(&value as &Message).await {
conn.on_close(&err.to_string(), false).await;
@@ -924,7 +926,7 @@ impl Connection {
crate::plugin::EVENT_ON_CONN_CLOSE_SERVER.to_owned(),
conn.lr.my_id.clone(),
);
video_service::notify_video_frame_fetched(id, None);
video_service::notify_video_frame_fetched_by_conn_id(id, None);
if conn.authorized {
password::update_temporary_password();
}
@@ -2909,7 +2911,7 @@ impl Connection {
self.update_auto_disconnect_timer();
}
Some(misc::Union::VideoReceived(_)) => {
video_service::notify_video_frame_fetched(
video_service::notify_video_frame_fetched_by_conn_id(
self.inner.id,
Some(Instant::now().into()),
);

View File

@@ -62,11 +62,18 @@ use std::{
pub const OPTION_REFRESH: &'static str = "refresh";
type FrameFetchedNotifierSender = UnboundedSender<(i32, Option<Instant>)>;
type FrameFetchedNotifierReceiver = Arc<TokioMutex<UnboundedReceiver<(i32, Option<Instant>)>>>;
lazy_static::lazy_static! {
static ref FRAME_FETCHED_NOTIFIER: (UnboundedSender<(i32, Option<Instant>)>, Arc<TokioMutex<UnboundedReceiver<(i32, Option<Instant>)>>>) = {
let (tx, rx) = unbounded_channel();
(tx, Arc::new(TokioMutex::new(rx)))
};
static ref FRAME_FETCHED_NOTIFIERS: Mutex<HashMap<usize, (FrameFetchedNotifierSender, FrameFetchedNotifierReceiver)>> = Mutex::new(HashMap::default());
// display_idx -> set of conn id.
// Used to record which connections need to be notified when
// 1. A new frame is received from a web client.
// Because web client does not send the display index in message `VideoReceived`.
// 2. The client is closing.
static ref DISPLAY_CONN_IDS: Arc<Mutex<HashMap<usize, HashSet<i32>>>> = Default::default();
pub static ref VIDEO_QOS: Arc<Mutex<VideoQoS>> = Default::default();
pub static ref IS_UAC_RUNNING: Arc<Mutex<bool>> = Default::default();
pub static ref IS_FOREGROUND_WINDOW_ELEVATED: Arc<Mutex<bool>> = Default::default();
@@ -80,18 +87,45 @@ struct Screenshot {
}
#[inline]
pub fn notify_video_frame_fetched(conn_id: i32, frame_tm: Option<Instant>) {
FRAME_FETCHED_NOTIFIER.0.send((conn_id, frame_tm)).ok();
pub fn notify_video_frame_fetched(display_idx: usize, conn_id: i32, frame_tm: Option<Instant>) {
if let Some(notifier) = FRAME_FETCHED_NOTIFIERS.lock().unwrap().get(&display_idx) {
notifier.0.send((conn_id, frame_tm)).ok();
}
}
#[inline]
pub fn notify_video_frame_fetched_by_conn_id(conn_id: i32, frame_tm: Option<Instant>) {
let vec_display_idx: Vec<usize> = {
let display_conn_ids = DISPLAY_CONN_IDS.lock().unwrap();
display_conn_ids
.iter()
.filter_map(|(display_idx, conn_ids)| {
if conn_ids.contains(&conn_id) {
Some(*display_idx)
} else {
None
}
})
.collect()
};
let notifiers = FRAME_FETCHED_NOTIFIERS.lock().unwrap();
for display_idx in vec_display_idx {
if let Some(notifier) = notifiers.get(&display_idx) {
notifier.0.send((conn_id, frame_tm)).ok();
}
}
}
struct VideoFrameController {
display_idx: usize,
cur: Instant,
send_conn_ids: HashSet<i32>,
}
impl VideoFrameController {
fn new() -> Self {
fn new(display_idx: usize) -> Self {
Self {
display_idx,
cur: Instant::now(),
send_conn_ids: HashSet::new(),
}
@@ -105,6 +139,10 @@ impl VideoFrameController {
if !conn_ids.is_empty() {
self.cur = tm;
self.send_conn_ids = conn_ids;
DISPLAY_CONN_IDS
.lock()
.unwrap()
.insert(self.display_idx, self.send_conn_ids.clone());
}
}
@@ -115,8 +153,20 @@ impl VideoFrameController {
}
let timeout_dur = Duration::from_millis(timeout_millis as u64);
match tokio::time::timeout(timeout_dur, FRAME_FETCHED_NOTIFIER.1.lock().await.recv()).await
{
let receiver = {
match FRAME_FETCHED_NOTIFIERS
.lock()
.unwrap()
.get(&self.display_idx)
{
Some(notifier) => notifier.1.clone(),
None => {
return;
}
}
};
let mut receiver_guard = receiver.lock().await;
match tokio::time::timeout(timeout_dur, receiver_guard.recv()).await {
Err(_) => {
// break if timeout
// log::error!("blocking wait frame receiving timeout {}", timeout_millis);
@@ -131,6 +181,14 @@ impl VideoFrameController {
// this branch would never be reached
}
}
while !receiver_guard.is_empty() {
if let Some((id, instant)) = receiver_guard.recv().await {
if let Some(tm) = instant {
log::trace!("Channel recv latency: {}", tm.elapsed().as_secs_f32());
}
fetched_conn_ids.insert(id);
}
}
}
}
@@ -183,6 +241,14 @@ pub fn get_service_name(source: VideoSource, idx: usize) -> String {
}
pub fn new(source: VideoSource, idx: usize) -> GenericService {
let _ = FRAME_FETCHED_NOTIFIERS
.lock()
.unwrap()
.entry(idx)
.or_insert_with(|| {
let (tx, rx) = unbounded_channel();
(tx, Arc::new(TokioMutex::new(rx)))
});
let vs = VideoService {
sp: GenericService::new(get_service_name(source, idx), true),
idx,
@@ -464,7 +530,7 @@ fn get_capturer(
}
fn run(vs: VideoService) -> ResultType<()> {
let mut _raii = Raii::new(vs.sp.name());
let mut _raii = Raii::new(vs.idx, vs.sp.name());
// Wayland only support one video capturer for now. It is ok to call ensure_inited() here.
//
// ensure_inited() is needed because clear() may be called.
@@ -476,7 +542,7 @@ fn run(vs: VideoService) -> ResultType<()> {
let _wayland_call_on_ret = {
// Increment active display count when starting
let _display_count = super::wayland::increment_active_display_count();
SimpleCallOnReturn {
b: true,
f: Box::new(|| {
@@ -563,7 +629,7 @@ fn run(vs: VideoService) -> ResultType<()> {
sp.set_option_bool(OPTION_REFRESH, false);
}
let mut frame_controller = VideoFrameController::new();
let mut frame_controller = VideoFrameController::new(display_idx);
let start = time::Instant::now();
let mut last_check_displays = time::Instant::now();
@@ -811,6 +877,7 @@ fn run(vs: VideoService) -> ResultType<()> {
break;
}
}
DISPLAY_CONN_IDS.lock().unwrap().remove(&display_idx);
let elapsed = now.elapsed();
// may need to enable frame(timeout)
@@ -824,15 +891,17 @@ fn run(vs: VideoService) -> ResultType<()> {
}
struct Raii {
display_idx: usize,
name: String,
try_vram: bool,
}
impl Raii {
fn new(name: String) -> Self {
fn new(display_idx: usize, name: String) -> Self {
log::info!("new video service: {}", name);
VIDEO_QOS.lock().unwrap().new_display(name.clone());
Raii {
display_idx,
name,
try_vram: true,
}
@@ -849,6 +918,7 @@ impl Drop for Raii {
#[cfg(feature = "vram")]
Encoder::update(scrap::codec::EncodingUpdate::Check);
VIDEO_QOS.lock().unwrap().remove_display(&self.name);
DISPLAY_CONN_IDS.lock().unwrap().remove(&self.display_idx);
}
}