更改上传得到的 Response Body 通过 fd 传递

Signed-off-by: ZhangJianxin <zhangjianxin23@huawei.com>
Change-Id: I579cc122f2d5659aa0d285fe8d1dec4870366f21
This commit is contained in:
ZhangJianxin 2023-09-24 00:26:49 +08:00
parent e085edf86e
commit 2df369799c
11 changed files with 148 additions and 17 deletions

View File

@ -119,6 +119,8 @@ struct Config {
std::map<std::string, std::string> headers;
std::vector<FormItem> forms;
std::vector<FileSpec> files;
std::vector<int32_t> bodyFds;
std::vector<std::string> bodyFileNames;
std::map<std::string, std::string> extras;
};

View File

@ -69,6 +69,7 @@ private:
const std::shared_ptr<OHOS::AbilityRuntime::Context> &context, Config &config, std::string &filePath);
static ExceptionError CheckFilePath(const std::shared_ptr<OHOS::AbilityRuntime::Context> &context, Config &config);
static ExceptionError CheckUploadBodyFiles(Config &config, const std::string &filePath);
static ExceptionError GetFD(const std::string &path, const Config &config, int32_t &fd);
static void InterceptData(const std::string &str, const std::string &in, std::string &out);
static bool IsStageMode(napi_env env, napi_value value);

View File

@ -16,7 +16,9 @@
#ifndef DOWNLOAD_NOTIFY_STUB_H
#define DOWNLOAD_NOTIFY_STUB_H
#include <fstream>
#include <memory>
#include <unistd.h>
#include "iremote_stub.h"
#include "js_common.h"
@ -35,6 +37,9 @@ public:
private:
void OnCallBack(MessageParcel &data);
static void OnDone(MessageParcel &data);
static void GetBodyFromFileToMap(std::string &fileName, std::map<std::string, std::string> &map);
static void GetResponseBody(
std::string &fname, const std::string &type, const NotifyData &notifyData, Notify &notify);
static void GetDownloadNotify(const std::string &type, const NotifyData &notifyData, Notify &notify);
static void GetUploadNotify(const std::string &type, const NotifyData &notifyData, Notify &notify);
};

View File

@ -155,9 +155,45 @@ ExceptionError JsInitialize::CheckFilePath(const std::shared_ptr<OHOS::AbilityRu
return err;
}
}
if (config.action == Action::UPLOAD) {
std::string filePath = context->GetCacheDir();
err = CheckUploadBodyFiles(config, filePath);
}
return err;
}
ExceptionError JsInitialize::CheckUploadBodyFiles(Config &config, const std::string &filePath)
{
ExceptionError error = { .code = E_OK };
size_t len = config.files.size();
for (size_t i = 0; i < len; i++) {
if (filePath.empty()) {
REQUEST_HILOGE("internal to cache error");
return { .code = E_PARAMETER_CHECK, .errInfo = "IsPathValid error empty path" };
}
time_t timestamp = time(NULL);
std::string fileName = filePath + "/tmp_body_" + std::to_string(i) + "_" + std::to_string(timestamp);
if (!NapiUtils::IsPathValid(fileName)) {
REQUEST_HILOGE("IsPathValid error %{public}s", fileName.c_str());
return { .code = E_PARAMETER_CHECK, .errInfo = "IsPathValid error fail path" };
}
int32_t fd = open(fileName.c_str(), O_TRUNC | O_RDWR);
if (fd < 0) {
fd = open(fileName.c_str(), O_CREAT | O_RDWR, FILE_PERMISSION);
if (fd < 0) {
return { .code = E_FILE_IO, .errInfo = "Failed to open file errno " + std::to_string(errno) };
}
}
config.bodyFds.push_back(fd);
config.bodyFileNames.push_back(fileName);
}
return error;
}
ExceptionError JsInitialize::GetFD(const std::string &path, const Config &config, int32_t &fd)
{
ExceptionError error = { .code = E_OK };

View File

@ -62,6 +62,7 @@ void NotifyStub::OnCallBack(MessageParcel &data)
std::string key = data.ReadString();
notifyData.progress.extras[key] = data.ReadString();
}
notifyData.action = static_cast<Action>(data.ReadUint32());
notifyData.version = static_cast<Version>(data.ReadUint32());
size = data.ReadUint32();
@ -97,6 +98,13 @@ void NotifyStub::RequestCallBack(const std::string &type, const std::string &tid
return;
}
auto task = item->second;
uint32_t index = notifyData.progress.index;
std::string fileName;
if (index < task->config_.bodyFileNames.size()) {
fileName = task->config_.bodyFileNames[index];
GetResponseBody(fileName, type, notifyData, notify);
}
std::string key = type + tid;
auto it = task->listenerMap_.find(key);
if (it == task->listenerMap_.end()) {
@ -108,6 +116,35 @@ void NotifyStub::RequestCallBack(const std::string &type, const std::string &tid
}
}
void NotifyStub::GetResponseBody(std::string &fileName, const std::string &type,
const NotifyData &notifyData, Notify &notify)
{
// Check response body.
if (notifyData.version == Version::API9 && notifyData.action == Action::UPLOAD && type == "headerReceive") {
GetBodyFromFileToMap(fileName, notify.header);
} else if (notify.progress.state == State::COMPLETED && notifyData.version == Version::API10
&& notifyData.action == Action::UPLOAD && (type == "progress" || type == "complete")) {
GetBodyFromFileToMap(fileName, notify.progress.extras);
}
return;
}
void NotifyStub::GetBodyFromFileToMap(std::string &fileName, std::map<std::string, std::string> &map)
{
std::ifstream ifs(fileName.c_str());
if (ifs.is_open()) {
std::ostringstream strStream;
strStream << ifs.rdbuf();
std::string buf = strStream.str();
REQUEST_HILOGD("Response body to file end: %{public}zu", buf.size());
std::string key = "body";
map[key] = buf;
ifs.close();
// Delete file.
std::remove(fileName.c_str());
}
}
void NotifyStub::GetDownloadNotify(const std::string &type, const NotifyData &notifyData, Notify &notify)
{
REQUEST_HILOGD("Get download notify data");

View File

@ -97,6 +97,16 @@ void RequestServiceProxy::GetVectorData(const Config &config, MessageParcel &dat
close(file.fd);
}
}
// Response Bodys fds.
data.WriteUint32(config.bodyFds.size());
for (const auto &fd : config.bodyFds) {
data.WriteFileDescriptor(fd);
if (fd > 0) {
close(fd);
}
}
data.WriteUint32(config.headers.size());
for (const auto &header : config.headers) {
data.WriteString(header.first);

View File

@ -87,7 +87,7 @@ impl Clone for Progress {
#[repr(C)]
pub struct RequestTaskMsg {
pub taskId: u32,
pub task_id: u32,
pub uid: i32,
pub action: u8,
}

View File

@ -117,6 +117,14 @@ impl RequestServiceInterface for RequestService {
});
}
// Response Bodys fd.
let body_file_size: u32 = data.read()?;
let mut body_files = Vec::new();
for i in 0..body_file_size {
let fd = data.read::<FileDesc>()?;
body_files.push(File::from(fd));
}
let header_size: u32 = data.read()?;
if header_size > data.get_readable_bytes() {
error!(LOG_LABEL, "size is too large");
@ -176,7 +184,7 @@ impl RequestServiceInterface for RequestService {
debug!(LOG_LABEL, "files {:?}", @public(files));
let mut task_id: u32 = 0;
let ret =
RequestAbility::get_ability_instance().construct(task_config, files, &mut task_id);
RequestAbility::get_ability_instance().construct(task_config, files, body_files, &mut task_id);
let remote_object: RemoteObj = data.read::<RemoteObj>()?;
RequestAbility::get_ability_instance().on(task_id, "done".to_string(), remote_object);
reply.write(&(ret as i32))?;

View File

@ -100,7 +100,7 @@ impl RequestAbility {
self.server_state = ServerRunState::NoStart;
}
pub fn construct(&self, config: TaskConfig, files: Vec<File>, task_id: &mut u32) -> ErrorCode {
pub fn construct(&self, config: TaskConfig, files: Vec<File>, body_files: Vec<File>, task_id: &mut u32) -> ErrorCode {
debug!(LOG_LABEL, "construct");
let uid = get_calling_uid();
let version = config.version.clone();
@ -109,6 +109,7 @@ impl RequestAbility {
get_calling_uid(),
task_id,
files,
body_files,
);
if version != Version::API10 {
TaskManager::get_instance().start(get_calling_uid(), *task_id);
@ -286,6 +287,7 @@ impl RequestAbility {
}
}
// Send to "OHOS.Download.NotifyInterface" CallBack().
pub fn notify_client(cb_type: String, notify_data: &NotifyData) {
debug!(LOG_LABEL, "notify_client");
if notify_data.progress.common_data.index >= notify_data.progress.sizes.len() {
@ -356,6 +358,7 @@ impl RequestAbility {
}
}
// Send to "OHOS.Download.NotifyInterface" Down().
pub fn notify_task_info(task_info: &TaskInfo) {
debug!(LOG_LABEL, "notify_task_info");
if task_info.progress.common_data.index >= task_info.progress.sizes.len() {

View File

@ -34,7 +34,7 @@ use ylong_http_client::{
Response, SpeedLimit, Timeout, TlsVersion,
};
use ylong_runtime::fs::File as YlongFile;
use ylong_runtime::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf,AsyncSeekExt};
use ylong_runtime::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf, AsyncWriteExt, AsyncSeekExt};
static CONNECT_TIMEOUT: u64 = 60;
static LOW_SPEED_TIME: u64 = 60;
@ -66,6 +66,11 @@ struct Files(UnsafeCell<Vec<YlongFile>>);
unsafe impl Sync for Files {}
unsafe impl Send for Files {}
// Need to release file timely.
struct BodyFiles(UnsafeCell<Vec<Option<YlongFile>>>);
unsafe impl Sync for BodyFiles {}
unsafe impl Send for BodyFiles {}
pub struct RequestTask {
pub conf: Arc<TaskConfig>,
pub uid: u64,
@ -84,6 +89,7 @@ pub struct RequestTask {
pub file_total_size: AtomicI64,
pub resume: AtomicBool,
files: Files,
body_files: BodyFiles,
seek_flag: AtomicBool,
range_request: AtomicBool,
range_response: AtomicBool,
@ -281,7 +287,7 @@ impl AsyncRead for TaskReader {
}
impl RequestTask {
pub fn constructor(conf: Arc<TaskConfig>, uid: u64, task_id: u32, files: Vec<File>) -> Self {
pub fn constructor(conf: Arc<TaskConfig>, uid: u64, task_id: u32, files: Vec<File>, body_files: Vec<File>) -> Self {
let mut sizes: Vec<i64> = Vec::<i64>::new();
match conf.common_data.action {
Action::DOWNLOAD => sizes.push(-1),
@ -304,6 +310,9 @@ impl RequestTask {
files: Files(UnsafeCell::new(
files.into_iter().map(|f| YlongFile::new(f)).collect(),
)),
body_files: BodyFiles(UnsafeCell::new(
body_files.into_iter().map(|f| Some(YlongFile::new(f))).collect(),
)),
mime_type: Mutex::new(String::new()),
progress: Mutex::new(Progress::new(sizes)),
tries: AtomicU32::new(0),
@ -701,15 +710,34 @@ impl RequestTask {
}
}
async fn record_upload_response(&self, response: Result<Response, HttpClientError>) {
async fn record_upload_response(
&self,
index: usize,
response: Result<Response, HttpClientError>,
) {
self.record_response_header(&response);
if let Ok(r) = response {
if let Ok(body) = r.text().await {
self.progress
.lock()
.unwrap()
.extras
.insert("body".into(), body);
if let Ok(mut r) = response {
let mut yfile = match unsafe { &mut *self.body_files.0.get() }.get_mut(index) {
Some(yfile) => match yfile.take() {
Some(yf) => yf,
None => return,
},
None => return,
};
loop {
let mut buf = [0u8; 1024];
let size = r.body_mut().data(&mut buf).await;
let size = match size {
Ok(size) => size,
Err(_e) => break,
};
if size == 0 {
break;
}
let r = yfile.write_all(&buf[..size]).await;
error!(LOG_LABEL, "Res writeall {:?}", @public(r));
}
}
if self.conf.version == Version::API9 && self.conf.common_data.action == Action::UPLOAD {
@ -983,7 +1011,7 @@ impl RequestTask {
let percent = total_processed * 100 / (file_total_size as u64);
info!(LOG_LABEL, "background notify");
let task_msg = RequestTaskMsg {
taskId: self.task_id,
task_id: self.task_id,
uid: self.uid as i32,
action: self.conf.common_data.action as u8,
};
@ -1237,10 +1265,10 @@ where
let response = task.client.as_ref().unwrap().request(request.unwrap()).await;
if task.handle_response_error(&response).await {
task.code.lock().unwrap()[index] = Reason::Default;
task.record_upload_response(response).await;
task.record_upload_response(index, response).await;
return true;
}
task.record_upload_response(response).await;
task.record_upload_response(index, response).await;
let code = task.code.lock().unwrap()[index];
if code != Reason::Default {
error!(LOG_LABEL, "upload {} file fail, which reason is {}", @public(index), @public(code as u32));

View File

@ -237,6 +237,7 @@ impl TaskManager {
uid: u64,
task_id: &mut u32,
files: Vec<File>,
body_files: Vec<File>,
) -> ErrorCode {
debug!(LOG_LABEL, "begin construct a task");
if files.len() == 0 {
@ -244,7 +245,7 @@ impl TaskManager {
}
*task_id = generate_task_id();
let bundle = conf.bundle.clone();
let task = RequestTask::constructor(conf, uid, *task_id, files);
let task = RequestTask::constructor(conf, uid, *task_id, files, body_files);
let mut task_map_guard = self.task_map.lock().unwrap();
if self.unloading.load(Ordering::SeqCst) {
return ErrorCode::UnloadingSA;