deal conflicts

Signed-off-by: wangyulie <wanglieyu@126.com>
This commit is contained in:
wangyulie 2023-10-30 10:32:31 +08:00
commit 389ec5c52c
11 changed files with 67 additions and 52 deletions

View File

@ -18,7 +18,7 @@ option(FFRT_BENCHMARKS "Enables ffrt Benchmarks" OFF)
option(FFRT_TEST_ENABLE "Enables ffrt test" OFF)
option(FFRT_CLANG_COMPILE "use clang/clang++ for compiling" ON)
option(FFRT_SANITIZE "enable address or thread sanitizer" OFF)
option(FFRT_IO_TASK_SCHEDULER "enalbe io task scheduler" ON)
option(FFRT_IO_TASK_SCHEDULER "enalbe io task scheduler" OFF)
# set compiler clang or gcc, must before project(ffrt)
if(FFRT_CLANG_COMPILE STREQUAL ON)

View File

@ -33,13 +33,15 @@ typedef void (*ffrt_executor_task_func)(ffrt_executor_task_t* data, ffrt_qos_t q
FFRT_C_API void ffrt_executor_task_register_func(ffrt_executor_task_func func, ffrt_executor_task_type_t type);
FFRT_C_API void ffrt_executor_task_submit(ffrt_executor_task_t *task, const ffrt_task_attr_t *attr);
FFRT_C_API int ffrt_executor_task_cancel(ffrt_executor_task_t *taask, const ffrt_qos_t qos);
FFRT_C_API int ffrt_executor_task_cancel(ffrt_executor_task_t *task, const ffrt_qos_t qos);
#ifdef FFRT_IO_TASK_SCHEDULER
// poller
FFRT_C_API int ffrt_poller_register(int fd, uint32_t events, void* data, void(*cb)(void*, uint32_t));
typedef void (*ffrt_poller_cb)(void*, uint32_t);
typedef int (*ffrt_timer_func)();
FFRT_C_API int ffrt_poller_register(int fd, uint32_t events, void* data, ffrt_poller_cb cb);
FFRT_C_API int ffrt_poller_deregister(int fd);
FFRT_C_API int ffrt_poller_register_timerfunc(int(*timerFunc)());
FFRT_C_API int ffrt_poller_register_timerfunc(ffrt_timer_func timerFunc);
FFRT_C_API void ffrt_poller_wakeup();
FFRT_C_API void ffrt_submit_coroutine(void* co, ffrt_coroutine_ptr_t exec,

View File

@ -82,6 +82,26 @@ typedef void* ffrt_sys_event_handle_t;
typedef void* ffrt_config_t;
#ifdef FFRT_IO_TASK_SCHEDULER
typedef enum {
ffrt_coroutine_stackless,
ffrt_coroutine_with_stack,
} ffrt_coroutine_t;
typedef enum {
ffrt_coroutine_pending = 0,
ffrt_coroutine_ready = 0,
} ffrt_coroutine_ret_t;
typedef ffrt_coroutine_ret_t(*ffrt_coroutine_ptr_t)(void*);
typedef struct {
int fd;
void* data;
void(*cb)(void*, uint32_t);
} ffrt_poller_t;
#endif
#ifdef __cplusplus
namespace ffrt {
enum qos_inner_default {

View File

@ -103,7 +103,10 @@ typedef enum {
/** General task. */
ffrt_function_kind_general,
/** Queue task. */
ffrt_function_kind_queue
ffrt_function_kind_queue,
#ifdef FFRT_IO_TASK_SCHEDULER
ffrt_function_kind_io,
#endif
} ffrt_function_kind_t;
/**

View File

@ -20,7 +20,7 @@ mkdir build && cd build
cmake .. \
-DFFRT_EXAMPLE=ON \
-DFFRT_IO_TASK_SCHEDULER=ON \
-DFFRT_IO_TASK_SCHEDULER=OFF \
make -j ffrt
make -j

View File

@ -366,7 +366,7 @@ ffrt_qos_t ffrt_get_cur_qos()
return qos;
}
API_ATTRIBUTE((visibility("default")))
int ffrt_poller_register(int fd, uint32_t events, void* data, void(*cb)(void*, uint32_t))
int ffrt_poller_register(int fd, uint32_t events, void* data, ffrt_poller_cb cb)
{
ffrt_qos_t qos = ffrt_get_cur_qos();
return ffrt::PollerProxy::Instance()->GetPoller(qos).AddFdEvent(events, fd, data, cb);
@ -387,7 +387,7 @@ void ffrt_poller_wakeup()
}
API_ATTRIBUTE((visibility("default")))
int ffrt_poller_register_timerfunc(int(*timerFunc)())
int ffrt_poller_register_timerfunc(ffrt_timer_func timerFunc)
{
ffrt_qos_t qos = ffrt_get_cur_qos();
return ffrt::PollerProxy::Instance()->GetPoller(qos).RegisterTimerFunc(timerFunc);

View File

@ -83,6 +83,8 @@ void SerialLooper::Run()
while (!isExit_.load()) {
ITask* task = queue_->Next();
if (task) {
FFRT_LOGI("pick task gid=%llu, qid=%u [%s] remains [%u]", task->gid, qid_, name_.c_str(),
queue_->GetMapSize());
SetTimeoutMonitor(task);
FFRT_COND_DO_ERR((task->handler_ == nullptr), break, "failed to run task, handler is nullptr");
QueueMonitor::GetInstance().UpdateQueueInfo(qid_, task->gid);

View File

@ -50,7 +50,7 @@ private:
void SetTimeoutMonitor(ITask* task);
void RunTimeOutCallback(ITask* task);
std::string name_ = "serial_queue_";
std::string name_;
std::atomic_bool isExit_ = {false};
task_handle handle;
std::shared_ptr<SerialQueue> queue_;

View File

@ -47,36 +47,40 @@ void SerialQueue::Quit()
int SerialQueue::PushTask(ITask* task, uint64_t upTime)
{
std::unique_lock lock(mutex_);
FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to push task, task is nullptr");
whenMap_[upTime].emplace_back(task);
if (upTime == whenMap_.begin()->first) {
cond_.notify_all();
{
std::unique_lock lock(mutex_);
whenMap_[upTime].emplace_back(task);
if (upTime == whenMap_.begin()->first) {
cond_.notify_all();
}
}
FFRT_LOGI("push serial task gid=%llu into qid=%u [%s] succ", task->gid, qid_, name_.c_str());
FFRT_LOGI("push task gid=%llu to qid=%u [%s]", task->gid, qid_, name_.c_str());
return 0;
}
int SerialQueue::RemoveTask(const ITask* task)
{
std::unique_lock lock(mutex_);
FFRT_COND_DO_ERR((task == nullptr), return -1, "failed to remove task, task is nullptr");
FFRT_LOGI("remove serial task gid=%llu of qid=%u [%s] enter", task->gid, qid_, name_.c_str());
for (auto it = whenMap_.begin(); it != whenMap_.end();) {
for (auto itList = it->second.begin(); itList != it->second.end();) {
if ((*itList) != task) {
itList++;
continue;
FFRT_LOGI("cancel task gid=%llu of qid=%u [%s]", task->gid, qid_, name_.c_str());
{
std::unique_lock lock(mutex_);
for (auto it = whenMap_.begin(); it != whenMap_.end();) {
for (auto itList = it->second.begin(); itList != it->second.end();) {
if ((*itList) != task) {
itList++;
continue;
}
it->second.erase(itList++);
// a task can be submitted only once through the C interface
return 0;
}
it->second.erase(itList++);
// a task can be submitted only once through the C interface
return 0;
}
if (it->second.empty()) {
whenMap_.erase(it++);
} else {
it++;
if (it->second.empty()) {
whenMap_.erase(it++);
} else {
it++;
}
}
}
FFRT_LOGD("remove serial task gid=%llu of [%s] failed, task not waiting in queue", task->gid, name_.c_str());
@ -110,8 +114,7 @@ ITask* SerialQueue::Next()
if (it->second.empty()) {
(void)whenMap_.erase(it);
}
FFRT_LOGI("get next serial task gid=%llu, qid=%u [%s] contains [%u] other timestamps", nextTask->gid, qid_,
name_.c_str(), whenMap_.size());
mapSize_.store(whenMap_.size());
return nextTask;
} else {
uint64_t diff = it->first - now;

View File

@ -28,6 +28,11 @@ public:
SerialQueue(const uint32_t qid, const std::string& name) : qid_(qid), name_(name) {}
~SerialQueue();
inline uint32_t GetMapSize() const
{
return mapSize_.load();
}
ITask* Next();
int PushTask(ITask* task, uint64_t upTime);
int RemoveTask(const ITask* task);
@ -39,6 +44,7 @@ private:
bool isExit_ = false;
const uint32_t qid_;
std::string name_;
std::atomic_uint32_t mapSize_ = {0};
std::map<uint64_t, std::list<ITask*>> whenMap_;
};
} // namespace ffrt

View File

@ -19,14 +19,12 @@
#include <new>
#include <vector>
#include <mutex>
#include <atomic>
#ifdef FFRT_BBOX_ENABLE
#include <unordered_set>
#endif
#include <sys/mman.h>
#include "sync/sync.h"
#include "dfx/log/ffrt_log_api.h"
#include "dfx/bbox/bbox.h"
namespace ffrt {
const std::size_t BatchAllocSize = 128 * 1024;
@ -186,13 +184,9 @@ class QSimpleAllocator {
std::mutex lock;
std::vector<T*> cache;
uint32_t flags = MAP_ANONYMOUS | MAP_PRIVATE;
uint32_t printCnt = 0;
std::vector<void*> mmapedAddrVec;
std::atomic<uint64_t> allocatedCnt {0};
bool expand()
{
FFRT_LOGE("DEBUG: QSimpleAllocator::expand");
const int prot = PROT_READ | PROT_WRITE;
char* p = reinterpret_cast<char*>(mmap(nullptr, MmapSz, prot, flags, -1, 0));
if (p == (char*)MAP_FAILED) {
@ -208,24 +202,11 @@ class QSimpleAllocator {
for (std::size_t i = 0; i + TSize <= MmapSz; i += TSize) {
cache.push_back(reinterpret_cast<T*>(p + i));
}
mmapedAddrVec.push_back(reinterpret_cast<void*>(p));
return true;
}
T* alloc()
{
FFRT_LOGE("DEBUG: QSimpleAllocator::alloc");
++printCnt;
++allocatedCnt;
if (printCnt % 10 == 0) {
std::string info = SaveTaskCounterInfo();
FFRT_LOGE("DEBUG: %s", info.c_str());
for (int i = 0; i < mmapedAddrVec.size(); i++) {
FFRT_LOGE("DEBUG: mmaped addr %d, %llu", i,
static_cast<uint64_t>(reinterpret_cast<uintptr_t>(mmapedAddrVec[i])));
}
FFRT_LOGE("DEBUG: cache size %llu, used size %llu", cache.size(), allocatedCnt.load());
}
T* p = nullptr;
lock.lock();
if (cache.empty()) {
@ -242,8 +223,6 @@ class QSimpleAllocator {
void free(T* p)
{
--allocatedCnt;
FFRT_LOGE("DEBUG: QSimpleAllocator::free");
lock.lock();
cache.push_back(p);
lock.unlock();