mirror of
https://gitee.com/openharmony/resourceschedule_ffrt
synced 2025-03-01 02:07:16 +00:00
ffrt queue 同步
Signed-off-by: wenxingqi <wenxingqi1@h-partners.com>
This commit is contained in:
parent
a4e86ce6fd
commit
3ea2436627
@ -38,29 +38,6 @@ typedef enum {
|
||||
ffrt_inner_queue_priority_idle,
|
||||
} ffrt_inner_queue_priority_t;
|
||||
|
||||
/**
|
||||
* @brief Submits a task to a queue, for tasks with the same delay, insert the header.
|
||||
*
|
||||
* @param queue Indicates a queue handle.
|
||||
* @param f Indicates a pointer to the task executor.
|
||||
* @param attr Indicates a pointer to the task attribute.
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API void ffrt_queue_submit_head(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr);
|
||||
|
||||
/**
|
||||
* @brief Submits a task to the queue, and obtains a task handle, for tasks with the same delay, insert the header.
|
||||
*
|
||||
* @param queue Indicates a queue handle.
|
||||
* @param f Indicates a pointer to the task executor.
|
||||
* @param attr Indicates a pointer to the task attribute.
|
||||
* @return Returns a non-null task handle if the task is submitted;
|
||||
returns a null pointer otherwise.
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API ffrt_task_handle_t ffrt_queue_submit_head_h(
|
||||
ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr);
|
||||
|
||||
/**
|
||||
* @brief Checks whether a task with the given name can be found in the queue.
|
||||
*
|
||||
|
@ -54,24 +54,6 @@ FFRT_C_API void ffrt_restore_qos_config(void);
|
||||
*/
|
||||
FFRT_C_API int ffrt_set_qos_worker_num(ffrt_worker_num_param* qosData);
|
||||
|
||||
/**
|
||||
* @brief Set the task execution timeout.
|
||||
*
|
||||
* @param attr Indicates a pointer to the task attribute.
|
||||
* @param timeout_ms task execution timeout.
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API void ffrt_task_attr_set_timeout(ffrt_task_attr_t* attr, uint64_t timeout_ms);
|
||||
|
||||
/**
|
||||
* @brief Get the task execution timeout.
|
||||
*
|
||||
* @param attr Indicates a pointer to the task attribute.
|
||||
* @return Returns the task execution timeout.
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API uint64_t ffrt_task_attr_get_timeout(const ffrt_task_attr_t* attr);
|
||||
|
||||
/**
|
||||
* @brief Sets whether the task notifies worker, only support for normal task.
|
||||
*
|
||||
|
@ -232,4 +232,37 @@ FFRT_C_API ffrt_queue_t ffrt_get_main_queue();
|
||||
*/
|
||||
FFRT_C_API ffrt_queue_t ffrt_get_current_queue();
|
||||
|
||||
/**
|
||||
* @brief Get queue task count.
|
||||
*
|
||||
* @param queue Indicates a queue handle.
|
||||
* @return Returns the queue task count.
|
||||
* @since 10
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API uint64_t ffrt_queue_get_task_cnt(ffrt_queue_t queue);
|
||||
|
||||
/**
|
||||
* @brief Submits a task to a queue, for tasks with the same delay, insert the header.
|
||||
*
|
||||
* @param queue Indicates a queue handle.
|
||||
* @param f Indicates a pointer to the task executor.
|
||||
* @param attr Indicates a pointer to the task attribute.
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API void ffrt_queue_submit_head(ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr);
|
||||
|
||||
/**
|
||||
* @brief Submits a task to the queue, and obtains a task handle, for tasks with the same delay, insert the header.
|
||||
*
|
||||
* @param queue Indicates a queue handle.
|
||||
* @param f Indicates a pointer to the task executor.
|
||||
* @param attr Indicates a pointer to the task attribute.
|
||||
* @return Returns a non-null task handle if the task is submitted;
|
||||
returns a null pointer otherwise.
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API ffrt_task_handle_t ffrt_queue_submit_head_h(
|
||||
ffrt_queue_t queue, ffrt_function_header_t* f, const ffrt_task_attr_t* attr);
|
||||
|
||||
#endif // FFRT_API_C_QUEUE_H
|
@ -160,6 +160,24 @@ FFRT_C_API void ffrt_task_attr_set_stack_size(ffrt_task_attr_t* attr, uint64_t s
|
||||
*/
|
||||
FFRT_C_API uint64_t ffrt_task_attr_get_stack_size(const ffrt_task_attr_t* attr);
|
||||
|
||||
/**
|
||||
* @brief Set the task execution timeout.
|
||||
*
|
||||
* @param attr Indicates a pointer to the task attribute.
|
||||
* @param timeout_ms task execution timeout.
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API void ffrt_task_attr_set_timeout(ffrt_task_attr_t* attr, uint64_t timeout_us);
|
||||
|
||||
/**
|
||||
* @brief Get the task execution timeout.
|
||||
*
|
||||
* @param attr Indicates a pointer to the task attribute.
|
||||
* @return Returns the task execution timeout.
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
FFRT_C_API uint64_t ffrt_task_attr_get_timeout(const ffrt_task_attr_t* attr);
|
||||
/**
|
||||
* @brief Updates the QoS of this task.
|
||||
*
|
||||
@ -283,4 +301,15 @@ FFRT_C_API void ffrt_wait(void);
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API ffrt_error_t ffrt_set_worker_stack_size(ffrt_qos_t qos, size_t stack_size);
|
||||
|
||||
/**
|
||||
* @brief get gid from task handle.
|
||||
*
|
||||
* @param handle Indicates a task handle.
|
||||
* @return Return git
|
||||
* @since 10
|
||||
* @version 1.0
|
||||
*/
|
||||
FFRT_C_API uint64_t ffrt_task_handle_get_id(ffrt_task_handle_t handle);
|
||||
|
||||
#endif
|
||||
|
@ -185,6 +185,63 @@ public:
|
||||
queue_handle, create_function_wrapper(std::move(func), ffrt_function_kind_queue), &attr);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Submits a task with a specified attribute to this queue.
|
||||
*
|
||||
* @param func Indicates a task executor function closure.
|
||||
* @param attr Indicates a task attribute.
|
||||
* @since 10
|
||||
* @version 1.0
|
||||
*/
|
||||
inline void submit_head(const std::function<void()>& func, const task_attr& attr = {})
|
||||
{
|
||||
ffrt_queue_submit_head(queue_handle, create_function_wrapper(func, ffrt_function_kind_queue), &attr);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Submits a task with a specified attribute to this queue.
|
||||
*
|
||||
* @param func Indicates a task executor function closure.
|
||||
* @param attr Indicates a task attribute.
|
||||
* @since 10
|
||||
* @version 1.0
|
||||
*/
|
||||
inline void submit_head(std::function<void()>&& func, const task_attr& attr = {})
|
||||
{
|
||||
ffrt_queue_submit_head(queue_handle, create_function_wrapper(std::move(func), ffrt_function_kind_queue), &attr);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Submits a task with a specified attribute to this queue, and obtains a task handle.
|
||||
*
|
||||
* @param func Indicates a task executor function closure.
|
||||
* @param attr Indicates a task attribute.
|
||||
* @return Returns a non-null task handle if the task is submitted;
|
||||
returns a null pointer otherwise.
|
||||
* @since 10
|
||||
* @version 1.0
|
||||
*/
|
||||
inline task_handle submit_head_h(const std::function<void()>& func, const task_attr& attr = {})
|
||||
{
|
||||
return ffrt_queue_submit_head_h(queue_handle, create_function_wrapper(func, ffrt_function_kind_queue), &attr);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Submits a task with a specified attribute to this queue, and obtains a task handle.
|
||||
*
|
||||
* @param func Indicates a task executor function closure.
|
||||
* @param attr Indicates a task attribute.
|
||||
* @return Returns a non-null task handle if the task is submitted;
|
||||
returns a null pointer otherwise.
|
||||
* @since 10
|
||||
* @version 1.0
|
||||
*/
|
||||
inline task_handle submit_head_h(std::function<void()>&& func, const task_attr& attr = {})
|
||||
{
|
||||
return ffrt_queue_submit_head_h(
|
||||
queue_handle, create_function_wrapper(std::move(func), ffrt_function_kind_queue), &attr);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Cancels a task.
|
||||
*
|
||||
@ -211,6 +268,19 @@ public:
|
||||
return ffrt_queue_wait(handle);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get queue task count.
|
||||
*
|
||||
* @param queue Indicates a queue handle.
|
||||
* @return Returns the queue task count.
|
||||
* @since 10
|
||||
* @version 1.0
|
||||
*/
|
||||
inline uint64_t get_task_cnt()
|
||||
{
|
||||
return ffrt_queue_get_task_cnt(queue_handle);
|
||||
}
|
||||
|
||||
private:
|
||||
ffrt_queue_t queue_handle = nullptr;
|
||||
};
|
||||
|
@ -176,6 +176,31 @@ public:
|
||||
{
|
||||
return ffrt_task_attr_get_stack_size(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Sets the task schedule timeout.
|
||||
*
|
||||
* @param timeou_us task scheduler timeout.
|
||||
* @since 12
|
||||
* @version 1.0
|
||||
*/
|
||||
inline task_attr& timeout(uint64_t timeout_us)
|
||||
{
|
||||
ffrt_task_attr_set_timeout(this, timeout_us);
|
||||
return *this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Obtains the task schedule timeout.
|
||||
*
|
||||
* @return Returns task scheduler timeout.
|
||||
* @since 12
|
||||
* @version 1.0
|
||||
*/
|
||||
inline uint64_t timeout() const
|
||||
{
|
||||
return ffrt_task_attr_get_timeout(this);
|
||||
}
|
||||
};
|
||||
|
||||
class task_handle {
|
||||
@ -202,6 +227,18 @@ public:
|
||||
*this = std::move(h);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief get gid from task handle.
|
||||
*
|
||||
* @return Return gid.
|
||||
* @since 10
|
||||
* @version 1.0
|
||||
*/
|
||||
inline uint64_t get_id() const
|
||||
{
|
||||
return ffrt_task_handle_get_id(p);
|
||||
}
|
||||
|
||||
inline task_handle& operator=(task_handle&& h)
|
||||
{
|
||||
if (this != &h) {
|
||||
|
@ -185,13 +185,13 @@ uint64_t ffrt_task_attr_get_delay(const ffrt_task_attr_t *attr)
|
||||
}
|
||||
|
||||
API_ATTRIBUTE((visibility("default")))
|
||||
void ffrt_task_attr_set_timeout(ffrt_task_attr_t *attr, uint64_t timeout_ms)
|
||||
void ffrt_task_attr_set_timeout(ffrt_task_attr_t *attr, uint64_t timeout_us)
|
||||
{
|
||||
if (unlikely(!attr)) {
|
||||
FFRT_LOGE("attr should be a valid address");
|
||||
return;
|
||||
}
|
||||
(reinterpret_cast<ffrt::task_attr_private *>(attr))->timeout_ = timeout_ms;
|
||||
(reinterpret_cast<ffrt::task_attr_private *>(attr))->timeout_ = timeout_us;
|
||||
}
|
||||
|
||||
API_ATTRIBUTE((visibility("default")))
|
||||
@ -357,6 +357,13 @@ void ffrt_task_handle_destroy(ffrt_task_handle_t handle)
|
||||
ffrt_task_handle_dec_ref(handle);
|
||||
}
|
||||
|
||||
API_ATTRIBUTE((visibility("default")))
|
||||
uint64_t ffrt_task_handle_get_id(ffrt_task_handle_t handle)
|
||||
{
|
||||
FFRT_COND_DO_ERR((handle == nullptr), return 0, "input task handle is invalid");
|
||||
return static_cast<ffrt::TaskBase*>(handle)->gid;
|
||||
}
|
||||
|
||||
// wait
|
||||
API_ATTRIBUTE((visibility("default")))
|
||||
void ffrt_wait_deps(const ffrt_deps_t *deps)
|
||||
@ -495,8 +502,10 @@ uint64_t ffrt_this_task_get_id()
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (curTask->type == ffrt_normal_task || curTask->type == ffrt_queue_task) {
|
||||
if (curTask->type == ffrt_normal_task) {
|
||||
return curTask->gid;
|
||||
} else if (curTask->type == ffrt_queue_task) {
|
||||
return reinterpret_cast<ffrt::QueueTask*>(curTask)->GetHandler()->GetExecTaskId();
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -32,6 +32,7 @@ public:
|
||||
: qos_(attr.qos()),
|
||||
name_(attr.name()),
|
||||
delay_(attr.delay()),
|
||||
timeout_(attr.timeout()),
|
||||
prio_(attr.priority())
|
||||
{
|
||||
}
|
||||
@ -43,6 +44,7 @@ public:
|
||||
bool notifyWorker_ = true;
|
||||
ffrt_queue_priority_t prio_ = ffrt_queue_priority_low;
|
||||
bool taskLocal_ = false;
|
||||
bool headInsert_ = false;
|
||||
ffrt_function_header_t* timeoutCb_ = nullptr;
|
||||
uint64_t stackSize_ = STACK_SIZE;
|
||||
};
|
||||
|
@ -35,8 +35,10 @@ namespace ffrt {
|
||||
static std::mutex lock;
|
||||
|
||||
|
||||
bool IsValidTimeout(uint64_t gid, uint64_t timeout_ms)
|
||||
bool IsValidTimeout(uint64_t gid, uint64_t timeout_us)
|
||||
{
|
||||
// us convert to ms
|
||||
uint64_t timeout_ms = timeout_us / CONVERT_TIME_UNIT;
|
||||
// 当前有效的并行任务timeout时间范围是10-30s
|
||||
if (timeout_ms >= VALID_TIMEOUT_MIN && timeout_ms <= VALID_TIMEOUT_MAX) {
|
||||
FFRT_LOGI("task gid=%llu with timeout [%llu ms] is valid", gid, timeout_ms);
|
||||
@ -62,26 +64,28 @@ namespace ffrt {
|
||||
bool SendTimeoutWatchdog(uint64_t gid, uint64_t timeout, uint64_t delay)
|
||||
{
|
||||
#ifdef FFRT_OH_WATCHDOG_ENABLE
|
||||
FFRT_LOGI("start to set watchdog for task gid=%llu with timeout [%llu ms] ", gid, timeout);
|
||||
// us convert to ms
|
||||
uint64_t timeout_ms = timeout / CONVERT_TIME_UNIT;
|
||||
FFRT_LOGI("start to set watchdog for task gid=%llu with timeout [%llu ms] ", gid, timeout_ms);
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
|
||||
// set dealyedworker callback
|
||||
we->cb = ([gid, timeout](WaitEntry* we) {
|
||||
we->cb = ([gid, timeout_ms](WaitEntry* we) {
|
||||
std::lock_guard<decltype(lock)> l(lock);
|
||||
if (taskStatusMap.count(gid) > 0) {
|
||||
RunTimeOutCallback(gid, timeout);
|
||||
RunTimeOutCallback(gid, timeout_ms);
|
||||
} else {
|
||||
FFRT_LOGI("task gid=%llu has finished", gid);
|
||||
}
|
||||
SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
|
||||
});
|
||||
// set dealyedworker wakeup time
|
||||
std::chrono::microseconds timeoutTime(timeout * CONVERT_TIME_UNIT);
|
||||
std::chrono::microseconds timeoutTime(timeout);
|
||||
std::chrono::microseconds delayTime(delay);
|
||||
we->tp = (now + timeoutTime + delayTime);
|
||||
if (!DelayedWakeup(we->tp, we, we->cb)) {
|
||||
SimpleAllocator<WaitUntilEntry>::FreeMem(we);
|
||||
FFRT_LOGE("failed to set watchdog for task gid=%llu with timeout [%llu ms] ", gid, timeout);
|
||||
FFRT_LOGE("failed to set watchdog for task gid=%llu with timeout [%llu ms] ", gid, timeout_ms);
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
@ -104,7 +108,7 @@ namespace ffrt {
|
||||
FFRT_LOGE("parallel task gid=%llu send watchdog delaywork failed, the count more than the max count", gid);
|
||||
return;
|
||||
}
|
||||
if (!SendTimeoutWatchdog(gid, timeout, 0)) {
|
||||
if (!SendTimeoutWatchdog(gid, timeout * CONVERT_TIME_UNIT, 0)) {
|
||||
FFRT_LOGE("parallel task gid=%llu send next watchdog delaywork failed", gid);
|
||||
return;
|
||||
};
|
||||
|
@ -18,7 +18,7 @@
|
||||
#include "tm/cpu_task.h"
|
||||
|
||||
namespace ffrt {
|
||||
bool IsValidTimeout(uint64_t gid, uint64_t timeout_ms);
|
||||
bool IsValidTimeout(uint64_t gid, uint64_t timeout_us);
|
||||
void AddTaskToWatchdog(uint64_t gid);
|
||||
void RemoveTaskFromWatchdog(uint64_t gid);
|
||||
bool SendTimeoutWatchdog(uint64_t gid, uint64_t timeout, uint64_t delay);
|
||||
|
@ -202,6 +202,14 @@ void ffrt_queue_wait(ffrt_task_handle_t handle)
|
||||
task->Wait();
|
||||
}
|
||||
|
||||
API_ATTRIBUTE((visibility("default")))
|
||||
uint64_t ffrt_queue_get_task_cnt(ffrt_queue_t queue)
|
||||
{
|
||||
FFRT_COND_DO_ERR(unlikely(queue == nullptr), return 0, "input invalid, queue == nullptr");
|
||||
QueueHandler* handler = static_cast<QueueHandler*>(queue);
|
||||
return handler->GetTaskCnt();
|
||||
}
|
||||
|
||||
API_ATTRIBUTE((visibility("default")))
|
||||
int ffrt_queue_cancel(ffrt_task_handle_t handle)
|
||||
{
|
||||
|
@ -28,6 +28,7 @@ namespace {
|
||||
constexpr int PROCESS_NAME_BUFFER_LENGTH = 1024;
|
||||
constexpr uint32_t STRING_SIZE_MAX = 128;
|
||||
constexpr uint32_t TASK_DONE_WAIT_UNIT = 10;
|
||||
constexpr uint64_t SCHED_TIME_ACC_ERROR_US = 5000; // 5ms
|
||||
}
|
||||
|
||||
namespace ffrt {
|
||||
@ -80,6 +81,11 @@ QueueHandler::~QueueHandler()
|
||||
cbTask->DecDeleteRef();
|
||||
}
|
||||
}
|
||||
|
||||
if (we_ != nullptr) {
|
||||
DelayedRemove(we_->tp, we_);
|
||||
SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
|
||||
}
|
||||
FFRT_LOGI("destruct %s leave", name_.c_str());
|
||||
}
|
||||
|
||||
@ -126,6 +132,11 @@ void QueueHandler::Submit(QueueTask* task)
|
||||
task->fromTid = ExecuteCtx::Cur()->tid;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (task->GetSchedTimeout() > 0) {
|
||||
AddSchedDeadline(task);
|
||||
}
|
||||
|
||||
int ret = queue_->Push(task);
|
||||
if (ret == SUCC) {
|
||||
FFRT_LOGD("submit task[%lu] into %s", gid, name_.c_str());
|
||||
@ -186,6 +197,9 @@ int QueueHandler::Cancel(QueueTask* task)
|
||||
"cannot cancel, [queueId=%u] constructed failed", GetQueueId());
|
||||
FFRT_COND_DO_ERR((task == nullptr), return INACTIVE, "input invalid, serial task is nullptr");
|
||||
|
||||
if (task->GetSchedTimeout() > 0) {
|
||||
RemoveSchedDeadline(task);
|
||||
}
|
||||
int ret = queue_->Remove(task);
|
||||
if (ret == SUCC) {
|
||||
FFRT_LOGD("cancel task[%llu] %s succ", task->gid, task->label.c_str());
|
||||
@ -204,12 +218,17 @@ void QueueHandler::Dispatch(QueueTask* inTask)
|
||||
// dfx watchdog
|
||||
SetTimeoutMonitor(task);
|
||||
FFRTFacade::GetQMInstance().UpdateQueueInfo(GetQueueId(), task->gid);
|
||||
execTaskId_.store(task->gid);
|
||||
|
||||
// run user task
|
||||
FFRT_LOGD("run task [gid=%llu], queueId=%u", task->gid, GetQueueId());
|
||||
auto f = reinterpret_cast<ffrt_function_header_t*>(task->func_storage);
|
||||
FFRT_SERIAL_QUEUE_TASK_EXECUTE_MARKER(task->gid);
|
||||
FFRTTraceRecord::TaskExecute(&(task->executeTime));
|
||||
if (task->GetSchedTimeout() > 0) {
|
||||
RemoveSchedDeadline(task);
|
||||
}
|
||||
|
||||
uint64_t triggerTime{0};
|
||||
if (queue_->GetQueueType() == ffrt_queue_eventhandler_adapter) {
|
||||
triggerTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
@ -232,6 +251,7 @@ void QueueHandler::Dispatch(QueueTask* inTask)
|
||||
nextTask = task->GetNextTask();
|
||||
if (nextTask == nullptr) {
|
||||
FFRTFacade::GetQMInstance().ResetQueueInfo(GetQueueId());
|
||||
execTaskId_.store(0);
|
||||
if (!queue_->IsOnLoop()) {
|
||||
Deliver();
|
||||
}
|
||||
@ -396,4 +416,90 @@ int QueueHandler::DumpSize(ffrt_inner_queue_priority_t priority)
|
||||
return reinterpret_cast<EventHandlerAdapterQueue*>(queue_.get())->DumpSize(priority);
|
||||
}
|
||||
|
||||
void QueueHandler::SendSchedTimer(TimePoint delay)
|
||||
{
|
||||
we_->tp = delay;
|
||||
we_->cb = ([this](WaitEntry* we_) { CheckSchedDeadline(); });
|
||||
bool result = DelayedWakeup(we_->tp, we_, we_->cb);
|
||||
while (!result) {
|
||||
FFRT_LOGW("failed to set delayedwoker, retry");
|
||||
we_->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(SCHED_TIME_ACC_ERROR_US);
|
||||
result = DelayedWakeup(we_->tp, we_, we_->cb);
|
||||
}
|
||||
}
|
||||
|
||||
void QueueHandler::CheckSchedDeadline()
|
||||
{
|
||||
std::vector<uint64_t> timeoutTaskId;
|
||||
// Collecting Timeout Tasks
|
||||
{
|
||||
std::unique_lock lock(mutex_);
|
||||
uint64_t threshold = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
std::chrono::steady_clock::now().time_since_epoch()).count() + SCHED_TIME_ACC_ERROR_US;
|
||||
|
||||
auto it = schedDeadline_.begin();
|
||||
uint64_t nextDeadline = UINT64_MAX;
|
||||
while (it != schedDeadline_.end()) {
|
||||
if (it->second < threshold) {
|
||||
timeoutTaskId.push_back(it->first->gid);
|
||||
it = schedDeadline_.erase(it);
|
||||
} else {
|
||||
nextDeadline = std::min(nextDeadline, it->second);
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
if (schedDeadline_.empty()) {
|
||||
initSchedTimer_ = false;
|
||||
} else {
|
||||
std::chrono::microseconds timeout(nextDeadline);
|
||||
TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
|
||||
std::chrono::steady_clock::time_point() + timeout);
|
||||
SendSchedTimer(tp);
|
||||
}
|
||||
}
|
||||
|
||||
// Reporting Timeout Infomation
|
||||
if (!timeoutTaskId.empty()) {
|
||||
std::stringstream ss;
|
||||
ss << "Queue_Schedule_Timeout, queueId=" << GetQueueId() << ", timeout task gid: ";
|
||||
for (auto& id : timeoutTaskId) {
|
||||
ss << id << " ";
|
||||
}
|
||||
|
||||
FFRT_LOGE("%s", ss.str().c_str());
|
||||
ffrt_task_timeout_cb func = ffrt_task_timeout_get_cb();
|
||||
if (func) {
|
||||
func(GetQueueId(), ss.str().c_str(), ss.str().size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void QueueHandler::AddSchedDeadline(QueueTask* task)
|
||||
{
|
||||
// sched timeout only support serial queues, other queue types will be supported based on service requirements.
|
||||
if (queue_->GetQueueType() != ffrt_queue_serial) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::unique_lock lock(mutex_);
|
||||
schedDeadline_.insert({task, task->GetSchedTimeout() + task->GetUptime()});
|
||||
|
||||
if (!initSchedTimer_) {
|
||||
if (we_ == nullptr) {
|
||||
we_ = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
|
||||
}
|
||||
std::chrono::microseconds timeout(schedDeadline_[task]);
|
||||
TimePoint tp = std::chrono::time_point_cast<std::chrono::steady_clock::duration>(
|
||||
std::chrono::steady_clock::time_point() + timeout);
|
||||
SendSchedTimer(tp);
|
||||
}
|
||||
}
|
||||
|
||||
void QueueHandler::RemoveSchedDeadline(QueueTask* task)
|
||||
{
|
||||
std::unique_lock lock(mutex_);
|
||||
schedDeadline_.erase(task);
|
||||
}
|
||||
|
||||
} // namespace ffrt
|
||||
|
@ -18,11 +18,13 @@
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include "c/queue.h"
|
||||
#include "c/queue_ext.h"
|
||||
#include "cpp/task.h"
|
||||
#include "base_queue.h"
|
||||
#include "sched/execute_ctx.h"
|
||||
#include "sched/execute_ctx.h"
|
||||
|
||||
namespace ffrt {
|
||||
class QueueTask;
|
||||
@ -66,12 +68,23 @@ public:
|
||||
return queue_->GetQueueId();
|
||||
}
|
||||
|
||||
inline uint32_t GetExecTaskId() const
|
||||
{
|
||||
return execTaskId_.load();
|
||||
}
|
||||
|
||||
inline bool HasTask(const char* name)
|
||||
{
|
||||
FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueId=%u] constructed failed", GetQueueId());
|
||||
return queue_->HasTask(name);
|
||||
}
|
||||
|
||||
inline uint64_t GetTaskCnt()
|
||||
{
|
||||
FFRT_COND_DO_ERR((queue_ == nullptr), return false, "[queueID=%u] constructed failed", GetQueueId());
|
||||
return queue_->GetMapSize();
|
||||
}
|
||||
|
||||
bool IsIdle();
|
||||
void SetEventHandler(void* eventHandler);
|
||||
void* GetEventHandler();
|
||||
@ -91,17 +104,28 @@ private:
|
||||
void RemoveTimeoutMonitor(QueueTask* task);
|
||||
void RunTimeOutCallback(QueueTask* task);
|
||||
|
||||
void CheckSchedDeadline();
|
||||
void SendSchedTimer(TimePoint delay);
|
||||
void AddSchedDeadline(QueueTask* task);
|
||||
void RemoveSchedDeadline(QueueTask* task);
|
||||
|
||||
// queue info
|
||||
std::string name_;
|
||||
int qos_ = qos_default;
|
||||
std::unique_ptr<BaseQueue> queue_ = nullptr;
|
||||
std::atomic_bool isUsed_ = false;
|
||||
std::atomic_uint64_t execTaskId_ = 0;
|
||||
|
||||
// for timeout watchdog
|
||||
uint64_t timeout_ = 0;
|
||||
std::atomic_int delayedCbCnt_ = {0};
|
||||
ffrt_function_header_t* timeoutCb_ = nullptr;
|
||||
WaitUntilEntry* timeoutWe_ = nullptr;
|
||||
|
||||
std::mutex mutex_;
|
||||
bool initSchedTimer_ = false;
|
||||
WaitUntilEntry* we_ = nullptr;
|
||||
std::unordered_map<QueueTask*, uint64_t> schedDeadline_;
|
||||
};
|
||||
} // namespace ffrt
|
||||
|
||||
|
@ -33,7 +33,14 @@ int SerialQueue::Push(QueueTask* task)
|
||||
return INACTIVE;
|
||||
}
|
||||
|
||||
whenMap_.insert({task->GetUptime(), task});
|
||||
if (task->InsertHead() && !whenMap_.empty()) {
|
||||
FFRT_LOGD("head insert task=%u in [queueId=%u]", task->gid, queueId_);
|
||||
uint64_t headTime = (whenMap_.begin()->first > 0) ? whenMap_.begin()->first - 1 : 0;
|
||||
whenMap_.insert({headTime, task});
|
||||
} else {
|
||||
whenMap_.insert({task->GetUptime(), task});
|
||||
}
|
||||
|
||||
if (task == whenMap_.begin()->second) {
|
||||
cond_.NotifyOne();
|
||||
}
|
||||
|
@ -18,6 +18,9 @@
|
||||
#include "c/task.h"
|
||||
#include "util/slab.h"
|
||||
|
||||
namespace {
|
||||
constexpr uint64_t MIN_SCHED_TIMEOUT = 100000; // 0.1s
|
||||
}
|
||||
namespace ffrt {
|
||||
QueueTask::QueueTask(QueueHandler* handler, const task_attr_private* attr, bool insertHead)
|
||||
: handler_(handler), insertHead_(insertHead)
|
||||
@ -41,9 +44,19 @@ QueueTask::QueueTask(QueueHandler* handler, const task_attr_private* attr, bool
|
||||
uptime_ += delay_;
|
||||
prio_ = attr->prio_;
|
||||
stack_size = std::max(attr->stackSize_, MIN_STACK_SIZE);
|
||||
if (delay_ && attr->timeout_) {
|
||||
FFRT_LOGW("task [gid=%llu] not support delay and timeout at the same time, timeout ignored", gid);
|
||||
} else if (attr->timeout_) {
|
||||
schedTimeout_ = std::max(attr->timeout_, MIN_SCHED_TIMEOUT); // min 0.1s
|
||||
}
|
||||
}
|
||||
|
||||
FFRT_LOGD("ctor task [gid=%llu], delay=%lluus, type=%lu, prio=%d", gid, delay_, type, prio_);
|
||||
FFRT_LOGD("ctor task [gid=%llu], delay=%lluus, type=%lu, prio=%d, timeout=%luus",
|
||||
gid,
|
||||
delay_,
|
||||
type,
|
||||
prio_,
|
||||
schedTimeout_);
|
||||
}
|
||||
|
||||
QueueTask::~QueueTask()
|
||||
|
@ -100,6 +100,10 @@ public:
|
||||
return insertHead_;
|
||||
}
|
||||
|
||||
inline uint64_t GetSchedTimeout() const
|
||||
{
|
||||
return schedTimeout_;
|
||||
}
|
||||
uint8_t func_storage[ffrt_auto_managed_function_storage_size];
|
||||
|
||||
private:
|
||||
@ -108,6 +112,7 @@ private:
|
||||
QueueHandler* handler_;
|
||||
bool insertHead_ = false;
|
||||
uint64_t delay_ = 0;
|
||||
uint64_t schedTimeout_ = 0;
|
||||
int qos_ = qos_inherit;
|
||||
|
||||
QueueTask* nextTask_ = nullptr;
|
||||
|
Loading…
x
Reference in New Issue
Block a user