!2566 [Concurrency] Taskpool part 4

Merge pull request !2566 from Petrov Igor/taskpool_part_4
This commit is contained in:
openharmony_ci 2024-11-01 14:39:19 +00:00 committed by Gitee
commit d0f0cdba65
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
20 changed files with 1443 additions and 1038 deletions

View File

@ -80,7 +80,6 @@ static_core/plugins/ets/isa/ @mbolshov
static_core/plugins/ets/libllvmbackend/ @Prof1983 @romanzhuykov
static_core/plugins/ets/runtime/ @semenovaleksandr
static_core/plugins/ets/runtime/*coroutine* @dmitriitr @konstanting
static_core/plugins/ets/runtime/*taskpool* @dmitriitr @konstanting @ipetrov
static_core/plugins/ets/runtime/ets_entrypoints* @Prof1983 @mbolshov
static_core/plugins/ets/runtime/ets_handle* @dmitriitr @udav
static_core/plugins/ets/runtime/ets_libbase_runtime.yaml @Prof1983

View File

@ -25,7 +25,6 @@ set(ETS_RUNTIME_SOURCES
${ETS_EXT_SOURCES}/ets_runtime_interface.cpp
${ETS_EXT_SOURCES}/ets_vm.cpp
${ETS_EXT_SOURCES}/ets_stubs.cpp
${ETS_EXT_SOURCES}/ets_taskpool.cpp
${ETS_EXT_SOURCES}/intrinsics/std_core.cpp
${ETS_EXT_SOURCES}/ets_itable_builder.cpp
${ETS_EXT_SOURCES}/ets_vtable_builder.cpp

View File

@ -1394,77 +1394,7 @@ intrinsics:
signature:
ret: i64
args: []
impl: ark::ets::intrinsics::GenerateTaskId
- name: TaskpoolTaskSubmitted
space: ets
class_name: escompat.taskpool
method_name: taskSubmitted
static: true
signature:
ret: void
args: [i64]
impl: ark::ets::intrinsics::TaskpoolTaskSubmitted
- name: TaskpoolGroupSubmitted
space: ets
class_name: escompat.taskpool
method_name: taskGroupSubmitted
static: true
signature:
ret: void
args: [i64, i64]
impl: ark::ets::intrinsics::TaskpoolGroupSubmitted
- name: TaskpoolTaskStarted
space: ets
class_name: escompat.taskpool
method_name: taskStarted
static: true
signature:
ret: u1
args: [i64, i64]
impl: ark::ets::intrinsics::TaskpoolTaskStarted
- name: TaskpoolTaskFinished
space: ets
class_name: escompat.taskpool
method_name: taskFinished
static: true
signature:
ret: u1
args: [i64, i64]
impl: ark::ets::intrinsics::TaskpoolTaskFinished
- name: TaskpoolCancelTask
space: ets
class_name: escompat.taskpool
method_name: cancelImpl
static: true
signature:
ret: void
args: [i64, i64]
impl: ark::ets::intrinsics::TaskpoolCancelTask
- name: TaskpoolCancelGroup
space: ets
class_name: escompat.taskpool
method_name: cancelGroupImpl
static: true
signature:
ret: void
args: [i64]
impl: ark::ets::intrinsics::TaskpoolCancelGroup
- name: TaskpoolIsTaskCancel
space: ets
class_name: escompat.taskpoolTask
method_name: isCancel
static: true
signature:
ret: u1
args: []
impl: ark::ets::intrinsics::TaskpoolIsTaskCanceled
impl: ark::ets::intrinsics::taskpool::GenerateTaskId
- name: TaskpoolGenerateGroupId
space: ets
@ -1474,7 +1404,7 @@ intrinsics:
signature:
ret: i64
args: []
impl: ark::ets::intrinsics::GenerateGroupId
impl: ark::ets::intrinsics::taskpool::GenerateGroupId
- name: TaskpoolGenerateSequenceRunnerId
space: ets
@ -1484,7 +1414,7 @@ intrinsics:
signature:
ret: i64
args: []
impl: ark::ets::intrinsics::GenerateSeqRunnerId
impl: ark::ets::intrinsics::taskpool::GenerateSeqRunnerId
###################
# std.time.Chrono #
@ -1500,6 +1430,17 @@ intrinsics:
args: []
impl: ark::intrinsics::NanoTime
clear_flags: [ require_state, runtime_call, can_throw ]
- name: StdTimeChronoGetCpuTime
space: ets
class_name: std.time.Chrono
method_name: getCpuTime
static: true
signature:
ret: i64
args: []
impl: ark::ets::intrinsics::ChronoGetCpuTime
clear_flags: [ require_state, runtime_call, can_throw ]
##################
# std.core.Float #
@ -4877,3 +4818,107 @@ intrinsics:
- i64
- std.core.Object
impl: ark::ets::intrinsics::DebuggerAPISetLocalObject
###############################
# escompat.ConcurrencyHelpers #
###############################
- name: escompatConcurrencyHelpersMutexCreate
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: mutexCreate
static: true
signature:
ret: std.core.Object
args: []
impl: ark::ets::intrinsics::EtsMutexCreate
- name: escompatConcurrencyHelpersMutexLock
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: mutexLock
static: true
signature:
ret: void
args: [ std.core.Object ]
impl: ark::ets::intrinsics::EtsMutexLock
- name: escompatConcurrencyHelpersMutexUnlock
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: mutexUnlock
static: true
signature:
ret: void
args: [ std.core.Object ]
impl: ark::ets::intrinsics::EtsMutexUnlock
- name: escompatConcurrencyHelpersEventCreate
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: eventCreate
static: true
signature:
ret: std.core.Object
args: []
impl: ark::ets::intrinsics::EtsEventCreate
- name: escompatConcurrencyHelpersEventWait
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: eventWait
static: true
signature:
ret: void
args: [ std.core.Object ]
impl: ark::ets::intrinsics::EtsEventWait
- name: escompatConcurrencyHelpersEventFire
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: eventFire
static: true
signature:
ret: void
args: [ std.core.Object ]
impl: ark::ets::intrinsics::EtsEventFire
- name: escompatConcurrencyHelpersCondVarCreate
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: condVarCreate
static: true
signature:
ret: std.core.Object
args: []
impl: ark::ets::intrinsics::EtsCondVarCreate
- name: escompatConcurrencyHelpersCondVarWait
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: condVarWait
static: true
signature:
ret: void
args: [ std.core.Object, std.core.Object ]
impl: ark::ets::intrinsics::EtsCondVarWait
- name: escompatConcurrencyHelpersCondVarNotifyOne
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: condVarNotifyOne
static: true
signature:
ret: void
args: [ std.core.Object, std.core.Object ]
impl: ark::ets::intrinsics::EtsCondVarNotifyOne
- name: escompatConcurrencyHelpersCondVarNotifyAll
space: ets
class_name: escompat.ConcurrencyHelpers
method_name: condVarNotifyAll
static: true
signature:
ret: void
args: [ std.core.Object, std.core.Object ]
impl: ark::ets::intrinsics::EtsCondVarNotifyAll

View File

@ -1,152 +0,0 @@
/**
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "plugins/ets/runtime/ets_taskpool.h"
namespace ark::ets {
Taskpool::Taskpool() : taskId_(1), taskGroupId_(1) {}
EtsLong Taskpool::GenerateTaskId()
{
return taskId_++;
}
EtsLong Taskpool::GenerateTaskGroupId()
{
return taskGroupId_++;
}
EtsLong Taskpool::GenerateSeqRunnerId()
{
return seqRunnerId_++;
}
void Taskpool::TaskSubmitted(EtsLong taskId)
{
os::memory::LockHolder lh(taskpoolLock_);
waitingTasks_[taskId]++;
}
void Taskpool::GroupSubmitted(EtsLong groupId, size_t tasksCount)
{
ASSERT(tasksCount != 0);
os::memory::LockHolder lh(taskpoolLock_);
waitingGroupTasks_[groupId] += tasksCount;
}
size_t Taskpool::DecrementTaskCounter(EtsLong id, PandaUnorderedMap<EtsLong, size_t> &tasks)
{
auto taskIter = tasks.find(id);
ASSERT(taskIter != tasks.end());
auto tasksCount = --(taskIter->second);
if (tasksCount == 0) {
tasks.erase(taskIter);
}
return tasksCount;
}
bool Taskpool::MoveTaskFromWaitingToRunning(EtsLong id, PandaUnorderedMap<EtsLong, size_t> &waitingTasks,
PandaUnorderedMap<EtsLong, size_t> &runningTasks,
PandaUnorderedSet<EtsLong> &idsToBeCanceled)
{
auto waitingTasksAfterDecrementation = DecrementTaskCounter(id, waitingTasks);
if (idsToBeCanceled.find(id) != idsToBeCanceled.end()) {
if ((waitingTasksAfterDecrementation == 0) && (runningTasks.find(id) == runningTasks.end())) {
idsToBeCanceled.erase(id);
}
return false;
}
runningTasks[id]++;
return true;
}
bool Taskpool::TaskStarted(uint32_t coroutineId, EtsLong taskId, EtsLong groupId)
{
os::memory::LockHolder lh(taskpoolLock_);
auto isNotCanceled =
(groupId == 0) // 0 means task is not group task
? MoveTaskFromWaitingToRunning(taskId, waitingTasks_, runningTasks_, tasksToBeCanceled_)
: MoveTaskFromWaitingToRunning(groupId, waitingGroupTasks_, runningGroupTasks_, groupsToBeCanceled_);
if (isNotCanceled) {
executingTasks_.insert({coroutineId, {taskId, groupId}});
return true;
}
return false;
}
bool Taskpool::RemoveTaskFromRunning(EtsLong id, PandaUnorderedMap<EtsLong, size_t> &runningTasks,
const PandaUnorderedMap<EtsLong, size_t> &waitingTasks,
PandaUnorderedSet<EtsLong> &idsToBeCanceled)
{
auto runningTasksAfterDecrementation = DecrementTaskCounter(id, runningTasks);
if (idsToBeCanceled.find(id) != idsToBeCanceled.end()) {
if ((runningTasksAfterDecrementation == 0) && (waitingTasks.find(id) == waitingTasks.end())) {
idsToBeCanceled.erase(id);
}
return false;
}
return true;
}
bool Taskpool::TaskFinished(uint32_t coroutineId, EtsLong taskId, EtsLong groupId)
{
os::memory::LockHolder lh(taskpoolLock_);
ASSERT(executingTasks_[coroutineId].taskId == taskId);
executingTasks_.erase(coroutineId);
return (groupId == 0) ? RemoveTaskFromRunning(taskId, runningTasks_, waitingTasks_, tasksToBeCanceled_)
: RemoveTaskFromRunning(groupId, runningGroupTasks_, waitingGroupTasks_, groupsToBeCanceled_);
}
bool Taskpool::CancelTask(EtsLong taskId)
{
os::memory::LockHolder lh(taskpoolLock_);
if ((waitingTasks_.find(taskId) == waitingTasks_.end()) && (runningTasks_.find(taskId) == runningTasks_.end())) {
// No tasks with this id in the taskpool, escompat.Error will be occurred by taskpool.cancel
return false;
}
tasksToBeCanceled_.insert(taskId);
return true;
}
bool Taskpool::CancelGroup(EtsLong groupId)
{
os::memory::LockHolder lh(taskpoolLock_);
if ((waitingGroupTasks_.find(groupId) == waitingGroupTasks_.end()) &&
(runningGroupTasks_.find(groupId) == runningTasks_.end())) {
// No tasks of the group with this id in the taskpool
return false;
}
groupsToBeCanceled_.insert(groupId);
return true;
}
bool Taskpool::IsTaskCanceled(uint32_t coroutineId) const
{
os::memory::LockHolder lh(taskpoolLock_);
auto it = executingTasks_.find(coroutineId);
if (it == executingTasks_.end()) {
// Current coroutine is not executing a task
return false;
}
// 0 means non-group tasks
if (it->second.groupId == 0) {
return tasksToBeCanceled_.find(it->second.taskId) != tasksToBeCanceled_.end();
}
// The executing task is group task, so check the group status
return groupsToBeCanceled_.find(it->second.groupId) != groupsToBeCanceled_.end();
}
} // namespace ark::ets

View File

@ -1,168 +0,0 @@
/**
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PANDA_PLUGINS_ETS_RUNTIME_ETS_TASKPOOL_H
#define PANDA_PLUGINS_ETS_RUNTIME_ETS_TASKPOOL_H
#include "libpandabase/os/mutex.h"
#include "plugins/ets/runtime/types/ets_primitives.h"
#include "runtime/include/mem/panda_containers.h"
namespace ark::ets {
/// @class Taskpool contains information about each common task passed to execution until the task will not be finished
class Taskpool final {
public:
NO_COPY_SEMANTIC(Taskpool);
NO_MOVE_SEMANTIC(Taskpool);
Taskpool();
~Taskpool() = default;
/**
* @see taskpool.Task.constructor
* @return new unique identifier for creating task
*/
EtsLong GenerateTaskId();
/**
* @see taskpool.TaskGroup.constructor
* @return new unique identifier for creating group
*/
EtsLong GenerateTaskGroupId();
/**
* @see taskpool.SequenceRunner.constructor
* @return new unique identifier for creating task sequence runner
*/
EtsLong GenerateSeqRunnerId();
/**
* @brief Notify taskpool about execution request for task
* @param taskId requested task identifier
*/
void TaskSubmitted(EtsLong taskId);
/**
* @brief Notify taskpool about execution request for group of tasks
* @param groupId requested group identifier
* @param tasksCount count of tasks in the passed group
*/
void GroupSubmitted(EtsLong groupId, size_t tasksCount);
/**
* @brief Notify taskpool that task is starting execution on a coroutine
* @param coroutineId identifier of executing coroutine for requested task
* @param taskId requsted task identifier
* @param groupId group identifier of this task (0 means task is not group task)
* @return true if task can be started, false - if task was cancled
*
* @see CancelTask
*/
bool TaskStarted(uint32_t coroutineId, EtsLong taskId, EtsLong groupId);
/**
* @brief Notify taskpool that task is ending execution
* @param coroutineId identifier of executing coroutine for requested task
* @param taskId requsted task identifier
* @param groupId group identifier of this task (0 means task is not group task)
* @return true if task can be successfully finished, false - if task was cancled
*
* @see CancelTask
*/
bool TaskFinished(uint32_t coroutineId, EtsLong taskId, EtsLong groupId);
/**
* @brief Try to mark task as cancel. Only waiting or running tasks are allowed to canceling
* @param taskId identifier of task for canceling
* @return true if task was marked as caneling, false - if task is not executed or finished
*/
bool CancelTask(EtsLong taskId);
/**
* @brief Try to mark group as cancel. Only waiting or running groups are allowed to canceling
* @param groupId identifier of task for canceling
* @return true if the group was marked as caneling, false - if group is not executed or finished
*/
bool CancelGroup(EtsLong groupId);
/**
* @param coroutineId corotine id with potentially executing task
* @return true if coroutine with coroutineId is executing a task and this task is marked as canceling, false -
* otherwise
*/
bool IsTaskCanceled(uint32_t coroutineId) const;
private:
/**
* @brief Decrement count of tasks with associated identifier from passed collection. If count of task after
* decrementing == 0, then remove this identifier from the collection
* @param id associated identifier of decremented task counter, it can be task id or group id
* @param tasks map collection, key is unique identifier, value is counter of tasks with the associated identifier
* @return count tasks with associated identifier in passed collection after decrementing
*/
size_t DecrementTaskCounter(EtsLong id, PandaUnorderedMap<EtsLong, size_t> &tasks) REQUIRES(taskpoolLock_);
/**
* @brief Decrement count of tasks with associated identifier in waiting tasks collection and if the id is not
* marked as cancled then move to running tasks collection. If id is marked as canceled and no more tasks with the
* associated id in the taskpool then clear the id from set of canceled identifiers
* @param id associated identifier of decremented task counter, it can be task id or group id
* @param waitingTasks map collection of waiting tasks, key is unique identifier, value is counter of tasks
* @param runningTasks map collection of running tasks, key is unique identifier, value is counter of tasks
* @param idsToBeCanceled set of identifiers marked as canceled
* @return true if task is moved from waiting map to running map, false - if id is marked as canceled
*/
bool MoveTaskFromWaitingToRunning(EtsLong id, PandaUnorderedMap<EtsLong, size_t> &waitingTasks,
PandaUnorderedMap<EtsLong, size_t> &runningTasks,
PandaUnorderedSet<EtsLong> &idsToBeCanceled) REQUIRES(taskpoolLock_);
/**
* @brief Decrement count of tasks with associated identifier in running tasks. If id is marked as canceled and no
* more tasks with the associated id in the taskpool then clear the id from set of canceled identifiers
* @param id associated identifier of decremented task counter, it can be task id or group id
* @param waitingTasks map collection of waiting tasks, key is unique identifier, value is counter of tasks.
* @param runningTasks map collection of running tasks, key is unique identifier, value is counter of tasks
* @param idsToBeCanceled set of identifiers marked as canceled
* @return true if task is moved from waiting map to running map, false - if id is marked as canceled
*/
bool RemoveTaskFromRunning(EtsLong id, PandaUnorderedMap<EtsLong, size_t> &runningTasks,
const PandaUnorderedMap<EtsLong, size_t> &waitingTasks,
PandaUnorderedSet<EtsLong> &idsToBeCanceled) REQUIRES(taskpoolLock_);
struct ExecutingTaskInfo {
EtsLong taskId;
EtsLong groupId;
};
std::atomic<EtsLong> taskId_ {1};
std::atomic<EtsLong> taskGroupId_ {1};
std::atomic<EtsLong> seqRunnerId_ {1};
mutable os::memory::Mutex taskpoolLock_;
// key is task id, value is count of associated tasks
PandaUnorderedMap<EtsLong, size_t> waitingTasks_ GUARDED_BY(taskpoolLock_);
PandaUnorderedMap<EtsLong, size_t> runningTasks_ GUARDED_BY(taskpoolLock_);
PandaUnorderedSet<EtsLong> tasksToBeCanceled_ GUARDED_BY(taskpoolLock_);
// key is group id, value is count of associated tasks
PandaUnorderedMap<EtsLong, size_t> waitingGroupTasks_ GUARDED_BY(taskpoolLock_);
PandaUnorderedMap<EtsLong, size_t> runningGroupTasks_ GUARDED_BY(taskpoolLock_);
PandaUnorderedSet<EtsLong> groupsToBeCanceled_ GUARDED_BY(taskpoolLock_);
// key is coroutine id, value is task information
PandaUnorderedMap<uint32_t, ExecutingTaskInfo> executingTasks_ GUARDED_BY(taskpoolLock_);
};
} // namespace ark::ets
#endif // PANDA_PLUGINS_ETS_RUNTIME_ETS_TASKPOOL_H

View File

@ -173,7 +173,6 @@ PandaEtsVM::PandaEtsVM(Runtime *runtime, const RuntimeOptions &options, mem::Mem
coroutineManager_ = allocator->New<ThreadedCoroutineManager>(EtsCoroutine::Create<Coroutine>);
}
rendezvous_ = allocator->New<Rendezvous>(this);
taskpool_ = allocator->New<Taskpool>();
InitializeRandomEngine();
}
@ -183,7 +182,6 @@ PandaEtsVM::~PandaEtsVM()
auto allocator = mm_->GetHeapManager()->GetInternalAllocator();
ASSERT(allocator != nullptr);
allocator->Delete(taskpool_);
allocator->Delete(rendezvous_);
allocator->Delete(runtimeIface_);
allocator->Delete(coroutineManager_);

View File

@ -60,7 +60,6 @@
#include "plugins/ets/runtime/job_queue.h"
#include "plugins/ets/runtime/ets_handle_scope.h"
#include "plugins/ets/runtime/ets_handle.h"
#include "plugins/ets/runtime/ets_taskpool.h"
namespace ark::ets {
@ -272,16 +271,6 @@ public:
return jobQueue_.get();
}
Taskpool *GetTaskpool()
{
return taskpool_;
}
const Taskpool *GetTaskpool() const
{
return taskpool_;
}
void InitJobQueue(JobQueue *jobQueue)
{
ASSERT(jobQueue_ == nullptr);
@ -421,7 +410,6 @@ private:
os::memory::Mutex finalizationRegistryLock_;
PandaList<EtsObject *> registeredFinalizationRegistryInstances_ GUARDED_BY(finalizationRegistryLock_);
PandaUniquePtr<JobQueue> jobQueue_;
Taskpool *taskpool_ {nullptr};
PandaUniquePtr<CallbackPosterFactoryIface> callbackPosterFactory_;
// optional for lazy initialization
std::optional<std::mt19937> randomEngine_;

View File

@ -75,4 +75,10 @@ extern "C" EtsString *EscompatDateGetLocaleString(EtsString *format, EtsString *
return EtsString::CreateFromMUtf8(ss.str().c_str());
}
extern "C" int64_t ChronoGetCpuTime()
{
// NOTE(ipetrov, #15499): Need to change approach when coroutine can migrate to other thread
return ark::os::time::GetClockTimeInThreadCpuTime();
}
} // namespace ark::ets::intrinsics

View File

@ -13,90 +13,29 @@
* limitations under the License.
*/
#include "plugins/ets/runtime/ets_vm.h"
#include "plugins/ets/runtime/ets_exceptions.h"
#include <atomic>
#include "libpandabase/os/time.h"
#include "plugins/ets/runtime/types/ets_primitives.h"
namespace ark::ets::intrinsics {
namespace ark::ets::intrinsics::taskpool {
static std::atomic<EtsLong> g_taskId = 1;
static std::atomic<EtsLong> g_taskGroupId = 1;
static std::atomic<EtsLong> g_seqRunnerId = 1;
extern "C" EtsLong GenerateTaskId()
{
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
return coroutine->GetPandaVM()->GetTaskpool()->GenerateTaskId();
return g_taskId++;
}
extern "C" EtsLong GenerateGroupId()
{
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
return coroutine->GetPandaVM()->GetTaskpool()->GenerateTaskGroupId();
return g_taskGroupId++;
}
extern "C" EtsLong GenerateSeqRunnerId()
{
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
return coroutine->GetPandaVM()->GetTaskpool()->GenerateSeqRunnerId();
return g_seqRunnerId++;
}
extern "C" void TaskpoolTaskSubmitted(EtsLong taskId)
{
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
coroutine->GetPandaVM()->GetTaskpool()->TaskSubmitted(taskId);
}
extern "C" void TaskpoolGroupSubmitted(EtsLong groupId, EtsLong tasksCount)
{
ASSERT(tasksCount > 0);
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
coroutine->GetPandaVM()->GetTaskpool()->GroupSubmitted(groupId, static_cast<size_t>(tasksCount));
}
extern "C" EtsBoolean TaskpoolTaskStarted(EtsLong taskId, EtsLong groupId)
{
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
return EtsBoolean(
coroutine->GetPandaVM()->GetTaskpool()->TaskStarted(coroutine->GetCoroutineId(), taskId, groupId));
}
extern "C" EtsBoolean TaskpoolTaskFinished(EtsLong taskId, EtsLong groupId)
{
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
return EtsBoolean(
coroutine->GetPandaVM()->GetTaskpool()->TaskFinished(coroutine->GetCoroutineId(), taskId, groupId));
}
extern "C" void TaskpoolCancelTask(EtsLong taskId, EtsLong seqId)
{
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
if (!coroutine->GetPandaVM()->GetTaskpool()->CancelTask(taskId)) {
// seqId = 0 means task was not added to a task sequence runner
const char *messageError = (seqId == 0) ? "taskpool:: task is not executed or has been executed"
: "taskpool:: sequenceRunner task has been executed";
ThrowEtsException(coroutine, panda_file_items::class_descriptors::ERROR, messageError);
}
}
extern "C" void TaskpoolCancelGroup(EtsLong groupId)
{
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
if (!coroutine->GetPandaVM()->GetTaskpool()->CancelGroup(groupId)) {
ThrowEtsException(coroutine, panda_file_items::class_descriptors::ERROR,
"taskpool:: taskGroup is not executed or has been executed");
}
}
extern "C" EtsBoolean TaskpoolIsTaskCanceled()
{
EtsCoroutine *coroutine = EtsCoroutine::GetCurrent();
ASSERT(coroutine != nullptr);
return EtsBoolean(coroutine->GetPandaVM()->GetTaskpool()->IsTaskCanceled(coroutine->GetCoroutineId()));
}
} // namespace ark::ets::intrinsics
} // namespace ark::ets::intrinsics::taskpool

View File

@ -0,0 +1,46 @@
/**
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package escompat;
/**
* @class provides api for sync primitives (see std/core/SyncPrimitives.sts) in escompat
*/
final class ConcurrencyHelpers {
// Mutex api
public static native mutexCreate(): Object;
public static native mutexLock(mutex: Object): void;
public static native mutexUnlock(mutex: Object): void;
public static lockGuard(mutex: Object, callback: () => void): void {
ConcurrencyHelpers.mutexLock(mutex);
try {
callback();
} finally {
ConcurrencyHelpers.mutexUnlock(mutex);
}
}
// Event api
public static native eventCreate(): Object;
public static native eventWait(event: Object): void;
public static native eventFire(event: Object): void;
// CondVar api
public static native condVarCreate(): Object;
public static native condVarWait(condVar: Object, mutex: Object): void;
public static native condVarNotifyOne(condVar: Object, mutex: Object): void;
public static native condVarNotifyAll(condVar: Object, mutex: Object): void;
}

View File

@ -15,8 +15,38 @@
package escompat;
// NOTE(ipetrov, #16281): Make taskpool.Priority when namespaces will be supported
export enum taskpoolPriority {
HIGH = 0,
MEDIUM = 1,
LOW = 2,
IDLE = 3
}
// NOTE(ipetrov, #16281): Make taskpool.State when namespaces will be supported
/**
* @enum defines the task state
* @see taskpool.TaskInfo
*/
export enum taskpoolState {
/// the task state is waiting
WAITING = 1,
/// the task state is running
RUNNING = 2,
/// the task state is canceled
CANCELED = 3
}
/**
* The type of callback to be registered
*/
export type taskpoolCallbackFunction = () => void;
/**
* The type of callback with error code to be registered
*/
export type taskpoolCallbackFunctionWithError = (e: Error) => void;
// NOTE(ipetrov, #16281): Make taskpool.Task when namespaces will be supported
@ -34,11 +64,12 @@ export class taskpoolTask {
constructor(name: string, func: Function0<NullishType>) {
this.name = name;
this.func = func;
this.args = [];
// NOTE(ipetrov, #15499): support information about durations
this.totalDuration = 0.0;
this.ioDuration = 0.0;
this.cpuDuration = 0.0;
this.totalDuration = 0;
this.ioDuration = 0;
this.cpuDuration = 0;
this.mutex = ConcurrencyHelpers.mutexCreate();
this.condVar = ConcurrencyHelpers.condVarCreate();
this.dependentTasks = new Set<taskpoolTask>();
// Each task has unique identifier
this.id = taskpoolTask.generateId();
}
@ -57,7 +88,9 @@ export class taskpoolTask {
*
* @returns true if current running task is canceled, false - otherwise
*/
static native isCancel(): boolean;
static isCanceled(): boolean {
return taskpool.isCancel();
}
/**
* Register a callback and call it when the task is enqueued
@ -103,14 +136,166 @@ export class taskpoolTask {
this.onSuccessCallback = callback;
}
/**
* Add dependencies on the task array for this task
* @param tasks An array of dependent tasks
* @throws Error if
* - no parameters
* - this task or argument tasks executed (as common or via SequenceRunner)
* - task or argument tasks were added to any group
* - argument tasks add a circular dependency
*/
addDependency(...tasks: taskpoolTask[]): void {
if (tasks.length == 0) {
throw new Error("addDependency has no params.");
}
if (this.isSubmitted || this.seqId != 0) {
throw new Error("taskpool:: seqRunnerTask or executedTask cannot addDependency");
}
if (this.groupId != 0) {
throw new Error("taskpool:: groupTask cannot addDependency");
}
for (const task: taskpoolTask of tasks) {
if (task.isSubmitted || task.seqId != 0) {
throw new Error("taskpool:: seqRunnerTask or executedTask cannot be relied on");
}
if (task.groupId != 0) {
throw new Error("taskpool:: groupTask cannot be relied on");
}
if (this.id == task.id || taskpoolTask.hasTaskDFS(this, task)) {
throw new Error("There is a circular dependency");
}
}
for (const task: taskpoolTask of tasks) {
if (task.dependentTasks.has(this)) {
continue;
}
task.dependentTasks.add(this);
this.taskDependenciesCount++;
}
this.isDependent = (this.taskDependenciesCount != 0);
}
/**
* Remove dependencies on the task array for this task
* @param tasks An array of dependent tasks
* @throws Error if
* - no parameters
* - this task or argument tasks executed
* - this task or argument tasks have not dependencies
* - this task does not depend on an argument task
*/
removeDependency(...tasks: taskpoolTask[]): void {
if (tasks.length == 0) {
throw new Error("removeDependency has no params.");
}
if (!this.isDependent) {
throw new Error("taskpool:: task has no dependency");
}
if (this.isSubmitted) {
throw new Error("taskpool:: executedTask cannot removeDependency");
}
for (const task: taskpoolTask of tasks) {
if (task.dependentTasks.size == 0 && !task.isDependent) {
throw new Error("taskpool:: task has no dependency");
}
if (task.isSubmitted) {
throw new Error("taskpool:: cannot removeDependency on a dependent and executed task");
}
if (!task.dependentTasks.has(this)) {
throw new Error("The dependency does not exist, ");
}
}
for (let task: taskpoolTask of tasks) {
task.dependentTasks.delete(this);
this.taskDependenciesCount--;
}
this.isDependent = (this.taskDependenciesCount != 0);
}
/**
* @returns true if the task has been completed, false - otherwise
*/
isDone(): boolean {
// NOTE(ipetrov, #15499): Need to support, potentially need to refactor native implementation
throw new Error("Does not support");
return this.isSubmitted && !taskpool.hasTask(this);
}
/**
* Set transfer list for this task
* @param transfer transfer list of this task
* @throws Error if clone list already set
*/
setTransferList(transfer?: ArrayBuffer[]): void {
if (this.cloneList != undefined) {
throw new Error("An ArrayBuffer cannot be set as both a transfer list and a clone list");
}
this.transferList = transfer;
}
/**
* Set clone list for this task
* @param cloneList clone list of this task
* @throws Error if transfer list already set
*/
setCloneList(cloneList: Object[] | ArrayBuffer[]): void {
if (this.transferList != undefined) {
throw new Error("An ArrayBuffer cannot be set as both a transfer list and a clone list");
}
this.cloneList = cloneList;
}
/**
* Send data to host side and trigger the registered callback
* @param args arguments for the registered callback
* @throws Error if the function is not called from a taskpool task
* @throws Error if the callback is not registered
*/
static sendData(...args: Object[]): void {
const item: taskpoolTask | undefined = taskpool.getCurrentTask();
if (item == undefined) {
throw new Error("The function is not called in the TaskPool thread");
}
const task: taskpoolTask = item;
if (task.onReceiveCallback == undefined) {
throw new Error("The callback is not registered on the host side");
}
ConcurrencyHelpers.mutexLock(task.dataMutex);
task.argsArray.push(args);
ConcurrencyHelpers.mutexUnlock(task.dataMutex);
}
/**
* Register a callback for this task to receive and handle data from the taskpool task
*/
onReceiveData(callback?: Function0<NullishType>): void {
this.onReceiveCallback = callback;
}
/// Concurrent function to execute in taskpool
func: Function0<NullishType>;
/// The concurrent function arguments
arguments?: NullishType[];
/// Task name
name: string;
/// Total duration of task execution
totalDuration: number;
/// IO duration of task execution
ioDuration: number;
/// CPU duration of task execution
cpuDuration: number;
/// ----- Internal implementation part -----
/**
* Check that the task can be executed as common task
* @throws Error if the task can not be executed as common task
* @see taskpool.execute
*/
internal checkExecution(): void {
if (this.groupId != 0) {
throw new Error("taskpool:: groupTask cannot execute outside");
@ -118,30 +303,75 @@ export class taskpoolTask {
if (this.seqId != 0) {
throw new Error("taskpool:: seqRunnerTask cannot execute outside");
}
if (this.isSubmitted && (this.isDependent || this.dependentTasks.size != 0)) {
throw new Error("taskpool:: executedTask with dependency cannot execute again");
}
}
private waitForDependencies(): void {
ConcurrencyHelpers.mutexLock(this.mutex);
while (this.taskDependenciesCount > 0) {
ConcurrencyHelpers.condVarWait(this.condVar, this.mutex);
}
// Only one instance of the task can be running
if (this.isRunning) {
ConcurrencyHelpers.condVarWait(this.condVar, this.mutex);
}
this.isRunning = true;
ConcurrencyHelpers.mutexUnlock(this.mutex);
}
private notifyDependencies(): void {
for (const dependentTask: taskpoolTask of this.dependentTasks) {
ConcurrencyHelpers.lockGuard(dependentTask.mutex, () => {
dependentTask.taskDependenciesCount--;
ConcurrencyHelpers.condVarNotifyOne(dependentTask.condVar, dependentTask.mutex);
});
}
ConcurrencyHelpers.mutexLock(this.mutex);
this.isRunning = false;
ConcurrencyHelpers.condVarNotifyOne(this.condVar, this.mutex);
ConcurrencyHelpers.mutexUnlock(this.mutex);
}
internal getDuration(): number {
return (Chrono.nanoNow() - this.startTime) / Chrono.NS_PER_MS;
}
internal execute(): NullishType {
if (!taskpool.taskStarted(this.id, this.groupId)) {
this.waitForDependencies();
if (!taskpool.taskStarted(this)) {
this.notifyDependencies();
this.throwCancelTaskError();
}
this.callUserCallback(this.onStartCallback);
let result: NullishType = null;
this.startTime = Chrono.nanoNow();
const startCpuTime: long = Chrono.getCpuTime();
try {
result = this.func();
this.callUserCallback(this.onSuccessCallback);
} catch (e: Error) {
try {
this.onFailCallback?.(e);
} catch (callbackError) {}
throw e;
} finally {
if (!taskpool.taskFinished(this.id, this.groupId)) {
// Save duration stats
this.cpuDuration = (Chrono.getCpuTime() - startCpuTime) / Chrono.NS_PER_MS;
this.totalDuration = this.getDuration();
this.ioDuration = this.totalDuration - this.cpuDuration;
// Process the task finishing
const notCanceled: boolean = taskpool.taskFinished(this);
this.notifyDependencies();
if (!notCanceled) {
this.throwCancelTaskError();
}
}
this.callUserCallback(this.onSuccessCallback);
return result;
}
/// @throws Error for the canceled task
private throwCancelTaskError(): void {
if (this.groupId != 0) {
throw new Error("taskpool:: taskGroup has been canceled");
@ -164,24 +394,92 @@ export class taskpoolTask {
} catch (e) {}
}
/// The task is added to taskpool for execution
internal enqueue(): void {
this.isSubmitted = true;
this.callUserCallback(this.onEnqueueCallback);
}
/**
* @see taskpool.execute
* @see taskpool.SequenceRunner.execute
*/
internal getOnReceivePromise(p: Promise<NullishType>): Promise<NullishType> {
if (this.onReceiveCallback == undefined) {
return p;
}
return p.then<NullishType>((value: NullishType): NullishType => {
const argsArray: Array<Object[]> = this.extractDataArgs();
argsArray.forEach((args: Object[]) => {
try {
this.onReceiveCallback!();
} catch(e) {}
});
return value;
});
}
/**
* Generate new unique identifier for a new task
* @see Task.constructor
*/
private static native generateId(): long;
totalDuration: number;
ioDuration: number;
cpuDuration: number;
func: Function0<NullishType>;
args: NullishType[];
name: string;
/**
* @see taskpool.Task.addDependency
*/
internal static hasTaskDFS(startTask: taskpoolTask, targetTask: taskpoolTask): boolean {
for (const dependentTask: taskpoolTask of startTask.dependentTasks) {
if (dependentTask.id == targetTask.id) {
return true;
}
if (taskpoolTask.hasTaskDFS(dependentTask, targetTask)) {
return true;
}
}
return false;
}
/// @see taskpool.Task.getOnReceivePromise
internal extractDataArgs(): Array<Object[]> {
ConcurrencyHelpers.mutexLock(this.dataMutex);
const argsArray: Array<Object[]> = this.argsArray;
this.argsArray = new Array<Object[]>();
ConcurrencyHelpers.mutexUnlock(this.dataMutex);
return argsArray;
}
/// Unique identifier of the task
internal readonly id: long;
/// id of group which contains the task, 0 means the task is not part of any group
internal groupId: long = 0;
/// id of sequence runner which ran the task, 0 means task was not ran via any sequence runner
internal seqId: long = 0;
/// the task has been submitted into taskpool
internal isSubmitted: boolean = false;
/// the task depends on another task
internal isDependent: boolean = false;
/// Count of tasks which should be executed before this task
private taskDependenciesCount: int = 0;
/// Set of tasks which can be executed after this task
internal dependentTasks: Set<taskpoolTask>;
/// identifier of worker which starts to execute the task
internal workerId: int = -1;
/// an instance of the task is running
private isRunning: boolean = false;
/// start time of the task
private startTime: long = 0;
// NOTE(ipetrov, #19949): Change to private when bug in frontend will be fixed
internal mutex: Object;
internal condVar: Object;
private transferList?: ArrayBuffer[];
private cloneList?: Object[] | ArrayBuffer[];
private onReceiveCallback?: Function0<NullishType>;
private dataMutex: Object = ConcurrencyHelpers.mutexCreate();
private argsArray: Array<Object[]> = new Array<Object[]>();
private onEnqueueCallback?: taskpoolCallbackFunction;
private onStartCallback?: taskpoolCallbackFunction;
@ -193,12 +491,22 @@ export class taskpoolTask {
/**
* @class LongTask provides an interface to create a long executing task.
* The such task can be executed only once
* @extends Task
* @extends Task
*/
export class taskpoolLongTask extends taskpoolTask {
/**
* Create a LongTask instance
* @param name The name of long task
* @param func Concurrent function to execute in the taskpool
*/
constructor(name: string, func: Function0<NullishType>) {
super(name, func);
}
/**
* Create a LongTask instance
* @param func Concurrent function to execute in the taskpool
*/
constructor(func: Function0<NullishType>) {
super("", func);
}
@ -259,6 +567,8 @@ export class taskpoolTaskGroup {
throw new Error("taskpool:: The interface does not support the long task");
} else if (task.isSubmitted || task.seqId != 0) {
throw new Error("taskpool:: taskGroup cannot add seqRunnerTask or executedTask");
} else if (task.isDependent || task.dependentTasks.size != 0) {
throw new Error("taskpool:: dependent task not allowed.");
}
task.groupId = this.id;
this.tasks.push(task);
@ -278,7 +588,15 @@ export class taskpoolTaskGroup {
internal readonly id: long;
internal tasks: Array<taskpoolTask>;
internal isSubmitted: boolean = false;
}
class SeqRunnerImpl {
constructor(id: long) {
this.id = id;
this.seqPromise = Promise.resolve<NullishType>(new Object());
}
readonly id: long;
seqPromise: Promise<NullishType>;
}
// NOTE(ipetrov, #16281): Temporary solution, make as namespace when namespaces will be supported
@ -289,25 +607,44 @@ export class taskpoolSequenceRunner {
/**
* Create a SequenceRunner instance
*/
constructor(/* priority?: Priority */) {
this.id = taskpoolSequenceRunner.generateSeqRunnerId();
this.seqPromise = Promise.resolve<NullishType>(new Object());
constructor(priority?: taskpoolPriority) {
this("", priority);
}
constructor(name: string, priority?: taskpoolPriority) {
this.impl = taskpoolSequenceRunner.getOrCreateSeqRunnerImpl(name);
}
execute(task: taskpoolTask): Promise<NullishType> {
this.checkExecution(task);
task.seqId = this.id;
taskpool.taskSubmitted(task.id);
task.seqId = this.impl.id;
taskpool.taskSubmitted(task);
task.enqueue();
let taskRunner = (value: NullishType): NullishType => {
const taskRunner = (value: NullishType): NullishType => {
return task.execute();
};
this.seqPromise = this.seqPromise.then<NullishType, NullishType>(taskRunner, taskRunner);
return this.seqPromise;
this.impl.seqPromise = task.getOnReceivePromise(this.impl.seqPromise.then<NullishType, NullishType>(taskRunner, taskRunner));
return this.impl.seqPromise;
}
private static native generateSeqRunnerId(): long;
private static getOrCreateSeqRunnerImpl(name: string): SeqRunnerImpl {
if (name == "") {
return new SeqRunnerImpl(taskpoolSequenceRunner.generateSeqRunnerId());
}
ConcurrencyHelpers.mutexLock(taskpoolSequenceRunner.mutex);
let impl: SeqRunnerImpl | undefined = taskpoolSequenceRunner.namedRunners.get(name);
// NOTE(ipetrov, #18518): Change to === when scrict equals will be supported into frontend
if (impl == undefined) {
// No sequence runner with passed name, so create instance and set to named runners set
impl = new SeqRunnerImpl(taskpoolSequenceRunner.generateSeqRunnerId());
taskpoolSequenceRunner.namedRunners.set(name, impl);
}
ConcurrencyHelpers.mutexUnlock(taskpoolSequenceRunner.mutex);
return impl!;
}
private checkExecution(task: taskpoolTask) {
if (task.groupId != 0) {
throw new Error("taskpool:: SequenceRunner cannot execute groupTask");
@ -315,14 +652,65 @@ export class taskpoolSequenceRunner {
if (task.isSubmitted || task.seqId != 0) {
throw new Error("taskpool:: SequenceRunner cannot execute seqRunnerTask or executedTask");
}
if (task.isDependent || task.dependentTasks.size != 0) {
throw new Error("seqRunner:: dependent task not allowed.");
}
}
internal readonly id: long;
private seqPromise: Promise<NullishType>;
private impl: SeqRunnerImpl;
static namedRunners: Map<string, SeqRunnerImpl> = new Map<string, SeqRunnerImpl>();
static mutex: Object = ConcurrencyHelpers.mutexCreate();
}
/**
* @class represents internal information about task in taskpool
*/
export class taskpoolTaskInfo {
/// Unique identifier of task
taskId: number = 0;
/// Task state in taskpool
state: taskpoolState = taskpoolState.WAITING;
/// Duration of task exeuction
duration?: number;
/// Task name
name: string = "";
}
/**
* @class represents internal information about worker thread with taskpool tasks
*/
export class taskpoolThreadInfo {
/// Worker thread id
tid: number = -1.0;
/// Running task identifiers list on current worker thread
taskIds?: number[];
/// Thread priority
priority?: taskpoolPriority;
}
/**
* @class represents internal information about taskpool
*/
export class taskpoolTaskPoolInfo {
/// Array of threads information with taskpool tasks
threadInfos: taskpoolThreadInfo[] = new taskpoolThreadInfo[0];
/// Array of taskpool tasks information
taskInfos: taskpoolTaskInfo[] = new taskpoolTaskInfo[0];
}
type CoroutineId = int;
type WorkerId = int;
// NOTE(ipetrov, #16281): Temporary solution, make as namespace when namespaces will be supported
export class taskpool {
export final class taskpool {
static execute(func: Function0<NullishType>): Promise<NullishType> {
return launch func();
}
@ -381,11 +769,11 @@ export class taskpool {
* @param task The task for executing
* @returns Promise for result of executed task
*/
static execute(task: taskpoolTask /*, priority?: Priority */): Promise<NullishType> {
static execute(task: taskpoolTask, priority?: taskpoolPriority): Promise<NullishType> {
task.checkExecution();
taskpool.taskSubmitted(task.id);
taskpool.taskSubmitted(task);
task.enqueue();
return launch task.execute();
return task.getOnReceivePromise(launch task.execute());
}
/**
@ -394,22 +782,31 @@ export class taskpool {
* @param group The task group for execution
* @returns Promise for array of results of executed tasks from the group
*/
static execute(group: taskpoolTaskGroup /*, priority?: Priority */): Promise<Array<NullishType>> {
let tasksCount: long = group.tasks.length as long;
static execute(group: taskpoolTaskGroup, priority?: taskpoolPriority): Promise<Array<NullishType>> {
const tasksCount: long = group.tasks.length as int;
if (tasksCount == 0) {
group.isSubmitted = true;
return Promise.resolve(new Array<NullishType>());
return Promise.resolve<Array<NullishType>>(new Array<NullishType>());
}
taskpool.taskGroupSubmitted(group.id, tasksCount);
let promises = new Array<Promise<NullishType>>();
group.tasks.forEach((task: taskpoolTask, i: number) => {
const promises: Array<Promise<NullishType>> = new Array<Promise<NullishType>>();
group.tasks.forEach((task: taskpoolTask) => {
taskpool.taskSubmitted(task);
task.enqueue();
promises.push(launch task.execute());
promises.push(task.getOnReceivePromise(launch task.execute()));
});
group.isSubmitted = true;
return Promise.all<NullishType>(promises);
}
static executeDelayed(delayTime: number, task: taskpoolTask, priority?: taskpoolPriority): Promise<NullishType> {
// NOTE(ipetrov, #20012): implement when setTimeout will be supported for all coroutine
throw new Error("Not implemented");
}
static executePeriodically(period: number, task: taskpoolTask, priority?: taskpoolPriority): void {
// NOTE(ipetrov, #20012): implement when setInterval will be supported for all coroutine
throw new Error("Not implemented");
}
/**
* Cancel a concurrent task
*
@ -418,7 +815,17 @@ export class taskpool {
* @see Task.isCancel
*/
static cancel(task: taskpoolTask): void {
taskpool.cancelImpl(task.id, task.seqId);
ConcurrencyHelpers.lockGuard(taskpool.mutex, () => {
if (taskpool.waitingTasks.has(task) || taskpool.runningTasks.has(task)) {
taskpool.tasksToBeCanceled.add(task);
return;
}
if (task.seqId != 0) {
throw new Error("taskpool:: sequenceRunner task has been executed");
} else {
throw new Error("taskpool:: task is not executed or has been executed");
}
});
}
/**
@ -428,7 +835,13 @@ export class taskpool {
* @throws Error if the task group does not exist when it is canceled
*/
static cancel(group: taskpoolTaskGroup): void {
taskpool.cancelGroupImpl(group.id);
ConcurrencyHelpers.lockGuard(taskpool.mutex, () => {
if (taskpool.waitingGroupTasks.has(group.id) || taskpool.runningGroupTasks.has(group.id)) {
taskpool.groupsToBeCanceled.add(group.id);
return;
}
throw new Error("taskpool:: taskGroup is not executed or has been executed");
});
}
/**
@ -439,17 +852,92 @@ export class taskpool {
*/
static terminateTask(longTask: taskpoolLongTask): void {}
private static native cancelImpl(taskId: long, seqId: long): void;
/**
* @returns taskpool internal information about tasks and threads
*/
static getTaskPoolInfo(): taskpoolTaskPoolInfo {
const taskInfos: Array<taskpoolTaskInfo> = new Array<taskpoolTaskInfo>();
const threadInfos: Array<taskpoolThreadInfo> = new Array<taskpoolThreadInfo>();
ConcurrencyHelpers.mutexLock(taskpool.mutex);
// Collect running and canceled tasks info
taskpool.runningTasks.forEach((count: int, task: taskpoolTask) => {
const taskInfo: taskpoolTaskInfo = new taskpoolTaskInfo();
taskInfo.taskId = task.id;
taskInfo.name = task.name;
taskInfo.state = (taskpool.isCanceled(task)) ? taskpoolState.CANCELED : taskpoolState.RUNNING;
taskInfo.duration = task.getDuration();
taskInfos.push(taskInfo);
});
// Collect waiting and canceled tasks info
taskpool.waitingTasks.forEach((count: int, task: taskpoolTask) => {
// One task can be submitted to taskpool several time, but only one instance can be running,
// so if task is running, it was added with running status
if (taskpool.runningTasks.has(task)) {
return;
}
const taskInfo: taskpoolTaskInfo = new taskpoolTaskInfo();
taskInfo.taskId = task.id;
taskInfo.name = task.name;
taskInfo.state = (taskpool.isCanceled(task)) ? taskpoolState.CANCELED : taskpoolState.WAITING;
taskInfos.push(taskInfo);
});
// Collect worker threads info
taskpool.workerToTasks.forEach((tasks: Set<taskpoolTask>, workerId: int) => {
const threadInfo: taskpoolThreadInfo = new taskpoolThreadInfo();
threadInfo.tid = workerId;
const tasksCount: int = tasks.size as int;
if (tasksCount == 0) {
threadInfos.push(threadInfo);
return;
}
const taskIds: number[] = new number[tasksCount];
let currentIndex: int = 0;
tasks.forEach((task: taskpoolTask) => {
taskIds[currentIndex] = task.id;
currentIndex++;
});
threadInfo.taskIds = taskIds;
threadInfo.priority = taskpoolPriority.MEDIUM;
threadInfos.push(threadInfo);
});
ConcurrencyHelpers.mutexUnlock(taskpool.mutex);
const taskpoolInfo: taskpoolTaskPoolInfo = new taskpoolTaskPoolInfo();
const taskInfosRes: taskpoolTaskInfo[] = new taskpoolTaskInfo[taskInfos.length as int];
const threadInfosRes: taskpoolThreadInfo[] = new taskpoolThreadInfo[threadInfos.length as int];
taskInfos.forEach((taskInfo: taskpoolTaskInfo, index: number) => {
taskInfosRes[index as int] = taskInfo;
});
threadInfos.forEach((threadInfo: taskpoolThreadInfo, index: number) => {
threadInfosRes[index as int] = threadInfo;
});
taskpoolInfo.taskInfos = taskInfosRes;
taskpoolInfo.threadInfos = threadInfosRes;
return taskpoolInfo;
}
private static native cancelGroupImpl(groupId: long): void;
// NOTE(ipetrov, #17953): Change signature when lambdaN will be supported
/**
* @returns true if the function is a concurrent function, false - otherwise
*/
static isConcurrent(func: Object): boolean {
// Now any function in static ArkTS can be executed concurrently
return true;
}
/// ----- Internal implementation part -----
/**
* @brief Submit task to the taskpool on the execute method
* @see taskpool.execute
* @see taskpool.SequenceRunner.execute
* @param taskId identifier of the submitted task
* @param task submitting task
*/
internal static native taskSubmitted(taskId: long): void;
internal static taskSubmitted(task: taskpoolTask): void {
ConcurrencyHelpers.mutexLock(taskpool.mutex);
const count: int = taskpool.waitingTasks.get(task, /* default */ 0);
taskpool.waitingTasks.set(task, count + 1);
ConcurrencyHelpers.mutexUnlock(taskpool.mutex);
}
/**
* @brief Submit group of tasks to the taskpool on the execute method
@ -457,18 +945,75 @@ export class taskpool {
* @param groupId identifier of the submitted group
* @param tasksCount count of tasks in the passed group
*/
private static native taskGroupSubmitted(groupId: long, tasksCount: long): void;
private static taskGroupSubmitted(groupId: long, tasksCount: int): void {
ConcurrencyHelpers.mutexLock(taskpool.mutex);
const count: int = taskpool.waitingGroupTasks.get(groupId, /* default */ 0);
taskpool.waitingGroupTasks.set(groupId, count + tasksCount);
ConcurrencyHelpers.mutexUnlock(taskpool.mutex);
}
/**
* @brief Notify the taskpool that the task is started on a coroutine
* @param taskId identifier of the started task
* @param groupId group identifier of the passed task (0 means task is not group task)
* @param task starting task
* @returns true if task is not cancled by cancel method, false - otherwise
*
* @see taskpool.Task.execute
* @see taskpool.cancel
*/
internal static native taskStarted(taskId: long, groupId: long): boolean;
internal static taskStarted(task: taskpoolTask): boolean {
let isTaskCancel: boolean = false;
ConcurrencyHelpers.lockGuard(taskpool.mutex, () => {
const cancelByTask: boolean = taskpool.popFromMap(/* extracting task from waiting map */ task,
/* pop from */ taskpool.waitingTasks,
/* the map can contain the instance of the task */ taskpool.runningTasks,
/* set of tasks marked as canceled */ taskpool.tasksToBeCanceled);
let cancelByGroup: boolean = false;
if (task.groupId != 0) {
// If the task is group task then decrement count of waiting task for the group
cancelByGroup = taskpool.popFromMap(task.groupId, taskpool.waitingGroupTasks, taskpool.runningGroupTasks, taskpool.groupsToBeCanceled);
}
if (cancelByGroup || cancelByTask) {
isTaskCancel = true;
return;
}
taskpool.pushToRunning(task);
});
return !isTaskCancel;
}
private static popFromMap<T>(item: T, currentTasksMap: Map<T, int>, possibleTasksMap: Map<T, int>, cancelSet: Set<T>): boolean {
const instancesOfTheTask: int = currentTasksMap.get(item)! - 1;
if (instancesOfTheTask == 0) {
// This is last instance in the map
currentTasksMap.delete(item);
} else {
currentTasksMap.set(item, instancesOfTheTask);
}
if (cancelSet.has(item)) {
if (instancesOfTheTask == 0 && !possibleTasksMap.has(item)) {
// No more instances in the taskpool, so delete from cancelSet
cancelSet.delete(item);
}
return true; // was marked as canceled
}
return false;
}
private static pushToRunning(task: taskpoolTask): void {
const runningInstancesOfTheTask: int = taskpool.runningTasks.get(task, /* default */ 0);
taskpool.runningTasks.set(task, runningInstancesOfTheTask + 1);
if (task.groupId != 0) {
const runningTasksOfTheGroup: int = taskpool.runningGroupTasks.get(task.groupId, /* default */ 0);
taskpool.runningGroupTasks.set(task.groupId, runningTasksOfTheGroup + 1);
}
// NOTE(ipetrov, #20208): CoroutineExtras is debug functionality, maybe need to create a separate intrinsic
taskpool.executingTasks.set(CoroutineExtras.getCoroutineId(), task);
const currentWorkerId: int = CoroutineExtras.getWorkerId();
task.workerId = currentWorkerId;
const workerTasks: Set<taskpoolTask> = taskpool.workerToTasks.get(currentWorkerId, /* default */ new Set<taskpoolTask>());
workerTasks.add(task);
taskpool.workerToTasks.set(currentWorkerId, workerTasks);
}
/**
* @brief Notify the taskpool that the task is finished on a coroutine
@ -479,5 +1024,72 @@ export class taskpool {
* @see taskpool.Task.execute
* @see taskpool.cancel
*/
internal static native taskFinished(taskId: long, groupId: long): boolean;
internal static taskFinished(task: taskpoolTask): boolean {
let isTaskCanceled: boolean = false;
ConcurrencyHelpers.lockGuard(taskpool.mutex, () => {
// NOTE(ipetrov, #20208): CoroutineExtras is debug functionality, maybe need to create a separate intrinsic
taskpool.executingTasks.delete(CoroutineExtras.getCoroutineId());
taskpool.workerToTasks.get(task.workerId)!.delete(task);
const cancelByTask: boolean = taskpool.popFromMap(task, taskpool.runningTasks, taskpool.waitingTasks, taskpool.tasksToBeCanceled);
let cancelByGroup: boolean = false;
if (task.groupId != 0) {
cancelByGroup = taskpool.popFromMap(task.groupId, taskpool.runningGroupTasks, taskpool.waitingGroupTasks, taskpool.groupsToBeCanceled);
}
if (cancelByGroup || cancelByTask) {
isTaskCanceled = true;
}
});
return !isTaskCanceled;
}
private static getCurrentTaskUnsafe(): taskpoolTask | undefined {
// NOTE(ipetrov, #20208): CoroutineExtras is debug functionality, maybe need to create a separate intrinsic
return taskpool.executingTasks.get(CoroutineExtras.getCoroutineId());
}
internal static getCurrentTask(): taskpoolTask | undefined {
ConcurrencyHelpers.mutexLock(taskpool.mutex);
const task: taskpoolTask | undefined = taskpool.getCurrentTaskUnsafe();
ConcurrencyHelpers.mutexUnlock(taskpool.mutex);
return task;
}
internal static isCancel(): boolean {
ConcurrencyHelpers.mutexLock(taskpool.mutex);
const item: taskpoolTask | undefined = taskpool.getCurrentTaskUnsafe();
if (item == undefined) {
ConcurrencyHelpers.mutexUnlock(taskpool.mutex);
// No task on current coroutine
return false;
}
const isCanceled: boolean = taskpool.isCanceled(item!);
ConcurrencyHelpers.mutexUnlock(taskpool.mutex);
return isCanceled;
}
internal static isCanceled(task: taskpoolTask): boolean {
return taskpool.tasksToBeCanceled.has(task) || taskpool.groupsToBeCanceled.has(task.groupId);
}
internal static hasTask(task: taskpoolTask): boolean {
ConcurrencyHelpers.mutexLock(taskpool.mutex);
const result: boolean = taskpool.waitingTasks.has(task) || taskpool.runningTasks.has(task);
ConcurrencyHelpers.mutexUnlock(taskpool.mutex);
return result;
}
// managed mutex for taskpool containers below
private static mutex = ConcurrencyHelpers.mutexCreate();
// value is count of tasks
private static waitingTasks: Map<taskpoolTask, int> = new Map<taskpoolTask, int>();
private static runningTasks: Map<taskpoolTask, int> = new Map<taskpoolTask, int>();
private static tasksToBeCanceled: Set<taskpoolTask> = new Set<taskpoolTask>();
// key is group id, value is count of associated tasks with this group
private static waitingGroupTasks: Map<long, int> = new Map<long, int>();
private static runningGroupTasks: Map<long, int> = new Map<long, int>();
private static groupsToBeCanceled: Set<long> = new Set<long>();
// value is executing task on the associated coroutine
private static executingTasks: Map<CoroutineId, taskpoolTask> = new Map<CoroutineId, taskpoolTask>();
// value is set of executing tasks on this worker
private static workerToTasks: Map<WorkerId, Set<taskpoolTask>> = new Map<WorkerId, Set<taskpoolTask>>();
}

View File

@ -22,6 +22,15 @@ final class ConcurrencyHelpers {
public static native mutexLock(mutex: Object): void;
public static native mutexUnlock(mutex: Object): void;
public static lockGuard(mutex: Object, callback: () => void): void {
ConcurrencyHelpers.mutexLock(mutex);
try {
callback();
} finally {
ConcurrencyHelpers.mutexUnlock(mutex);
}
}
// Event api
public static native eventCreate(): Object;
public static native eventWait(event: Object): void;

View File

@ -18,10 +18,7 @@ package std.core;
/**
* @class Represents an error that occurs when assertion fails.
*/
export class AssertionError extends Error {
readonly tmp_console = new Console();
//NOTE(kirill-mitkin): Used by frontend
constructor(message: String) {
this(message, undefined)
@ -29,7 +26,7 @@ export class AssertionError extends Error {
constructor(message?: String, options?: ErrorOptions) {
super("AssertionError", message, options)
this.tmp_console.log(message!);
console.log(message!);
}
}

View File

@ -15,8 +15,15 @@
package std.time;
export final class Chrono {
/// Count of nanoseconds in 1 millisecond
public static readonly NS_PER_MS: long = 1000000;
// now() in nanoseconds
public static native nanoNow(): long;
/**
* @returns cpu time in nanoseconds
*/
public static native getCpuTime(): long;
}

View File

@ -99,7 +99,6 @@ srcs_runtime = [
"runtime/ets_native_library_provider.cpp",
"runtime/ets_runtime_interface.cpp",
"runtime/ets_stubs.cpp",
"runtime/ets_taskpool.cpp",
"runtime/ets_vm.cpp",
"runtime/ets_vm_api.cpp", # TODO(nsizov): Take into account PR640, if needed
"runtime/ets_vtable_builder.cpp",

View File

@ -13,6 +13,8 @@
* limitations under the License.
*/
import { CoroutineExtras, AtomicFlag } from "std/debug/concurrency";
function returnAbc(): string {
return (() => "a")() + (() => "b")() + (() => "c")();
}
@ -21,144 +23,287 @@ function returnFailAbc(): string {
throw new Error("Concurrent function failed");
}
function TaskpoolCommonTaskTestLambda() {
let Sum = (): int => {
return 10 + 20;
}
let task = new taskpoolTask(Sum);
let result = await taskpool.execute(task);
assert(result == 30);
}
function TaskpoolCommonTaskTestFunc() {
let task = new taskpoolTask(returnAbc);
let result = await taskpool.execute(task);
assert(result == "abc");
}
function TaskpoolCommonTaskTestRunCallbacksOnSucceededTask() {
let onEnqueuedStr = "";
let onStartExecutionStr = "";
let onExecutionSucceededStr = "";
let onExecutionFailedStr = "";
let task = new taskpoolTask(returnAbc);
task.onEnqueued(() => { onEnqueuedStr = "enqueue callback is done"; });
task.onStartExecution(() => { onStartExecutionStr = "start callback is done"; });
task.onExecutionSucceeded(() => { onExecutionSucceededStr = "success callback is done"; });
task.onExecutionFailed((e: Error) => { onExecutionFailedStr = e.message + ", fail callback is done"; });
let res = await taskpool.execute(task);
assert(res == "abc");
assert(onEnqueuedStr == "enqueue callback is done");
assert(onStartExecutionStr == "start callback is done");
assert(onExecutionSucceededStr == "success callback is done");
assert(onExecutionFailedStr == "");
}
function TaskpoolCommonTaskTestRunCallbacksOnFailedTask() {
let onEnqueuedStr = "";
let onStartExecutionStr = "";
let onExecutionSucceededStr = "";
let onExecutionFailedStr = "";
let task = new taskpoolTask(returnFailAbc);
task.onEnqueued(() => { onEnqueuedStr = "enqueue callback is done"; });
task.onStartExecution(() => { onStartExecutionStr = "start callback is done"; });
task.onExecutionSucceeded(() => { onExecutionSucceededStr = "success callback is done"; });
task.onExecutionFailed((e: Error) => { onExecutionFailedStr = e.message + ", fail callback is done"; });
let isErrorOccurred = false;
try {
await taskpool.execute(task);
} catch(e: Error) {
isErrorOccurred = true;
assert(e.message == "Concurrent function failed");
}
assert(isErrorOccurred);
assert(onEnqueuedStr == "enqueue callback is done");
assert(onStartExecutionStr == "start callback is done");
assert(onExecutionSucceededStr == "");
assert(onExecutionFailedStr == "Concurrent function failed, fail callback is done");
}
function expectErrorWithMessage(fn: () => void, expectedErrorMessage: string): void {
try {
fn();
} catch (e: Error) {
assert(e.message == expectedErrorMessage);
return;
}
assert(!"Callback passed, but Error is expected");
}
function TaskpoolCommonTaskTestAddCallbacksAfterExecution() {
let task = new taskpoolTask(returnAbc);
taskpool.execute(task);
let expectedErrorMessage = "taskpool:: The executed task does not support the registration of listeners.";
expectErrorWithMessage(() => { task.onEnqueued(() => {}); }, expectedErrorMessage);
expectErrorWithMessage(() => { task.onStartExecution(() => {}); }, expectedErrorMessage);
expectErrorWithMessage(() => { task.onExecutionSucceeded(() => {}); }, expectedErrorMessage);
expectErrorWithMessage(() => { task.onExecutionFailed((e: Error) => {}); }, expectedErrorMessage);
}
function TaskpoolCommonTaskTestIsCancel() {
assert(taskpoolTask.isCancel() == false);
let t = new taskpoolTask(returnAbc);
taskpool.execute(t);
assert(taskpoolTask.isCancel() == false);
}
function TaskpoolCommonTaskTestCancelNonStartedTask() {
let t = new taskpoolTask(returnAbc);
let isErrorOccurred = false;
try {
taskpool.cancel(t);
taskpool.execute(t);
} catch(e: Error) {
isErrorOccurred = true;
assert(e.message == "taskpool:: task is not executed or has been executed");
}
assert(isErrorOccurred);
}
function TaskpoolCommonTaskTestCancelExecutingTask() {
let t = new taskpoolTask(returnAbc);
let p = taskpool.execute(t);
try {
taskpool.cancel(t);
} catch(e: Error) {
assert(e.message == "taskpool:: task is not executed or has been executed");
return;
}
let isErrorOccurred = false;
try {
let res = await p;
} catch(e: Error) {
assert(e.message == "taskpool:: task has been canceled");
isErrorOccurred = true;
}
assert(isErrorOccurred);
}
function TaskpoolCommonTaskTestCancelExecutedTask() {
let t = new taskpoolTask(returnAbc);
let isErrorOccurred = false;
try {
await taskpool.execute(t);
taskpool.cancel(t);
} catch(e: Error) {
isErrorOccurred = true;
assert(e.message == "taskpool:: task is not executed or has been executed");
}
assert(isErrorOccurred);
}
function main(): int {
TaskpoolCommonTaskTestLambda();
TaskpoolCommonTaskTestFunc();
TaskpoolCommonTaskTestRunCallbacksOnSucceededTask();
TaskpoolCommonTaskTestRunCallbacksOnFailedTask();
TaskpoolCommonTaskTestAddCallbacksAfterExecution();
TaskpoolCommonTaskTestIsCancel();
TaskpoolCommonTaskTestCancelNonStartedTask();
TaskpoolCommonTaskTestCancelExecutingTask();
TaskpoolCommonTaskTestCancelExecutedTask();
return 0;
let commonTaskSuite = new ArkTestsuite("taskpool.CommonTask");
commonTaskSuite.addTest("GetTaskpoolInfoTest", () =>
{
CoroutineExtras.setSchedulingPolicy(CoroutineExtras.POLICY_NON_MAIN);
let flagForTask: AtomicFlag = new AtomicFlag(false);
let isRunTask: AtomicFlag = new AtomicFlag(false);
let funcForTask = (): string => {
isRunTask.set(true);
while (flagForTask.get() != true) {}
return "result"
};
let task = new taskpoolTask("task_name", funcForTask);
let p = taskpool.execute(task);
// wait for task started
while (isRunTask.get() != true) {}
let info = taskpool.getTaskPoolInfo();
// Notify to continue task
flagForTask.set(true);
assertEQ(info.taskInfos.length, 1);
assertNE(info.threadInfos.length, 0);
let isNonEmptyWorker: boolean = false;
for (let threadInfo of info.threadInfos) {
assertLT(0, threadInfo.tid);
isNonEmptyWorker = isNonEmptyWorker || (threadInfo.taskIds != undefined);
}
assertEQ(isNonEmptyWorker, true);
// NOTE(ipetrov, #20349): uncomment when frontend fix enum comparison
assertEQ(info.taskInfos[0].state as int, taskpoolState.RUNNING as int);
assertEQ(info.taskInfos[0].name, "task_name");
assertEQ(await p, "result");
CoroutineExtras.setSchedulingPolicy(CoroutineExtras.POLICY_DEFAULT);
});
commonTaskSuite.addTest("OneTaskExecutionTest", () =>
{
assertEQ(taskpool.isConcurrent(returnAbc), true);
let Sum = (): int => {
return 10 + 20;
}
// Task with lambda
let lambdaTask = new taskpoolTask(Sum);
let result = await taskpool.execute(lambdaTask);
assertEQ(result, 30);
assertEQ(lambdaTask.isDone(), true);
// Task with function
let funcTask = new taskpoolTask(returnAbc);
let res = await taskpool.execute(funcTask);
assertEQ(res, "abc");
assertEQ(funcTask.isDone(), true);
// Execute one task several times
let p1 = taskpool.execute(funcTask);
let p2 = taskpool.execute(funcTask);
let p3 = taskpool.execute(funcTask);
let p4 = taskpool.execute(funcTask);
assertEQ(await p1, "abc");
assertEQ(await p2, "abc");
assertEQ(await p3, "abc");
assertEQ(await p4, "abc");
assertEQ(funcTask.isDone(), true);
});
commonTaskSuite.addTest("CallbackTest", (): void throws =>
{
let onEnqueuedStr = "";
let onEnqueuedExpectedStr = "enqueue callback is done";
let onEnqueuedCallback = () => { onEnqueuedStr = onEnqueuedExpectedStr; };
let onStartExecutionStr = "";
let onStartExecutionExpepctedStr = "start callback is done";
let onStartExecutionCallback = () => { onStartExecutionStr = onStartExecutionExpepctedStr; };
let onExecutionSucceededStr = "";
let onExecutionSucceededExpectedStr = "success callback is done";
let onExecutionSucceededCallback = () => { onExecutionSucceededStr = onExecutionSucceededExpectedStr; };
let onExecutionFailedStr = "";
let onExecutionFailedExpectedStr = "Concurrent function failed, fail callback is done";
let onExecutionFailedCallback = (e: Error) => { onExecutionFailedStr = e.message + ", fail callback is done"; };
// Successed task
let task = new taskpoolTask(returnAbc);
task.onEnqueued(onEnqueuedCallback);
task.onStartExecution(onStartExecutionCallback);
task.onExecutionSucceeded(onExecutionSucceededCallback);
task.onExecutionFailed(onExecutionFailedCallback);
let res = await taskpool.execute(task);
assertEQ(res, "abc");
assertEQ(onEnqueuedStr, onEnqueuedExpectedStr);
assertEQ(onStartExecutionStr, onStartExecutionExpepctedStr);
assertEQ(onExecutionSucceededStr, onExecutionSucceededExpectedStr);
assertEQ(onExecutionFailedStr, "");
// Failed task
onEnqueuedStr = "";
onStartExecutionStr = "";
onExecutionSucceededStr = "";
let failedTask = new taskpoolTask(returnFailAbc);
failedTask.onEnqueued(onEnqueuedCallback);
failedTask.onStartExecution(onStartExecutionCallback);
failedTask.onExecutionSucceeded(onExecutionSucceededCallback);
failedTask.onExecutionFailed(onExecutionFailedCallback);
expectError(() => { await taskpool.execute(failedTask) },
new Error("Concurrent function failed"));
assertEQ(onEnqueuedStr, onEnqueuedExpectedStr);
assertEQ(onStartExecutionStr, onStartExecutionExpepctedStr);
assertEQ(onExecutionSucceededStr, "");
assertEQ(onExecutionFailedStr, onExecutionFailedExpectedStr);
// Add callbacks for executed task
let executedTask = new taskpoolTask(returnAbc);
let expectedCallbackError = new Error("taskpool:: The executed task does not support the registration of listeners.");
taskpool.execute(executedTask);
expectError(() => { executedTask.onEnqueued(() => {}); }, expectedCallbackError);
expectError(() => { executedTask.onStartExecution(() => {}); }, expectedCallbackError);
expectError(() => { executedTask.onExecutionSucceeded(() => {}); }, expectedCallbackError);
expectError(() => { executedTask.onExecutionFailed((e: Error) => {}); }, expectedCallbackError);
});
commonTaskSuite.addTest("IsCancelTest", (): void throws =>
{
assertEQ(taskpoolTask.isCanceled(), false);
let t = new taskpoolTask(returnAbc);
taskpool.execute(t);
assertEQ(taskpoolTask.isCanceled(), false);
// Check canceling of exeuting task
CoroutineExtras.setSchedulingPolicy(CoroutineExtras.POLICY_NON_MAIN);
let isCanceled: AtomicFlag = new AtomicFlag(false);
let isRunning: AtomicFlag = new AtomicFlag(false);
let waitForCancel: AtomicFlag = new AtomicFlag(true);
let funcForTask = (): boolean => {
isRunning.set(true);
while(waitForCancel.get()) {}
// Set isCanceled value from taskpool
isCanceled.set(taskpoolTask.isCanceled());
isRunning.set(false);
return true;
};
let task = new taskpoolTask(funcForTask);
let p = taskpool.execute(task);
while (!isRunning.get()) {}
assertEQ(task.isDone(), false);
taskpool.cancel(task);
waitForCancel.set(false);
while (isRunning.get()) {}
// Check isCanceled from taskpool
assertEQ(isCanceled.get(), true);
expectError(() => { await p }, new Error("taskpool:: task has been canceled"));
CoroutineExtras.setSchedulingPolicy(CoroutineExtras.POLICY_DEFAULT);
});
commonTaskSuite.addTest("CancelNonExecutingTaskTest", (): void throws =>
{
let expectedError = new Error("taskpool:: task is not executed or has been executed");
expectError(() => {
let t = new taskpoolTask(returnAbc);
taskpool.cancel(t);
taskpool.execute(t);
}, expectedError);
expectError(() => {
let t = new taskpoolTask(returnAbc);
await taskpool.execute(t);
taskpool.cancel(t);
}, expectedError);
});
commonTaskSuite.addTest("CancelExecutingTaskTest", (): void throws =>
{
let t = new taskpoolTask(returnAbc);
let p = taskpool.execute(t);
try {
taskpool.cancel(t);
} catch(e: Error) {
assertEQ(e.message, "taskpool:: task is not executed or has been executed");
return;
}
expectError(() => { await p }, new Error("taskpool:: task has been canceled"));
// task finished, new execution should not produce an Error
expectNoThrow(() => { await taskpool.execute(t) });
});
commonTaskSuite.addTest("DependentTasksTest", (): void throws =>
{
let resultStr: string = "";
let func1 = (): string => {
resultStr += "1";
return resultStr;
};
let func2 = (): string => {
resultStr += "2";
return resultStr;
}
let func3 = (): string => {
resultStr += "3";
return resultStr;
}
let task1 = new taskpoolTask(func1);
let task2 = new taskpoolTask(func2);
let task3 = new taskpoolTask(func3);
task1.addDependency(task2);
task2.addDependency(task3);
let p1 = taskpool.execute(task1);
let p2 = taskpool.execute(task2);
let p3 = taskpool.execute(task3);
let res2 = await p2;
assertEQ(res2, "32");
let res3 = await p3;
assertEQ(res3, "3");
let res1 = await p1;
assertEQ(res1, "321");
assertEQ(resultStr, "321");
// Empty dependencies
expectError(() => {
let task = new taskpoolTask(returnAbc);
task.addDependency();
}, new Error("addDependency has no params."));
expectError(() => {
let task = new taskpoolTask(returnAbc);
task.removeDependency();
}, new Error("removeDependency has no params."));
});
commonTaskSuite.addTest("CircularDependencyTest", (): void throws =>
{
let task1 = new taskpoolTask(returnAbc);
let task2 = new taskpoolTask(returnAbc);
let task3 = new taskpoolTask(returnAbc);
let task4 = new taskpoolTask(returnAbc);
let task5 = new taskpoolTask(returnAbc);
let task6 = new taskpoolTask(returnAbc);
let expectedError = new Error("There is a circular dependency");
expectNoThrow(() => {
task1.addDependency(task2);
task2.addDependency(task3);
task3.addDependency(task4);
task4.addDependency(task5);
task5.addDependency(task6);
});
expectError(() => {
// circular dependency
task6.addDependency(task1);
}, expectedError);
expectError(() => { task1.addDependency(task1) }, expectedError);
expectError(() => { task1.removeDependency(task3) }, new Error("The dependency does not exist, "));
});
commonTaskSuite.addTest("DependencyExecutedTaskTest", (): void throws =>
{
let task1 = new taskpoolTask(returnAbc);
let task2 = new taskpoolTask(returnAbc);
expectError(() => { task1.removeDependency(task2) }, new Error("taskpool:: task has no dependency"));
taskpool.execute(task2);
expectError(() => { task1.addDependency(task2) }, new Error("taskpool:: seqRunnerTask or executedTask cannot be relied on"));
expectError(() => { task2.addDependency(task1) }, new Error("taskpool:: seqRunnerTask or executedTask cannot addDependency"));
let task3 = new taskpoolTask(returnAbc);
let task4 = new taskpoolTask(returnAbc);
let expectedAgainExecuteError = new Error("taskpool:: executedTask with dependency cannot execute again");
task1.addDependency(task3);
taskpool.execute(task3);
expectError(() => { task1.removeDependency(task3); }, new Error("taskpool:: cannot removeDependency on a dependent and executed task"));
expectError(() => { task1.removeDependency(task4); }, new Error("taskpool:: task has no dependency"));
taskpool.execute(task1);
expectError(() => { taskpool.execute(task1) }, expectedAgainExecuteError);
expectError(() => { taskpool.execute(task3) }, expectedAgainExecuteError);
// Try to remove dependency after execute
let task5 = new taskpoolTask(returnAbc);
let task6 = new taskpoolTask(returnAbc);
task5.addDependency(task6);
taskpool.execute(task5);
expectError(() => { task5.removeDependency(task6); }, new Error("taskpool:: executedTask cannot removeDependency"));
taskpool.execute(task6);
});
commonTaskSuite.addTest("SendReceiveDataTest", () =>
{
let res = 0;
let recvCallback = (): string => {
res += 10;
return "";
};
let sendDataFunc = (): string => {
taskpoolTask.sendData();
return "abc";
};
let t = new taskpoolTask(sendDataFunc);
t.onReceiveData(recvCallback);
let p1 = taskpool.execute(t);
let p2 = taskpool.execute(t);
let res1 = await p1;
let res2 = await p2;
assertEQ(res1, "abc");
assertEQ(res2, "abc");
assertEQ(res, 20);
let r = await taskpool.execute(t);
assertEQ(r, "abc");
assertEQ(res, 30);
});
return commonTaskSuite.run();
}

View File

@ -19,7 +19,7 @@ function SumInt(value1: int, value2: int): int {
function TaskpoolFuncTestSumInt() {
let result = await taskpool.execute(SumInt, 10, 20);
assert(result == 30);
assertEQ(result, 30);
}
function TaskpoolFuncTestAnd() {
@ -30,7 +30,7 @@ function TaskpoolFuncTestAnd() {
return false;
}
let result = await taskpool.execute(And, true, true);
assert(result == true);
assertEQ(result, true);
}
function TaskpoolFuncTestStrCat() {
@ -38,7 +38,7 @@ function TaskpoolFuncTestStrCat() {
return value1 + value2;
}
let result = await taskpool.execute(StrCat, "abc", "def");
assert(result == "abcdef");
assertEQ(result, "abcdef");
}
function TaskpoolFuncTestStrCat2() {
@ -47,7 +47,7 @@ function TaskpoolFuncTestStrCat2() {
}
let result = await taskpool.execute(StrCat, "abc", "def");
result = await taskpool.execute(StrCat, "abc", "def");
assert(result == "abcdef");
assertEQ(result, "abcdef");
}
function TaskpoolFuncTestStrCat3() {
@ -58,7 +58,7 @@ function TaskpoolFuncTestStrCat3() {
return value1 + StrCat(value2, "hello");
}
let result = await taskpool.execute(Sum, "abc", "def");
assert(result == "abcdefhello");
assertEQ(result, "abcdefhello");
}
function TaskpoolFuncTestArraySum() {
@ -69,8 +69,8 @@ function TaskpoolFuncTestArraySum() {
return result;
}
let result = (await taskpool.execute(Sum, [1, 2], [3, 4])) as int[];
assert(result[0] == 4);
assert(result[1] == 6);
assertEQ(result[0], 4);
assertEQ(result[1], 6);
}
class A {
@ -88,22 +88,22 @@ function TaskpoolFuncTestClassFieldsSum() {
return result;
}
let result = (await taskpool.execute(Sum, new A(1, 2), new A(3, 4))) as A;
assert(result.a == 4);
assert(result.b == 6);
assertEQ(result.a, 4);
assertEQ(result.b, 6);
}
function TaskpoolFuncTestSumInt2() {
let result1 = await taskpool.execute(SumInt, 10, 20);
let result2 = await taskpool.execute(SumInt, 30, 40);
assert(result1 == 30);
assert(result2 == 70);
assertEQ(result1, 30);
assertEQ(result2, 70);
}
function TaskpoolFuncTestSumInt2SameArguments() {
let result1 = await taskpool.execute(SumInt, 10, 20);
let result2 = await taskpool.execute(SumInt, 10, 20);
assert(result1 == 30);
assert(result2 == 30);
assertEQ(result1, 30);
assertEQ(result2, 30);
}
function TaskpoolFuncTestSumAndMulti() {
@ -114,10 +114,10 @@ function TaskpoolFuncTestSumAndMulti() {
let result2 = await taskpool.execute(MultiInt, 10, 20);
let result3 = await taskpool.execute(SumInt, 10, 30);
let result4 = await taskpool.execute(MultiInt, 20, 20);
assert(result1 == 30);
assert(result2 == 200);
assert(result3 == 40);
assert(result4 == 400);
assertEQ(result1, 30);
assertEQ(result2, 200);
assertEQ(result3, 40);
assertEQ(result4, 400);
}
function main(): int {

View File

@ -21,190 +21,152 @@ function returnDef(): string {
return (() => "de")() + (() => "f")();
}
function TaskpoolTaskGroupTest() {
let g = new taskpoolTaskGroup("g1");
assert(g.name == "g1");
let t1 = new taskpoolTask(returnAbc);
let t2 = new taskpoolTask(returnDef);
g.addTask(t1);
g.addTask(t2);
let res = await taskpool.execute(g);
assert(res[0] == "abc");
assert(res[1] == "def");
assert(g.name == "g1");
}
function TaskpoolTaskGroupTestExecuteSeveral() {
let g1 = new taskpoolTaskGroup();
let g2 = new taskpoolTaskGroup();
let g3 = new taskpoolTaskGroup();
// Group 1
let t1 = new taskpoolTask(returnAbc);
let t2 = new taskpoolTask(returnDef);
g1.addTask(t1);
g1.addTask(t2);
g1.addTask(returnAbc);
// Group 2
g2.addTask(returnDef);
g2.addTask(returnAbc);
// Execute group 1
let p1 = taskpool.execute(g1);
// Group 3
g3.addTask(returnAbc);
g3.addTask(new taskpoolTask(returnDef));
// Execute group 3 and group 2
let p3 = taskpool.execute(g3);
let p2 = taskpool.execute(g2);
// await result for all groups
let res1 = await p1;
let res2 = await p2;
let res3 = await p3;
// Asserts for group 1
assert(res1[0] == "abc");
assert(res1[1] == "def");
assert(res1[2] == "abc");
// Asserts for group 2
assert(res2[0] == "def");
assert(res2[1] == "abc");
// Asserts for group 3
assert(res3[0] == "abc");
assert(res3[1] == "def");
}
function TaskpoolTaskGroupTestCancelNonStartedGroup() {
let g = new taskpoolTaskGroup();
g.addTask(returnAbc);
g.addTask(returnDef);
let isErrorOccurred = false;
try {
taskpool.cancel(g);
taskpool.execute(g);
} catch(e: Error) {
assert(e.message == "taskpool:: taskGroup is not executed or has been executed");
isErrorOccurred = true;
}
assert(isErrorOccurred);
}
function TaskpoolTaskGroupTestCancelExecutingGroup() {
let g = new taskpoolTaskGroup();
g.addTask(returnAbc);
g.addTask(returnDef);
let p = taskpool.execute(g);
try {
taskpool.cancel(g);
} catch(e: Error) {
assert(e.toString() == "Error: taskpool:: taskGroup is not executed or has been executed");
return;
}
let isErrorOccurred = false;
try {
await p;
} catch(e: Error) {
assert(e.message == "taskpool:: taskGroup has been canceled");
isErrorOccurred = true;
}
assert(isErrorOccurred);
}
function TaskpoolTaskGroupTestCancelExecutedGroup() {
let g = new taskpoolTaskGroup();
g.addTask(returnAbc);
g.addTask(returnDef);
let isErrorOccurred = false;
try {
await taskpool.execute(g);
taskpool.cancel(g);
} catch(e: Error) {
assert(e.message == "taskpool:: taskGroup is not executed or has been executed");
isErrorOccurred = true;
}
assert(isErrorOccurred);
}
function TaskpoolTaskGroupTestAddExecutedTask() {
let g = new taskpoolTaskGroup();
let t = new taskpoolTask(returnAbc);
taskpool.execute(t);
let isErrorOccurred = false;
try {
g.addTask(t);
} catch(e: Error) {
assert(e.message == "taskpool:: taskGroup cannot add seqRunnerTask or executedTask");
isErrorOccurred = true;
}
assert(isErrorOccurred);
}
function TaskpoolTaskGroupTestAddLongTask() {
let g = new taskpoolTaskGroup();
let t = new taskpoolLongTask(returnAbc);
let isErrorOccurred = false;
try {
g.addTask(t);
} catch(e: Error) {
assert(e.message == "taskpool:: The interface does not support the long task");
isErrorOccurred = true;
}
assert(isErrorOccurred);
}
function TaskpoolTaskGroupTestAddTaskFromAnotherGroup() {
let g1 = new taskpoolTaskGroup();
let t1 = new taskpoolTask(returnAbc);
g1.addTask(t1);
let g2 = new taskpoolTaskGroup();
let t2 = new taskpoolTask(returnDef);
g2.addTask(t2);
let isErrorOccurred = false;
try {
g1.addTask(t2);
} catch(e: Error) {
assert(e.message == "taskpool:: taskGroup cannot add groupTask");
isErrorOccurred = true;
}
assert(isErrorOccurred);
}
function TaskpoolTaskGroupTestAddTaskTwiceToOneGroup() {
let g1 = new taskpoolTaskGroup();
let t1 = new taskpoolTask(returnAbc);
g1.addTask(t1);
let isErrorOccurred = false;
try {
g1.addTask(t1);
} catch(e: Error) {
assert(e.message == "taskpool:: taskGroup cannot add groupTask");
isErrorOccurred = true;
}
assert(isErrorOccurred);
}
function TaskpoolTaskGroupTestAddTaskFromSeqRunner() {
let group = new taskpoolTaskGroup();
let task = new taskpoolTask(returnAbc);
let runner = new taskpoolSequenceRunner();
runner.execute(task);
let isErrorOccurred = false;
try {
group.addTask(task);
} catch(e: Error) {
isErrorOccurred = true;
assert(e.message == "taskpool:: taskGroup cannot add seqRunnerTask or executedTask");
}
assert(isErrorOccurred);
}
function main(): int {
TaskpoolTaskGroupTest();
TaskpoolTaskGroupTestExecuteSeveral();
TaskpoolTaskGroupTestCancelNonStartedGroup();
TaskpoolTaskGroupTestCancelExecutingGroup();
TaskpoolTaskGroupTestCancelExecutedGroup();
TaskpoolTaskGroupTestAddExecutedTask();
TaskpoolTaskGroupTestAddLongTask();
TaskpoolTaskGroupTestAddTaskFromAnotherGroup();
TaskpoolTaskGroupTestAddTaskTwiceToOneGroup();
TaskpoolTaskGroupTestAddTaskFromSeqRunner();
return 0;
let taskGroupTestsuite = new ArkTestsuite("taskpool.TaskGroup");
taskGroupTestsuite.addTest("RunOneGroupTest", () =>
{
let g2 = new taskpoolTaskGroup("g2");
assertEQ(g2.name, "g2");
let t1 = new taskpoolTask(returnAbc);
let t2 = new taskpoolTask(returnDef);
g2.addTask(t1);
g2.addTask(t2);
let res2 = await taskpool.execute(g2);
assertEQ(res2[0], "abc");
assertEQ(res2[1], "def");
assertEQ(g2.name, "g2");
// Run group without tasks
let g0 = new taskpoolTaskGroup("g0");
let res0 = await taskpool.execute(g0);
assertEQ(res0.length, 0);
assertEQ(g0.name, "g0");
});
taskGroupTestsuite.addTest("RunSeveralGroupsTest", () =>
{
let g1 = new taskpoolTaskGroup();
let g2 = new taskpoolTaskGroup();
let g3 = new taskpoolTaskGroup();
// Group 1
let t1 = new taskpoolTask(returnAbc);
let t2 = new taskpoolTask(returnDef);
g1.addTask(t1);
g1.addTask(t2);
g1.addTask(returnAbc);
// Group 2
g2.addTask(returnDef);
g2.addTask(returnAbc);
// Execute group 1
let p1 = taskpool.execute(g1);
// Group 3
g3.addTask(returnAbc);
g3.addTask(new taskpoolTask(returnDef));
// Execute group 3 and group 2
let p3 = taskpool.execute(g3);
let p2 = taskpool.execute(g2);
// await result for all groups
let res1 = await p1;
let res3 = await p3;
let res2 = await p2;
// Asserts for group 1
assertEQ(res1[0], "abc");
assertEQ(res1[1], "def");
assertEQ(res1[2], "abc");
assertEQ(t1.isDone(), true);
assertEQ(t2.isDone(), true);
// Asserts for group 2
assertEQ(res2[0], "def");
assertEQ(res2[1], "abc");
// Asserts for group 3
assertEQ(res3[0], "abc");
assertEQ(res3[1], "def");
});
taskGroupTestsuite.addTest("CancelNonStartedGroupTest", (): void throws =>
{
let g = new taskpoolTaskGroup();
g.addTask(returnAbc);
g.addTask(returnDef);
expectError(() => {
taskpool.cancel(g);
taskpool.execute(g);
}, new Error("taskpool:: taskGroup is not executed or has been executed"));
});
taskGroupTestsuite.addTest("CancelExecutingGroupTest", (): void throws =>
{
let g = new taskpoolTaskGroup();
g.addTask(returnAbc);
g.addTask(returnDef);
let p = taskpool.execute(g);
try {
taskpool.cancel(g);
} catch(e: Error) {
assertEQ(e.message, "taskpool:: taskGroup is not executed or has been executed");
return;
}
expectError(() => { await p },
new Error("taskpool:: taskGroup has been canceled"));
});
taskGroupTestsuite.addTest("CancelExecutedGroupTest", (): void throws =>
{
let g = new taskpoolTaskGroup();
g.addTask(returnAbc);
g.addTask(returnDef);
expectError(() => {
await taskpool.execute(g);
// Group tasks finished, so 'cancel' should produce the Error
taskpool.cancel(g);
}, new Error("taskpool:: taskGroup is not executed or has been executed"));
});
taskGroupTestsuite.addTest("AddExecutedTaskTest", (): void throws =>
{
let g = new taskpoolTaskGroup();
let expectedError = new Error("taskpool:: taskGroup cannot add seqRunnerTask or executedTask");
// Executed task
let task = new taskpoolTask(returnAbc);
taskpool.execute(task);
expectError(() => { g.addTask(task) }, expectedError);
// Executed task via SequenceRunner
let runnerTask = new taskpoolTask(returnAbc);
let runner = new taskpoolSequenceRunner();
runner.execute(runnerTask);
expectError(() => { g.addTask(runnerTask) }, expectedError);
});
taskGroupTestsuite.addTest("AddLongTaskTest", (): void throws =>
{
let g = new taskpoolTaskGroup();
let t = new taskpoolLongTask(returnAbc);
expectError(() => { g.addTask(t) },
new Error("taskpool:: The interface does not support the long task"));
});
taskGroupTestsuite.addTest("AddTaskFromGroupTest", (): void throws =>
{
let g1 = new taskpoolTaskGroup();
let t1 = new taskpoolTask(returnAbc);
g1.addTask(t1);
let g2 = new taskpoolTaskGroup();
let t2 = new taskpoolTask(returnDef);
g2.addTask(t2);
let expectedError = new Error("taskpool:: taskGroup cannot add groupTask");
// Add the same task
expectError(() => { g1.addTask(t1) }, expectedError);
// Add task from another group
expectError(() => { g1.addTask(t2) }, expectedError);
});
taskGroupTestsuite.addTest("AddDependentTaskTest", (): void throws =>
{
let task1 = new taskpoolTask(returnAbc);
let task2 = new taskpoolTask(returnAbc);
task1.addDependency(task2);
let taskGroup = new taskpoolTaskGroup();
expectError(() => { taskGroup.addTask(task1) }, new Error("taskpool:: dependent task not allowed."));
});
taskGroupTestsuite.addTest("AddDependencyForGroupTaskTest", (): void throws =>
{
let task1 = new taskpoolTask(returnAbc);
let taskGroup = new taskpoolTaskGroup();
taskGroup.addTask(task1);
let task2 = new taskpoolTask(returnAbc);
expectError(() => { task1.addDependency(task2) }, new Error("taskpool:: groupTask cannot addDependency"));
expectError(() => { task2.addDependency(task1) }, new Error("taskpool:: groupTask cannot be relied on"));
});
return taskGroupTestsuite.run();
}

View File

@ -13,33 +13,24 @@
* limitations under the License.
*/
function TaskpoolLongTaskTest() {
let Sum = (): int => {
return 10 + 20;
}
let task = new taskpoolLongTask(Sum);
let result = await taskpool.execute(task);
assert(result == 30);
}
function TaskpoolLongTaskTestDoubleExecution() {
let Sum = (): int => {
return 10 + 20;
}
let task = new taskpoolLongTask(Sum);
taskpool.execute(task);
let isErrorOccurred = false;
try {
taskpool.execute(task);
} catch (e: Error) {
assert(e.message == "taskpool:: The long task can only be executed once");
isErrorOccurred = true;
}
assert(isErrorOccurred);
function sumFunc(): int {
return (() => 10)() + (() => 20)();
}
function main(): int {
TaskpoolLongTaskTest();
TaskpoolLongTaskTestDoubleExecution();
return 0;
let longTaskTestsuite = new ArkTestsuite("taskpool.LongTask");
longTaskTestsuite.addTest("ExecuteTest", () =>
{
let task = new taskpoolLongTask(sumFunc);
let result = await taskpool.execute(task);
assertEQ(result, 30);
});
longTaskTestsuite.addTest("DoubleExecutionTest", (): void throws =>
{
let task = new taskpoolLongTask(sumFunc);
taskpool.execute(task);
expectError(() => { taskpool.execute(task) },
new Error("taskpool:: The long task can only be executed once"));
});
return longTaskTestsuite.run();
}

View File

@ -17,135 +17,118 @@ function returnAbc() {
return (() => "a")() + (() => "b")() + (() => "c")();
}
function TaskpoolSequenceRunnerTestOrder() {
let resultStr: string = "";
let func1 = (): string => {
resultStr += "1";
return resultStr;
};
let func2 = (): string => {
resultStr += "2";
return resultStr;
}
let func3 = (): string => {
resultStr += "3";
return resultStr;
}
let runner = new taskpoolSequenceRunner();
let task1 = new taskpoolTask(func1);
let task2 = new taskpoolTask(func2);
let task3 = new taskpoolTask(func3);
let p1 = runner.execute(task1);
let p2 = runner.execute(task2);
let p3 = runner.execute(task3);
let res2 = await p2;
assert(res2 == "12");
let res3 = await p3;
assert(res3 == "123");
let res1 = await p1;
assert(res1 == "1");
assert(resultStr == "123");
}
function TaskpoolSequenceRunnerTestExecuteCommonTask() {
let runner = new taskpoolSequenceRunner();
let task = new taskpoolTask(returnAbc);
taskpool.execute(task);
let isErrorOccurred = false;
try {
runner.execute(task);
} catch(e: Error) {
isErrorOccurred = true;
assert(e.message == "taskpool:: SequenceRunner cannot execute seqRunnerTask or executedTask");
}
assert(isErrorOccurred);
}
function TaskpoolSequenceRunnerTestExecuteSeqRunnerTask() {
let runner1 = new taskpoolSequenceRunner();
let runner2 = new taskpoolSequenceRunner();
let task = new taskpoolTask(returnAbc);
runner1.execute(task);
let isErrorOccurred = false;
try {
runner2.execute(task);
} catch(e: Error) {
isErrorOccurred = true;
assert(e.message == "taskpool:: SequenceRunner cannot execute seqRunnerTask or executedTask");
}
assert(isErrorOccurred);
}
function TaskpoolSequenceRunnerTestExecuteGroupTask() {
let runner = new taskpoolSequenceRunner();
let group = new taskpoolTaskGroup();
let task = new taskpoolTask(returnAbc);
group.addTask(task);
let isErrorOccurred = false;
try {
runner.execute(task);
} catch(e: Error) {
isErrorOccurred = true;
assert(e.message == "taskpool:: SequenceRunner cannot execute groupTask");
}
assert(isErrorOccurred);
}
function TaskpoolSequenceRunnerTestCancelNonStartedTask() {
let task = new taskpoolTask(returnAbc);
let runner = new taskpoolSequenceRunner();
let isErrorOccurred = false;
try {
taskpool.cancel(task);
runner.execute(task);
} catch(e: Error) {
isErrorOccurred = true;
assert(e.message == "taskpool:: task is not executed or has been executed");
}
assert(isErrorOccurred);
}
function TaskpoolSequenceRunnerTestCancelExecutingTask() {
let task = new taskpoolTask(returnAbc);
let runner = new taskpoolSequenceRunner();
let p = runner.execute(task);
try {
taskpool.cancel(task);
} catch(e: Error) {
assert(e.message == "taskpool:: sequenceRunner task has been executed");
return;
}
let isErrorOccurred = false;
try {
let res = await p;
} catch(e: Error) {
assert(e.message == "taskpool:: sequenceRunner task has been canceled");
isErrorOccurred = true;
}
assert(isErrorOccurred);
}
function TaskpoolSequenceRunnerTestCancelExecutedTask() {
let task = new taskpoolTask(returnAbc);
let runner = new taskpoolSequenceRunner();
let isErrorOccurred = false;
try {
await runner.execute(task);
taskpool.cancel(task);
} catch(e: Error) {
isErrorOccurred = true;
assert(e.message == "taskpool:: sequenceRunner task has been executed");
}
assert(isErrorOccurred);
}
function main(): int {
TaskpoolSequenceRunnerTestOrder();
TaskpoolSequenceRunnerTestExecuteCommonTask();
TaskpoolSequenceRunnerTestExecuteSeqRunnerTask();
TaskpoolSequenceRunnerTestExecuteGroupTask();
TaskpoolSequenceRunnerTestCancelNonStartedTask();
TaskpoolSequenceRunnerTestCancelExecutingTask();
TaskpoolSequenceRunnerTestCancelExecutedTask();
return 0;
}
let seqRunnerSuite = new ArkTestsuite("taskpool.SequenceRunner");
seqRunnerSuite.addTest("OrderTest", () =>
{
let resultStr: string = "";
let func1 = (): string => {
resultStr += "1";
return resultStr;
};
let func2 = (): string => {
resultStr += "2";
return resultStr;
}
let func3 = (): string => {
resultStr += "3";
return resultStr;
}
let runner = new taskpoolSequenceRunner();
let task1 = new taskpoolTask(func1);
let task2 = new taskpoolTask(func2);
let task3 = new taskpoolTask(func3);
let p1 = runner.execute(task1);
let p2 = runner.execute(task2);
let p3 = runner.execute(task3);
let res2 = await p2;
assertEQ(res2, "12");
let res3 = await p3;
assertEQ(res3, "123");
let res1 = await p1;
assertEQ(res1, "1");
assertEQ(resultStr, "123");
assertEQ(task1.isDone(), true);
assertEQ(task2.isDone(), true);
assertEQ(task3.isDone(), true);
// Execute LongTask
let longTask = new taskpoolLongTask(returnAbc);
let res = await runner.execute(longTask);
assertEQ(res, "abc");
});
seqRunnerSuite.addTest("ExecuteCommonTest", (): void throws =>
{
let runner = new taskpoolSequenceRunner();
let task = new taskpoolTask(returnAbc);
taskpool.execute(task);
expectError(() => { runner.execute(task) },
new Error("taskpool:: SequenceRunner cannot execute seqRunnerTask or executedTask"));
});
seqRunnerSuite.addTest("ExecuteSeqRunnerTaskTest", (): void throws =>
{
let runner1 = new taskpoolSequenceRunner();
let runner2 = new taskpoolSequenceRunner();
let task = new taskpoolTask(returnAbc);
runner1.execute(task);
expectError(() => { runner2.execute(task) },
new Error("taskpool:: SequenceRunner cannot execute seqRunnerTask or executedTask"));
});
seqRunnerSuite.addTest("ExecuteGroupTaskTest", (): void throws => {
let runner = new taskpoolSequenceRunner();
let group = new taskpoolTaskGroup();
let task = new taskpoolTask(returnAbc);
group.addTask(task);
expectError(() => { runner.execute(task) },
new Error("taskpool:: SequenceRunner cannot execute groupTask"));
});
seqRunnerSuite.addTest("CancelNonStartedTaskTest", (): void throws =>
{
let task = new taskpoolTask(returnAbc);
let runner = new taskpoolSequenceRunner();
expectError(() => {
taskpool.cancel(task);
runner.execute(task);
}, new Error("taskpool:: task is not executed or has been executed"));
});
seqRunnerSuite.addTest("CancelExecutingTaskTest", (): void throws =>
{
let task = new taskpoolTask(returnAbc);
let runner = new taskpoolSequenceRunner();
let p = runner.execute(task);
try {
taskpool.cancel(task);
} catch(e: Error) {
assertEQ(e.message, "taskpool:: sequenceRunner task has been executed");
return;
}
expectError(() => { let res = await p; },
new Error("taskpool:: sequenceRunner task has been canceled"));
});
seqRunnerSuite.addTest("CancelExecutedTaskTest", (): void throws =>
{
let task = new taskpoolTask(returnAbc);
let runner = new taskpoolSequenceRunner();
expectError(() => {
await runner.execute(task);
taskpool.cancel(task);
}, new Error("taskpool:: sequenceRunner task has been executed"));
});
seqRunnerSuite.addTest("AddDependentTaskTest", (): void throws =>
{
let task1 = new taskpoolTask(returnAbc);
let task2 = new taskpoolTask(returnAbc);
task1.addDependency(task2);
let runner = new taskpoolSequenceRunner();
expectError(() => { runner.execute(task1) }, new Error("seqRunner:: dependent task not allowed."));
});
seqRunnerSuite.addTest("AddDependencyExecutedTaskTest", (): void throws =>
{
let task1 = new taskpoolTask(returnAbc);
let task2 = new taskpoolTask(returnAbc);
let runner = new taskpoolSequenceRunner();
runner.execute(task2);
expectError(() => { task1.addDependency(task2) }, new Error("taskpool:: seqRunnerTask or executedTask cannot be relied on"));
expectError(() => { task2.addDependency(task1) }, new Error("taskpool:: seqRunnerTask or executedTask cannot addDependency"));
});
return seqRunnerSuite.run();
}