!486 control construct sequence of singletons

Merge pull request !486 from 太帅太烦恼/master
This commit is contained in:
openharmony_ci 2024-10-29 03:48:50 +00:00 committed by Gitee
commit 0b9fc4349a
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
15 changed files with 100 additions and 61 deletions

View File

@ -14,6 +14,7 @@
*/
#include "dependence_manager.h"
#include "util/ffrt_facade.h"
#include "util/singleton_register.h"
namespace ffrt {
@ -26,4 +27,19 @@ void DependenceManager::RegistInsCb(SingleInsCB<DependenceManager>::Instance &&c
{
SingletonRegister<DependenceManager>::RegistInsCb(std::move(cb));
}
void DependenceManager::onSubmitUV(ffrt_executor_task_t *task, const task_attr_private *attr)
{
FFRT_EXECUTOR_TASK_SUBMIT_MARKER(task);
FFRT_TRACE_SCOPE(1, onSubmitUV);
QoS qos = ((attr == nullptr || attr->qos_ == qos_inherit) ? QoS() : QoS(attr->qos_));
FFRTTraceRecord::TaskSubmit<ffrt_uv_task>(qos);
LinkedList* node = reinterpret_cast<LinkedList *>(&task->wq);
FFRTScheduler* sch = FFRTFacade::GetSchedInstance();
if (!sch->InsertNode(node, qos)) {
FFRT_LOGE("Submit UV task failed!");
return;
}
FFRTTraceRecord::TaskEnqueue<ffrt_uv_task>(qos);
}
}

View File

@ -82,20 +82,7 @@ public:
virtual void onSubmit(bool has_handle, ffrt_task_handle_t &handle, ffrt_function_header_t *f,
const ffrt_deps_t *ins, const ffrt_deps_t *outs, const task_attr_private *attr) = 0;
void onSubmitUV(ffrt_executor_task_t *task, const task_attr_private *attr)
{
FFRT_EXECUTOR_TASK_SUBMIT_MARKER(task);
FFRT_TRACE_SCOPE(1, onSubmitUV);
QoS qos = ((attr == nullptr || attr->qos_ == qos_inherit) ? QoS() : QoS(attr->qos_));
FFRTTraceRecord::TaskSubmit<ffrt_uv_task>(qos);
LinkedList* node = reinterpret_cast<LinkedList *>(&task->wq);
FFRTScheduler* sch = FFRTScheduler::Instance();
if (!sch->InsertNode(node, qos)) {
FFRT_LOGE("Submit UV task failed!");
return;
}
FFRTTraceRecord::TaskEnqueue<ffrt_uv_task>(qos);
}
void onSubmitUV(ffrt_executor_task_t *task, const task_attr_private *attr);
virtual void onWait() = 0;
#ifdef QOS_DEPENDENCY

View File

@ -28,14 +28,16 @@ namespace ffrt {
SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->criticalMutex_)
{
// control construct sequences of singletons
#ifdef FFRT_OH_TRACE_ENABLE
TraceAdapter::Instance();
#endif
// control construct sequences of singletons
SimpleAllocator<CPUEUTask>::Instance();
SimpleAllocator<SCPUEUTask>::Instance();
SimpleAllocator<QueueTask>::Instance();
SimpleAllocator<VersionCtx>::Instance();
SimpleAllocator<WaitUntilEntry>::Instance();
CoStackAttr::Instance();
PollerProxy::Instance();
FFRTScheduler::Instance();
ExecuteUnit::Instance();
@ -49,6 +51,8 @@ SDependenceManager::SDependenceManager() : criticalMutex_(Entity::Instance()->cr
_StartTrace(HITRACE_TAG_FFRT, "dm_init", -1); // init g_tagsProperty for ohos ffrt trace
_FinishTrace(HITRACE_TAG_FFRT);
#endif
QueueMonitor::GetInstance();
GetIOPoller();
DelayedWorker::GetInstance();
}

View File

@ -38,6 +38,7 @@
#include "dfx/bbox/bbox.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "co_routine_factory.h"
#include "util/ffrt_facade.h"
#ifdef FFRT_TASK_LOCAL_ENABLE
#include "pthread_ffrt.h"
#endif
@ -275,7 +276,7 @@ static void CoSetStackProt(CoRoutine* co, int prot)
static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
{
std::size_t defaultStackSize = CoStackAttr::Instance()->size;
std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
CoRoutine* co = nullptr;
if (likely(stackSize == defaultStackSize)) {
co = ffrt::CoRoutineAllocMem(stackSize);
@ -294,7 +295,7 @@ static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
co->allocatedSize = stackSize;
co->stkMem.size = static_cast<uint64_t>(stackSize - sizeof(CoRoutine) + 8);
co->stkMem.magic = STACK_MAGIC;
if (CoStackAttr::Instance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
CoSetStackProt(co, PROT_READ);
}
co->status.store(static_cast<int>(CoStatus::CO_UNINITIALIZED));
@ -303,10 +304,10 @@ static inline CoRoutine* AllocNewCoRoutine(size_t stackSize)
static inline void CoMemFree(CoRoutine* co)
{
if (CoStackAttr::Instance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
if (FFRTFacade::GetCSAInstance()->type == CoStackProtectType::CO_STACK_STRONG_PROTECT) {
CoSetStackProt(co, PROT_WRITE | PROT_READ);
}
std::size_t defaultStackSize = CoStackAttr::Instance()->size;
std::size_t defaultStackSize = FFRTFacade::GetCSAInstance()->size;
if (likely(co->allocatedSize == defaultStackSize)) {
ffrt::CoRoutineFreeMem(co);
} else {

View File

@ -25,7 +25,7 @@
#include "eu/cpuworker_manager.h"
#include "util/ffrt_facade.h"
#ifdef FFRT_WORKER_MONITOR
#include "util/worker_monitor.h"
#include "util/ffrt_facade.h"
#endif
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
#include "eu/blockaware.h"
@ -72,7 +72,7 @@ bool CPUWorkerManager::IncWorker(const QoS& qos)
FFRT_PERF_WORKER_WAKE(workerQos);
lock.unlock();
#ifdef FFRT_WORKER_MONITOR
WorkerMonitor::GetInstance().SubmitTask();
FFRTFFacade::GetWMInstance().SubmitTask();
#endif
return true;
}

View File

@ -16,7 +16,6 @@
#include <sstream>
#include "dfx/log/ffrt_log_api.h"
#include "dfx/trace_record/ffrt_trace_record.h"
#include "queue_monitor.h"
#include "util/event_handler_adapter.h"
#include "util/ffrt_facade.h"
#include "util/slab.h"
@ -57,7 +56,7 @@ QueueHandler::QueueHandler(const char* name, const ffrt_queue_attr_t* attr, cons
FFRT_LOGW("failed to set [queueId=%u] name due to invalid name or length.", GetQueueId());
}
QueueMonitor::GetInstance().RegisterQueueId(GetQueueId(), this);
FFRTFacade::GetQMInstance().RegisterQueueId(GetQueueId(), this);
FFRT_LOGI("construct %s succ, qos[%d]", name_.c_str(), qos_);
}
@ -66,11 +65,8 @@ QueueHandler::~QueueHandler()
FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot destruct, [queueId=%u] constructed failed", GetQueueId());
FFRT_LOGI("destruct %s enter", name_.c_str());
// clear tasks in queue
queue_->Stop();
while (QueueMonitor::GetInstance().QueryQueueStatus(GetQueueId()) || queue_->GetActiveStatus()) {
std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
}
QueueMonitor::GetInstance().ResetQueueStruct(GetQueueId());
CancelAndWait();
FFRTFacade::GetQMInstance().ResetQueueStruct(GetQueueId());
// release callback resource
if (timeout_ > 0) {
@ -167,7 +163,7 @@ void QueueHandler::CancelAndWait()
FFRT_COND_DO_ERR((queue_ == nullptr), return, "cannot cancelAndWait, [queueId=%u] constructed failed",
GetQueueId());
queue_->Remove();
while (QueueMonitor::GetInstance().QueryQueueStatus(GetQueueId()) != 0 || queue_->GetActiveStatus()) {
while (FFRTFacade::GetQMInstance().QueryQueueStatus(GetQueueId()) != 0 || queue_->GetActiveStatus()) {
std::this_thread::sleep_for(std::chrono::microseconds(TASK_DONE_WAIT_UNIT));
}
}
@ -207,7 +203,7 @@ void QueueHandler::Dispatch(QueueTask* inTask)
for (QueueTask* task = inTask; task != nullptr; task = nextTask) {
// dfx watchdog
SetTimeoutMonitor(task);
QueueMonitor::GetInstance().UpdateQueueInfo(GetQueueId(), task->gid);
FFRTFacade::GetQMInstance().UpdateQueueInfo(GetQueueId(), task->gid);
// run user task
FFRT_LOGD("run task [gid=%llu], queueId=%u", task->gid, GetQueueId());
@ -235,7 +231,7 @@ void QueueHandler::Dispatch(QueueTask* inTask)
// run task batch
nextTask = task->GetNextTask();
if (nextTask == nullptr) {
QueueMonitor::GetInstance().ResetQueueInfo(GetQueueId());
FFRTFacade::GetQMInstance().ResetQueueInfo(GetQueueId());
if (!queue_->IsOnLoop()) {
Deliver();
}

View File

@ -28,7 +28,6 @@ constexpr uint32_t TIME_CONVERT_UNIT = 1000;
constexpr uint64_t QUEUE_INFO_INITIAL_CAPACITY = 64;
constexpr uint64_t ALLOW_TIME_ACC_ERROR_US = 500;
constexpr uint64_t MIN_TIMEOUT_THRESHOLD_US = 1000;
constexpr uint64_t DESTRUCT_TRY_COUNT = 100;
inline std::chrono::steady_clock::time_point GetDelayedTimeStamp(uint64_t delayUs)
{
@ -51,24 +50,13 @@ QueueMonitor::QueueMonitor()
return;
}
timeoutUs_ = timeout;
SendDelayedWorker(GetDelayedTimeStamp(timeoutUs_));
FFRT_LOGI("queue monitor ctor leave, watchdog timeout %llu us", timeoutUs_);
}
QueueMonitor::~QueueMonitor()
{
exit_.store(true);
FFRT_LOGI("destruction of QueueMonitor enter");
int tryCnt = DESTRUCT_TRY_COUNT;
// 取消定时器成功或者中断了发送定时器则释放we完成析构
while (!DelayedRemove(we_->tp, we_) && !abortSendTimer_.load()) {
if (--tryCnt < 0) {
break;
}
usleep(MIN_TIMEOUT_THRESHOLD_US);
}
SimpleAllocator<WaitUntilEntry>::FreeMem(we_);
FFRT_LOGI("destruction of QueueMonitor leave");
}
QueueMonitor& QueueMonitor::GetInstance()
@ -128,7 +116,6 @@ void QueueMonitor::UpdateQueueInfo(uint32_t queueId, const uint64_t &taskId)
TimePoint now = std::chrono::steady_clock::now();
queuesRunningInfo_[queueId] = {taskId, now};
if (exit_.exchange(false)) {
abortSendTimer_.store(false);
SendDelayedWorker(now + std::chrono::microseconds(timeoutUs_));
}
}
@ -143,10 +130,6 @@ uint64_t QueueMonitor::QueryQueueStatus(uint32_t queueId)
void QueueMonitor::SendDelayedWorker(TimePoint delay)
{
FFRT_COND_DO_ERR(exit_.load(), abortSendTimer_.store(true);
return;,
"exit_.load() is true");
we_->tp = delay;
we_->cb = ([this](WaitEntry* we_) { CheckQueuesStatus(); });

View File

@ -48,8 +48,7 @@ private:
std::shared_mutex mutex_;
std::vector<std::pair<uint64_t, TimePoint>> queuesRunningInfo_;
std::vector<QueueHandler*> queuesStructInfo_;
std::atomic_bool exit_ { false };
std::atomic_bool abortSendTimer_ { false };
std::atomic_bool exit_ { true };
std::vector<uint64_t> lastReportedTask_;
};
} // namespace ffrt

View File

@ -120,7 +120,6 @@ DelayedWorker::DelayedWorker(): epollfd_ { ::epoll_create1(EPOLL_CLOEXEC) },
FFRT_LOGE("monitor:%d add fail, ret:%d, errno:%d, %s", monitorfd_, ret, errno, strerror(errno));
}
#endif
ThreadInit();
}
DelayedWorker::~DelayedWorker()

View File

@ -36,7 +36,7 @@ class DelayedWorker {
std::atomic_bool toExit = false;
std::unique_ptr<std::thread> delayWorker = nullptr;
int noTaskDelayCount_{0};
bool exited_ = false;
bool exited_ = true;
int epollfd_{-1};
int timerfd_{-1};
#ifdef FFRT_WORKERS_DYNAMIC_SCALING

View File

@ -21,6 +21,7 @@
#include "internal_inc/assert.h"
#include "internal_inc/types.h"
#include "tm/scpu_task.h"
#include "util/ffrt_facade.h"
#include "util/name_manager.h"
namespace ffrt {
@ -172,3 +173,8 @@ void IOPoller::PollOnce(int timeout) noexcept
}
}
}
void ffrt_wait_fd(int fd)
{
ffrt::FFRTFacade::GetIoPPInstance().WaitFdEvent(fd);
}

View File

@ -50,8 +50,5 @@ private:
IOPoller& GetIOPoller() noexcept;
}
static inline void ffrt_wait_fd(int fd)
{
ffrt::GetIOPoller().WaitFdEvent(fd);
}
void ffrt_wait_fd(int fd);
#endif

View File

@ -19,6 +19,7 @@
#include "internal_inc/osal.h"
#include "tm/task_factory.h"
#include "util/ffrt_facade.h"
#include "util/slab.h"
namespace {
@ -43,7 +44,7 @@ void CPUEUTask::SetQos(const QoS& newQos)
void CPUEUTask::FreeMem()
{
BboxCheckAndFreeze();
PollerProxy::Instance().GetPoller(qos).ClearCachedEvents(this);
FFRTFacade::GetPPInstance().GetPoller(qos).ClearCachedEvents(this);
#ifdef FFRT_TASK_LOCAL_ENABLE
TaskTsdDeconstruct(this);
#endif

View File

@ -13,6 +13,7 @@
* limitations under the License.
*/
#include "util/ffrt_facade.h"
#include "dfx/log/ffrt_log_api.h"
namespace {
std::atomic<bool> g_exitFlag { false };
@ -46,6 +47,7 @@ private:
~ProcessExitManager()
{
FFRT_LOGW("ProcessExitManager destruction enter");
std::unique_lock lock(g_exitMtx);
g_exitFlag.store(true);
}

View File

@ -16,10 +16,14 @@
#define UTIL_FFRTFACADE_HPP
#include "tm/cpu_task.h"
#include "sched/scheduler.h"
#include "eu/co_routine.h"
#include "eu/execute_unit.h"
#include "dm/dependence_manager.h"
#include "sync/poller.h"
#include "queue/queue_monitor.h"
#include "sync/delayed_worker.h"
#include "sync/poller.h"
#include "sync/io_poller.h"
#include "util/worker_monitor.h"
namespace ffrt {
bool GetExitFlag();
std::shared_mutex& GetExitMtx();
@ -40,19 +44,43 @@ public:
static inline PollerProxy& GetPPInstance()
{
PollerProxy& inst = Instance().GetPPInstanceImpl();
static PollerProxy& inst = Instance().GetPPInstanceImpl();
return inst;
}
static inline DelayedWorker& GetDWInstance()
{
DelayedWorker& inst = Instance().GetDWInstanceImpl();
static DelayedWorker& inst = Instance().GetDWInstanceImpl();
return inst;
}
static inline FFRTScheduler* GetSchedInstance()
{
FFRTScheduler* inst = Instance().GetSchedInstanceImpl();
static FFRTScheduler* inst = Instance().GetSchedInstanceImpl();
return inst;
}
static inline IOPoller& GetIoPPInstance()
{
static IOPoller& inst = Instance().GetIoPPInstanceImpl();
return inst;
}
static inline CoStackAttr* GetCSAInstance()
{
static CoStackAttr* inst = Instance().GetCSAInstanceImpl();
return inst;
}
static inline QueueMonitor& GetQMInstance()
{
static QueueMonitor& inst = Instance().GetQMInstanceImpl();
return inst;
}
static inline WorkerMonitor& GetWMInstance()
{
static WorkerMonitor& inst = Instance().GetWMInstanceImpl();
return inst;
}
@ -89,6 +117,26 @@ private:
{
return FFRTScheduler::Instance();
}
inline IOPoller& GetIoPPInstanceImpl()
{
return GetIOPoller();
}
inline CoStackAttr* GetCSAInstanceImpl()
{
return CoStackAttr::Instance();
}
inline QueueMonitor& GetQMInstanceImpl()
{
return QueueMonitor::GetInstance();
}
inline WorkerMonitor& GetWMInstanceImpl()
{
return WorkerMonitor::GetInstance();
}
};
} // namespace FFRT