From 43ec57c7691152aad708f49f0a838da87db31e39 Mon Sep 17 00:00:00 2001 From: fufesou <13586388+fufesou@users.noreply.github.com> Date: Sat, 9 Aug 2025 23:47:19 +0800 Subject: [PATCH] Feat: file transfer, resume (#12557) Signed-off-by: fufesou --- src/client/io_loop.rs | 63 ++++++++++++++++++++++++++++------------ src/ipc.rs | 5 +++- src/server/connection.rs | 6 +++- src/ui_cm_interface.rs | 8 +++++ 4 files changed, 61 insertions(+), 21 deletions(-) diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 4de0e7e32..9ed96365d 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -703,6 +703,7 @@ impl Remote { if is_remote { if let Some(job) = get_job(id, &mut self.write_jobs) { job.is_last_job = false; + job.is_resume = true; allow_err!( peer.send(&fs::new_send( id, @@ -717,12 +718,13 @@ impl Remote { } else { if let Some(job) = get_job(id, &mut self.read_jobs) { match &job.data_source { - fs::DataSource::FilePath(p) => { + fs::DataSource::FilePath(_p) => { job.is_last_job = false; + job.is_resume = true; allow_err!( peer.send(&fs::new_receive( id, - p.to_string_lossy().to_string(), + job.remote.clone(), job.file_num, job.files.clone(), job.total_size(), @@ -770,7 +772,8 @@ impl Remote { Some(file_transfer_send_confirm_request::Union::Skip(true)) }, ..Default::default() - }); + }) + .await; } } else { if let Some(job) = fs::get_job(id, &mut self.write_jobs) { @@ -789,7 +792,7 @@ impl Remote { }, ..Default::default() }; - job.confirm(&req); + job.confirm(&req).await; file_action.set_send_confirm(req); msg.set_file_action(file_action); allow_err!(peer.send(&msg).await); @@ -1470,14 +1473,24 @@ impl Remote { if let fs::DataSource::FilePath(p) = &job.data_source { let read_path = get_string(&fs::TransferJob::join(p, &file.name)); - let overwrite_strategy = + let mut overwrite_strategy = job.default_overwrite_strategy(); + let mut offset = 0; + if digest.is_identical && job.is_resume { + if digest.transferred_size > 0 { + overwrite_strategy = Some(true); + offset = digest.transferred_size as _; + } else { + // Force skip if the file is identical and the job is set to resume. + overwrite_strategy = Some(false); + } + } if let Some(overwrite) = overwrite_strategy { let req = FileTransferSendConfirmRequest { id: digest.id, file_num: digest.file_num, union: Some(if overwrite { - file_transfer_send_confirm_request::Union::OffsetBlk(0) + file_transfer_send_confirm_request::Union::OffsetBlk(offset) } else { file_transfer_send_confirm_request::Union::Skip( true, @@ -1485,7 +1498,7 @@ impl Remote { }), ..Default::default() }; - job.confirm(&req); + job.confirm(&req).await; let msg = new_send_confirm(req); allow_err!(peer.send(&msg).await); } else { @@ -1506,8 +1519,7 @@ impl Remote { if let fs::DataSource::FilePath(p) = &job.data_source { let write_path = get_string(&fs::TransferJob::join(p, &file.name)); - let overwrite_strategy = - job.default_overwrite_strategy(); + job.set_digest(digest.file_size, digest.last_modified); match fs::is_write_need_confirmation( &write_path, &digest, @@ -1515,16 +1527,29 @@ impl Remote { Ok(res) => match res { DigestCheckResult::IsSame => { let req = FileTransferSendConfirmRequest { - id: digest.id, - file_num: digest.file_num, - union: Some(file_transfer_send_confirm_request::Union::Skip(true)), - ..Default::default() - }; - job.confirm(&req); + id: digest.id, + file_num: digest.file_num, + union: Some(file_transfer_send_confirm_request::Union::Skip(true)), + ..Default::default() + }; + job.confirm(&req).await; let msg = new_send_confirm(req); allow_err!(peer.send(&msg).await); } DigestCheckResult::NeedConfirm(digest) => { + let mut overwrite_strategy = + job.default_overwrite_strategy(); + let mut offset = 0; + if digest.is_identical && job.is_resume { + if digest.transferred_size > 0 { + overwrite_strategy = Some(true); + offset = + digest.transferred_size as _; + } else { + // Force skip if the file is identical and the job is set to resume. + overwrite_strategy = Some(false); + } + } if let Some(overwrite) = overwrite_strategy { let req = @@ -1532,13 +1557,13 @@ impl Remote { id: digest.id, file_num: digest.file_num, union: Some(if overwrite { - file_transfer_send_confirm_request::Union::OffsetBlk(0) + file_transfer_send_confirm_request::Union::OffsetBlk(offset) } else { file_transfer_send_confirm_request::Union::Skip(true) }), ..Default::default() }; - job.confirm(&req); + job.confirm(&req).await; let msg = new_send_confirm(req); allow_err!(peer.send(&msg).await); } else { @@ -1558,7 +1583,7 @@ impl Remote { union: Some(file_transfer_send_confirm_request::Union::OffsetBlk(0)), ..Default::default() }; - job.confirm(&req); + job.confirm(&req).await; let msg = new_send_confirm(req); allow_err!(peer.send(&msg).await); } @@ -1905,7 +1930,7 @@ impl Remote { }, Some(file_action::Union::SendConfirm(c)) => { if let Some(job) = fs::get_job(c.id, &mut self.read_jobs) { - job.confirm(&c); + job.confirm(&c).await; } } _ => {} diff --git a/src/ipc.rs b/src/ipc.rs index 1ae048162..8967b9213 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -26,7 +26,9 @@ use hbb_common::{ config::{self, keys::OPTION_ALLOW_WEBSOCKET, Config, Config2}, futures::StreamExt as _, futures_util::sink::SinkExt, - log, password_security as password, timeout, + log, + message_proto::FileTransferSendConfirmRequest, + password_security as password, timeout, tokio::{ self, io::{AsyncRead, AsyncWrite}, @@ -105,6 +107,7 @@ pub enum FS { last_modified: u64, is_upload: bool, }, + SendConfirm(Vec), Rename { id: i32, path: String, diff --git a/src/server/connection.rs b/src/server/connection.rs index 01d84437d..8ce09f932 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -2703,7 +2703,11 @@ impl Connection { } Some(file_action::Union::SendConfirm(r)) => { if let Some(job) = fs::get_job(r.id, &mut self.read_jobs) { - job.confirm(&r); + job.confirm(&r).await; + } else { + if let Ok(sc) = r.write_to_bytes() { + self.send_fs(ipc::FS::SendConfirm(sc)); + } } } Some(file_action::Union::Rename(r)) => { diff --git a/src/ui_cm_interface.rs b/src/ui_cm_interface.rs index 880f0ca61..0f5bc5709 100644 --- a/src/ui_cm_interface.rs +++ b/src/ui_cm_interface.rs @@ -880,6 +880,7 @@ async fn handle_fs( let path = get_string(&fs::TransferJob::join(p, &file.name)); match is_write_need_confirmation(&path, &digest) { Ok(digest_result) => { + job.set_digest(file_size, last_modified); match digest_result { DigestCheckResult::IsSame => { req.set_skip(true); @@ -909,6 +910,13 @@ async fn handle_fs( } } } + ipc::FS::SendConfirm(bytes) => { + if let Ok(r) = FileTransferSendConfirmRequest::parse_from_bytes(&bytes) { + if let Some(job) = fs::get_job(r.id, write_jobs) { + job.confirm(&r).await; + } + } + } ipc::FS::Rename { id, path, new_name } => { rename_file(path, new_name, id, tx).await; }