diff --git a/CMakeLists.txt b/CMakeLists.txt index 9802fbb..bfcc898 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ option(FFRT_BENCHMARKS "Enables ffrt Benchmarks" OFF) option(FFRT_TEST_ENABLE "Enables ffrt test" OFF) option(FFRT_CLANG_COMPILE "use clang/clang++ for compiling" ON) option(FFRT_SANITIZE "enable address or thread sanitizer" OFF) -option(FFRT_IO_TASK_SCHEDULER "enalbe io task scheduler" ON) +option(FFRT_IO_TASK_SCHEDULER "enalbe io task scheduler" OFF) # set compiler clang or gcc, must before project(ffrt) if(FFRT_CLANG_COMPILE STREQUAL ON) diff --git a/interfaces/inner_api/c/executor_task.h b/interfaces/inner_api/c/executor_task.h index c41af61..8eb927a 100644 --- a/interfaces/inner_api/c/executor_task.h +++ b/interfaces/inner_api/c/executor_task.h @@ -33,13 +33,15 @@ typedef void (*ffrt_executor_task_func)(ffrt_executor_task_t* data, ffrt_qos_t q FFRT_C_API void ffrt_executor_task_register_func(ffrt_executor_task_func func, ffrt_executor_task_type_t type); FFRT_C_API void ffrt_executor_task_submit(ffrt_executor_task_t *task, const ffrt_task_attr_t *attr); -FFRT_C_API int ffrt_executor_task_cancel(ffrt_executor_task_t *taask, const ffrt_qos_t qos); +FFRT_C_API int ffrt_executor_task_cancel(ffrt_executor_task_t *task, const ffrt_qos_t qos); #ifdef FFRT_IO_TASK_SCHEDULER // poller -FFRT_C_API int ffrt_poller_register(int fd, uint32_t events, void* data, void(*cb)(void*, uint32_t)); +typedef void (*ffrt_poller_cb)(void*, uint32_t); +typedef int (*ffrt_timer_func)(); +FFRT_C_API int ffrt_poller_register(int fd, uint32_t events, void* data, ffrt_poller_cb cb); FFRT_C_API int ffrt_poller_deregister(int fd); -FFRT_C_API int ffrt_poller_register_timerfunc(int(*timerFunc)()); +FFRT_C_API int ffrt_poller_register_timerfunc(ffrt_timer_func timerFunc); FFRT_C_API void ffrt_poller_wakeup(); FFRT_C_API void ffrt_submit_coroutine(void* co, ffrt_coroutine_ptr_t exec, diff --git a/interfaces/inner_api/c/type_def.h b/interfaces/inner_api/c/type_def.h index 5cf40c7..0734d82 100644 --- a/interfaces/inner_api/c/type_def.h +++ b/interfaces/inner_api/c/type_def.h @@ -82,6 +82,26 @@ typedef void* ffrt_sys_event_handle_t; typedef void* ffrt_config_t; +#ifdef FFRT_IO_TASK_SCHEDULER +typedef enum { + ffrt_coroutine_stackless, + ffrt_coroutine_with_stack, +} ffrt_coroutine_t; + +typedef enum { + ffrt_coroutine_pending = 0, + ffrt_coroutine_ready = 0, +} ffrt_coroutine_ret_t; + +typedef ffrt_coroutine_ret_t(*ffrt_coroutine_ptr_t)(void*); + +typedef struct { + int fd; + void* data; + void(*cb)(void*, uint32_t); +} ffrt_poller_t; +#endif + #ifdef __cplusplus namespace ffrt { enum qos_inner_default { diff --git a/interfaces/kits/c/type_def.h b/interfaces/kits/c/type_def.h index 1cedf23..ece8ab0 100644 --- a/interfaces/kits/c/type_def.h +++ b/interfaces/kits/c/type_def.h @@ -103,7 +103,10 @@ typedef enum { /** General task. */ ffrt_function_kind_general, /** Queue task. */ - ffrt_function_kind_queue + ffrt_function_kind_queue, +#ifdef FFRT_IO_TASK_SCHEDULER + ffrt_function_kind_io, +#endif } ffrt_function_kind_t; /** diff --git a/scripts/run_example.sh b/scripts/run_example.sh index bf22f31..0cf69e6 100644 --- a/scripts/run_example.sh +++ b/scripts/run_example.sh @@ -20,7 +20,7 @@ mkdir build && cd build cmake .. \ -DFFRT_EXAMPLE=ON \ - -DFFRT_IO_TASK_SCHEDULER=ON \ + -DFFRT_IO_TASK_SCHEDULER=OFF \ make -j ffrt make -j diff --git a/src/core/task.cpp b/src/core/task.cpp index 8de623c..0acaa5f 100644 --- a/src/core/task.cpp +++ b/src/core/task.cpp @@ -366,7 +366,7 @@ ffrt_qos_t ffrt_get_cur_qos() return qos; } API_ATTRIBUTE((visibility("default"))) -int ffrt_poller_register(int fd, uint32_t events, void* data, void(*cb)(void*, uint32_t)) +int ffrt_poller_register(int fd, uint32_t events, void* data, ffrt_poller_cb cb) { ffrt_qos_t qos = ffrt_get_cur_qos(); return ffrt::PollerProxy::Instance()->GetPoller(qos).AddFdEvent(events, fd, data, cb); @@ -387,7 +387,7 @@ void ffrt_poller_wakeup() } API_ATTRIBUTE((visibility("default"))) -int ffrt_poller_register_timerfunc(int(*timerFunc)()) +int ffrt_poller_register_timerfunc(ffrt_timer_func timerFunc) { ffrt_qos_t qos = ffrt_get_cur_qos(); return ffrt::PollerProxy::Instance()->GetPoller(qos).RegisterTimerFunc(timerFunc); diff --git a/src/queue/serial_looper.cpp b/src/queue/serial_looper.cpp index e9c6312..741726d 100644 --- a/src/queue/serial_looper.cpp +++ b/src/queue/serial_looper.cpp @@ -83,6 +83,8 @@ void SerialLooper::Run() while (!isExit_.load()) { ITask* task = queue_->Next(); if (task) { + FFRT_LOGI("pick task gid=%llu, qid=%u [%s] remains [%u]", task->gid, qid_, name_.c_str(), + queue_->GetMapSize()); SetTimeoutMonitor(task); FFRT_COND_DO_ERR((task->handler_ == nullptr), break, "failed to run task, handler is nullptr"); QueueMonitor::GetInstance().UpdateQueueInfo(qid_, task->gid); diff --git a/src/queue/serial_looper.h b/src/queue/serial_looper.h index 40ad177..6f17fad 100644 --- a/src/queue/serial_looper.h +++ b/src/queue/serial_looper.h @@ -50,7 +50,7 @@ private: void SetTimeoutMonitor(ITask* task); void RunTimeOutCallback(ITask* task); - std::string name_ = "serial_queue_"; + std::string name_; std::atomic_bool isExit_ = {false}; task_handle handle; std::shared_ptr queue_; diff --git a/src/queue/serial_queue.cpp b/src/queue/serial_queue.cpp index ec723e2..d69a987 100644 --- a/src/queue/serial_queue.cpp +++ b/src/queue/serial_queue.cpp @@ -47,36 +47,40 @@ void SerialQueue::Quit() int SerialQueue::PushTask(ITask* task, uint64_t upTime) { - std::unique_lock lock(mutex_); FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to push task, task is nullptr"); - whenMap_[upTime].emplace_back(task); - if (upTime == whenMap_.begin()->first) { - cond_.notify_all(); + { + std::unique_lock lock(mutex_); + whenMap_[upTime].emplace_back(task); + if (upTime == whenMap_.begin()->first) { + cond_.notify_all(); + } } - FFRT_LOGI("push serial task gid=%llu into qid=%u [%s] succ", task->gid, qid_, name_.c_str()); + FFRT_LOGI("push task gid=%llu to qid=%u [%s]", task->gid, qid_, name_.c_str()); return 0; } int SerialQueue::RemoveTask(const ITask* task) { - std::unique_lock lock(mutex_); FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to remove task, task is nullptr"); - FFRT_LOGI("remove serial task gid=%llu of qid=%u [%s] enter", task->gid, qid_, name_.c_str()); - for (auto it = whenMap_.begin(); it != whenMap_.end();) { - for (auto itList = it->second.begin(); itList != it->second.end();) { - if ((*itList) != task) { - itList++; - continue; + FFRT_LOGI("cancel task gid=%llu of qid=%u [%s]", task->gid, qid_, name_.c_str()); + { + std::unique_lock lock(mutex_); + for (auto it = whenMap_.begin(); it != whenMap_.end();) { + for (auto itList = it->second.begin(); itList != it->second.end();) { + if ((*itList) != task) { + itList++; + continue; + } + it->second.erase(itList++); + // a task can be submitted only once through the C interface + return 0; } - it->second.erase(itList++); - // a task can be submitted only once through the C interface - return 0; - } - if (it->second.empty()) { - whenMap_.erase(it++); - } else { - it++; + if (it->second.empty()) { + whenMap_.erase(it++); + } else { + it++; + } } } FFRT_LOGD("remove serial task gid=%llu of [%s] failed, task not waiting in queue", task->gid, name_.c_str()); @@ -110,8 +114,7 @@ ITask* SerialQueue::Next() if (it->second.empty()) { (void)whenMap_.erase(it); } - FFRT_LOGI("get next serial task gid=%llu, qid=%u [%s] contains [%u] other timestamps", nextTask->gid, qid_, - name_.c_str(), whenMap_.size()); + mapSize_.store(whenMap_.size()); return nextTask; } else { uint64_t diff = it->first - now; diff --git a/src/queue/serial_queue.h b/src/queue/serial_queue.h index eed0bf5..7761c01 100644 --- a/src/queue/serial_queue.h +++ b/src/queue/serial_queue.h @@ -28,6 +28,11 @@ public: SerialQueue(const uint32_t qid, const std::string& name) : qid_(qid), name_(name) {} ~SerialQueue(); + inline uint32_t GetMapSize() const + { + return mapSize_.load(); + } + ITask* Next(); int PushTask(ITask* task, uint64_t upTime); int RemoveTask(const ITask* task); @@ -39,6 +44,7 @@ private: bool isExit_ = false; const uint32_t qid_; std::string name_; + std::atomic_uint32_t mapSize_ = {0}; std::map> whenMap_; }; } // namespace ffrt diff --git a/src/util/slab.h b/src/util/slab.h index 2efd4f1..703bbba 100644 --- a/src/util/slab.h +++ b/src/util/slab.h @@ -19,14 +19,12 @@ #include #include #include -#include #ifdef FFRT_BBOX_ENABLE #include #endif #include #include "sync/sync.h" #include "dfx/log/ffrt_log_api.h" -#include "dfx/bbox/bbox.h" namespace ffrt { const std::size_t BatchAllocSize = 128 * 1024; @@ -186,13 +184,9 @@ class QSimpleAllocator { std::mutex lock; std::vector cache; uint32_t flags = MAP_ANONYMOUS | MAP_PRIVATE; - uint32_t printCnt = 0; - std::vector mmapedAddrVec; - std::atomic allocatedCnt {0}; bool expand() { - FFRT_LOGE("DEBUG: QSimpleAllocator::expand"); const int prot = PROT_READ | PROT_WRITE; char* p = reinterpret_cast(mmap(nullptr, MmapSz, prot, flags, -1, 0)); if (p == (char*)MAP_FAILED) { @@ -208,24 +202,11 @@ class QSimpleAllocator { for (std::size_t i = 0; i + TSize <= MmapSz; i += TSize) { cache.push_back(reinterpret_cast(p + i)); } - mmapedAddrVec.push_back(reinterpret_cast(p)); return true; } T* alloc() { - FFRT_LOGE("DEBUG: QSimpleAllocator::alloc"); - ++printCnt; - ++allocatedCnt; - if (printCnt % 10 == 0) { - std::string info = SaveTaskCounterInfo(); - FFRT_LOGE("DEBUG: %s", info.c_str()); - for (int i = 0; i < mmapedAddrVec.size(); i++) { - FFRT_LOGE("DEBUG: mmaped addr %d, %llu", i, - static_cast(reinterpret_cast(mmapedAddrVec[i]))); - } - FFRT_LOGE("DEBUG: cache size %llu, used size %llu", cache.size(), allocatedCnt.load()); - } T* p = nullptr; lock.lock(); if (cache.empty()) { @@ -242,8 +223,6 @@ class QSimpleAllocator { void free(T* p) { - --allocatedCnt; - FFRT_LOGE("DEBUG: QSimpleAllocator::free"); lock.lock(); cache.push_back(p); lock.unlock();