event channel

Signed-off-by: blueyouh <bailiang18@huawei.com>
Change-Id: If139e13f9942bf6e7c185e87921c4512e4dc7eac
Signed-off-by: blueyouh <bailiang18@huawei.com>
This commit is contained in:
blueyouh 2023-11-21 09:10:45 +00:00
parent b097dedc1c
commit 9d36b00e18
4 changed files with 380 additions and 0 deletions

View File

@ -48,6 +48,7 @@ group("device_status_tests") {
testonly = true
deps = [
"frameworks/native/interaction/test:interaction_manager_test",
"intention/common/channel:intention_channel_test",
"services/native/test:devicestatussrv_test",
"test/fuzztest:device_status_fuzztest",
]

View File

@ -0,0 +1,54 @@
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//build/test.gni")
import("../../../device_status.gni")
config("intention_channel_public_config") {
include_dirs = [ "include" ]
}
ohos_source_set("intention_channel") {
sanitize = {
cfi = true
cfi_cross_dso = true
debug = false
}
branch_protector_ret = "pac_ret"
include_dirs = [ "include" ]
sources = []
public_configs = [ ":intention_channel_public_config" ]
subsystem_name = "${device_status_subsystem_name}"
part_name = "${device_status_part_name}"
}
ohos_unittest("intention_channel_test") {
module_out_path = "${device_status_part_name}/devicestatussrv"
sources = [ "test/channel_test.cpp" ]
deps = [
":intention_channel",
"${device_status_utils_path}:devicestatus_util",
]
external_deps = [
"c_utils:utils",
"hilog:libhilog",
]
}

View File

@ -0,0 +1,228 @@
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CHANNEL_H
#define CHANNEL_H
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <type_traits>
namespace OHOS {
namespace Msdp {
namespace DeviceStatus {
template<typename Event>
class Channel {
static_assert(std::is_enum_v<Event> || std::is_integral_v<Event> ||
(std::is_class_v<Event> &&
std::is_default_constructible_v<Event> &&
std::is_copy_constructible_v<Event>));
public:
class Sender final {
friend class Channel<Event>;
public:
Sender() = default;
~Sender() = default;
Sender(const Sender &other)
: channel_(other.channel_)
{}
Sender(Sender &&other)
: channel_(other.channel_)
{
other.channel_ = nullptr;
}
Sender& operator=(const Sender &other)
{
channel_ = other.channel_;
return *this;
}
Sender& operator=(Sender &&other)
{
channel_ = other.channel_;
other.channel_ = nullptr;
return *this;
}
void Send(const Event &event)
{
if (channel_ != nullptr) {
channel_->Send(event);
}
}
private:
Sender(std::shared_ptr<Channel<Event>> channel)
: channel_(channel)
{}
std::shared_ptr<Channel<Event>> channel_ { nullptr };
};
class Receiver final {
friend class Channel<Event>;
public:
Receiver() = default;
~Receiver() = default;
Receiver(const Receiver &other)
: channel_(other.channel_)
{}
Receiver(Receiver &&other)
: channel_(other.channel_)
{
other.channel_ = nullptr;
}
Receiver& operator=(const Receiver &other)
{
channel_ = other.channel_;
return *this;
}
Receiver& operator=(Receiver &&other)
{
channel_ = other.channel_;
other.channel_ = nullptr;
return *this;
}
Event Peek()
{
return (channel_ != nullptr ? channel_->Peek() : Event());
}
void Pop()
{
if (channel_ != nullptr) {
channel_->Pop();
}
}
Event Receive()
{
return (channel_ != nullptr ? channel_->Receive() : Event());
}
private:
Receiver(std::shared_ptr<Channel<Event>> channel)
: channel_(channel)
{}
std::shared_ptr<Channel<Event>> channel_ { nullptr };
};
~Channel() = default;
static std::pair<Sender, Receiver> OpenChannel();
private:
Channel() = default;
void Send(const Event &event);
Event Peek();
void Pop();
Event Receive();
static inline constexpr size_t QUEUE_CAPACITY { 1024 };
std::mutex lock_;
std::condition_variable full_;
std::condition_variable empty_;
std::deque<Event> queue_;
};
template<typename Event>
std::pair<typename Channel<Event>::Sender, typename Channel<Event>::Receiver> Channel<Event>::OpenChannel()
{
std::shared_ptr<Channel<Event>> channel(new Channel<Event>());
return std::make_pair(Channel<Event>::Sender(channel), Channel<Event>::Receiver(channel));
}
template<typename Event>
void Channel<Event>::Send(const Event &event)
{
std::unique_lock<std::mutex> lock(lock_);
if (queue_.size() >= QUEUE_CAPACITY) {
full_.wait(lock, [this] {
return (queue_.size() < QUEUE_CAPACITY);
});
}
bool needNotify = queue_.empty();
queue_.push_back(event);
if (needNotify) {
empty_.notify_one();
}
}
template<typename Event>
Event Channel<Event>::Peek()
{
std::unique_lock<std::mutex> lock(lock_);
if (queue_.empty()) {
empty_.wait(lock, [this] {
return !queue_.empty();
});
}
return queue_.front();
}
template<typename Event>
void Channel<Event>::Pop()
{
std::unique_lock<std::mutex> lock(lock_);
if (queue_.empty()) {
empty_.wait(lock, [this] {
return !queue_.empty();
});
}
bool needNotify(queue_.size() >= QUEUE_CAPACITY);
queue_.pop_front();
if (needNotify) {
full_.notify_one();
}
}
template<typename Event>
Event Channel<Event>::Receive()
{
std::unique_lock<std::mutex> lock(lock_);
if (queue_.empty()) {
empty_.wait(lock, [this] {
return !queue_.empty();
});
}
bool needNotify(queue_.size() >= QUEUE_CAPACITY);
Event event = queue_.front();
queue_.pop_front();
if (needNotify) {
full_.notify_one();
}
return event;
}
} // namespace DeviceStatus
} // namespace Msdp
} // namespace OHOS
#endif // CHANNEL_H

View File

@ -0,0 +1,97 @@
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define private public
#define protected public
#include <gtest/gtest.h>
#include "channel.h"
#include "fi_log.h"
namespace OHOS {
namespace Msdp {
namespace DeviceStatus {
using namespace testing::ext;
namespace {
constexpr ::OHOS::HiviewDFX::HiLogLabel LABEL { LOG_CORE, MSDP_DOMAIN_ID, "ChannelTest" };
} // namespace
class ChannelTest : public testing::Test {
public:
static void SetUpTestCase() {}
static void TearDownTestCase() {}
void SetUp() {}
void TearDown() {}
};
/**
* @tc.name: ChannelTest001
* @tc.desc: test channel throuthput when sending speed is greater than receiving speed.
* @tc.type: FUNC
*/
HWTEST_F(ChannelTest, ChannelTest001, TestSize.Level0)
{
CALL_TEST_DEBUG;
auto [sender, receiver] = Channel<int32_t>::OpenChannel();
constexpr int32_t count = 65535;
std::thread worker([sender = sender, count]() mutable {
for (int32_t index = 0; index < count; ++index) {
sender.Send(index);
}
});
for (int32_t expected = 0; expected < count;) {
int32_t received = receiver.Receive();
ASSERT_EQ(received, expected);
if ((++expected % 10) == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
if (worker.joinable()) {
worker.join();
}
}
/**
* @tc.name: ChannelTest002
* @tc.desc: test channel throuthput when sending speed is less than receiving speed.
* @tc.type: FUNC
*/
HWTEST_F(ChannelTest, ChannelTest002, TestSize.Level0)
{
CALL_TEST_DEBUG;
auto [sender, receiver] = Channel<int32_t>::OpenChannel();
constexpr int32_t count = 65535;
std::thread worker([sender = sender, count]() mutable {
for (int32_t index = 0; index < count;) {
sender.Send(index);
if ((++index % 10) == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
});
for (int32_t expected = 0; expected < count; ++expected) {
int32_t received = receiver.Receive();
ASSERT_EQ(received, expected);
}
if (worker.joinable()) {
worker.join();
}
}
} // namespace DeviceStatus
} // namespace Msdp
} // namespace OHOS