Feat: file transfer, resume (#12557)
Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
@@ -703,6 +703,7 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
if is_remote {
|
||||
if let Some(job) = get_job(id, &mut self.write_jobs) {
|
||||
job.is_last_job = false;
|
||||
job.is_resume = true;
|
||||
allow_err!(
|
||||
peer.send(&fs::new_send(
|
||||
id,
|
||||
@@ -717,12 +718,13 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
} else {
|
||||
if let Some(job) = get_job(id, &mut self.read_jobs) {
|
||||
match &job.data_source {
|
||||
fs::DataSource::FilePath(p) => {
|
||||
fs::DataSource::FilePath(_p) => {
|
||||
job.is_last_job = false;
|
||||
job.is_resume = true;
|
||||
allow_err!(
|
||||
peer.send(&fs::new_receive(
|
||||
id,
|
||||
p.to_string_lossy().to_string(),
|
||||
job.remote.clone(),
|
||||
job.file_num,
|
||||
job.files.clone(),
|
||||
job.total_size(),
|
||||
@@ -770,7 +772,8 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
Some(file_transfer_send_confirm_request::Union::Skip(true))
|
||||
},
|
||||
..Default::default()
|
||||
});
|
||||
})
|
||||
.await;
|
||||
}
|
||||
} else {
|
||||
if let Some(job) = fs::get_job(id, &mut self.write_jobs) {
|
||||
@@ -789,7 +792,7 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
job.confirm(&req);
|
||||
job.confirm(&req).await;
|
||||
file_action.set_send_confirm(req);
|
||||
msg.set_file_action(file_action);
|
||||
allow_err!(peer.send(&msg).await);
|
||||
@@ -1470,14 +1473,24 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
if let fs::DataSource::FilePath(p) = &job.data_source {
|
||||
let read_path =
|
||||
get_string(&fs::TransferJob::join(p, &file.name));
|
||||
let overwrite_strategy =
|
||||
let mut overwrite_strategy =
|
||||
job.default_overwrite_strategy();
|
||||
let mut offset = 0;
|
||||
if digest.is_identical && job.is_resume {
|
||||
if digest.transferred_size > 0 {
|
||||
overwrite_strategy = Some(true);
|
||||
offset = digest.transferred_size as _;
|
||||
} else {
|
||||
// Force skip if the file is identical and the job is set to resume.
|
||||
overwrite_strategy = Some(false);
|
||||
}
|
||||
}
|
||||
if let Some(overwrite) = overwrite_strategy {
|
||||
let req = FileTransferSendConfirmRequest {
|
||||
id: digest.id,
|
||||
file_num: digest.file_num,
|
||||
union: Some(if overwrite {
|
||||
file_transfer_send_confirm_request::Union::OffsetBlk(0)
|
||||
file_transfer_send_confirm_request::Union::OffsetBlk(offset)
|
||||
} else {
|
||||
file_transfer_send_confirm_request::Union::Skip(
|
||||
true,
|
||||
@@ -1485,7 +1498,7 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
job.confirm(&req);
|
||||
job.confirm(&req).await;
|
||||
let msg = new_send_confirm(req);
|
||||
allow_err!(peer.send(&msg).await);
|
||||
} else {
|
||||
@@ -1506,8 +1519,7 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
if let fs::DataSource::FilePath(p) = &job.data_source {
|
||||
let write_path =
|
||||
get_string(&fs::TransferJob::join(p, &file.name));
|
||||
let overwrite_strategy =
|
||||
job.default_overwrite_strategy();
|
||||
job.set_digest(digest.file_size, digest.last_modified);
|
||||
match fs::is_write_need_confirmation(
|
||||
&write_path,
|
||||
&digest,
|
||||
@@ -1515,16 +1527,29 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
Ok(res) => match res {
|
||||
DigestCheckResult::IsSame => {
|
||||
let req = FileTransferSendConfirmRequest {
|
||||
id: digest.id,
|
||||
file_num: digest.file_num,
|
||||
union: Some(file_transfer_send_confirm_request::Union::Skip(true)),
|
||||
..Default::default()
|
||||
};
|
||||
job.confirm(&req);
|
||||
id: digest.id,
|
||||
file_num: digest.file_num,
|
||||
union: Some(file_transfer_send_confirm_request::Union::Skip(true)),
|
||||
..Default::default()
|
||||
};
|
||||
job.confirm(&req).await;
|
||||
let msg = new_send_confirm(req);
|
||||
allow_err!(peer.send(&msg).await);
|
||||
}
|
||||
DigestCheckResult::NeedConfirm(digest) => {
|
||||
let mut overwrite_strategy =
|
||||
job.default_overwrite_strategy();
|
||||
let mut offset = 0;
|
||||
if digest.is_identical && job.is_resume {
|
||||
if digest.transferred_size > 0 {
|
||||
overwrite_strategy = Some(true);
|
||||
offset =
|
||||
digest.transferred_size as _;
|
||||
} else {
|
||||
// Force skip if the file is identical and the job is set to resume.
|
||||
overwrite_strategy = Some(false);
|
||||
}
|
||||
}
|
||||
if let Some(overwrite) = overwrite_strategy
|
||||
{
|
||||
let req =
|
||||
@@ -1532,13 +1557,13 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
id: digest.id,
|
||||
file_num: digest.file_num,
|
||||
union: Some(if overwrite {
|
||||
file_transfer_send_confirm_request::Union::OffsetBlk(0)
|
||||
file_transfer_send_confirm_request::Union::OffsetBlk(offset)
|
||||
} else {
|
||||
file_transfer_send_confirm_request::Union::Skip(true)
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
job.confirm(&req);
|
||||
job.confirm(&req).await;
|
||||
let msg = new_send_confirm(req);
|
||||
allow_err!(peer.send(&msg).await);
|
||||
} else {
|
||||
@@ -1558,7 +1583,7 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
union: Some(file_transfer_send_confirm_request::Union::OffsetBlk(0)),
|
||||
..Default::default()
|
||||
};
|
||||
job.confirm(&req);
|
||||
job.confirm(&req).await;
|
||||
let msg = new_send_confirm(req);
|
||||
allow_err!(peer.send(&msg).await);
|
||||
}
|
||||
@@ -1905,7 +1930,7 @@ impl<T: InvokeUiSession> Remote<T> {
|
||||
},
|
||||
Some(file_action::Union::SendConfirm(c)) => {
|
||||
if let Some(job) = fs::get_job(c.id, &mut self.read_jobs) {
|
||||
job.confirm(&c);
|
||||
job.confirm(&c).await;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
|
||||
@@ -26,7 +26,9 @@ use hbb_common::{
|
||||
config::{self, keys::OPTION_ALLOW_WEBSOCKET, Config, Config2},
|
||||
futures::StreamExt as _,
|
||||
futures_util::sink::SinkExt,
|
||||
log, password_security as password, timeout,
|
||||
log,
|
||||
message_proto::FileTransferSendConfirmRequest,
|
||||
password_security as password, timeout,
|
||||
tokio::{
|
||||
self,
|
||||
io::{AsyncRead, AsyncWrite},
|
||||
@@ -105,6 +107,7 @@ pub enum FS {
|
||||
last_modified: u64,
|
||||
is_upload: bool,
|
||||
},
|
||||
SendConfirm(Vec<u8>),
|
||||
Rename {
|
||||
id: i32,
|
||||
path: String,
|
||||
|
||||
@@ -2703,7 +2703,11 @@ impl Connection {
|
||||
}
|
||||
Some(file_action::Union::SendConfirm(r)) => {
|
||||
if let Some(job) = fs::get_job(r.id, &mut self.read_jobs) {
|
||||
job.confirm(&r);
|
||||
job.confirm(&r).await;
|
||||
} else {
|
||||
if let Ok(sc) = r.write_to_bytes() {
|
||||
self.send_fs(ipc::FS::SendConfirm(sc));
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(file_action::Union::Rename(r)) => {
|
||||
|
||||
@@ -880,6 +880,7 @@ async fn handle_fs(
|
||||
let path = get_string(&fs::TransferJob::join(p, &file.name));
|
||||
match is_write_need_confirmation(&path, &digest) {
|
||||
Ok(digest_result) => {
|
||||
job.set_digest(file_size, last_modified);
|
||||
match digest_result {
|
||||
DigestCheckResult::IsSame => {
|
||||
req.set_skip(true);
|
||||
@@ -909,6 +910,13 @@ async fn handle_fs(
|
||||
}
|
||||
}
|
||||
}
|
||||
ipc::FS::SendConfirm(bytes) => {
|
||||
if let Ok(r) = FileTransferSendConfirmRequest::parse_from_bytes(&bytes) {
|
||||
if let Some(job) = fs::get_job(r.id, write_jobs) {
|
||||
job.confirm(&r).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
ipc::FS::Rename { id, path, new_name } => {
|
||||
rename_file(path, new_name, id, tx).await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user