sensor不需要epoll机制

Signed-off-by: baiwei <baiwei39@huawei.com>
Change-Id: Ib586c977bac735bf1f90d71b1c35d08959534395
This commit is contained in:
baiwei 2023-07-11 06:43:22 +00:00
commit f0bcefafa4
28 changed files with 2203 additions and 163 deletions

View File

@ -20,6 +20,8 @@
"deps": {
"components": [
"c_utils",
"hiviewdfx_hilog_native",
"common",
"hisysevent",
"napi",
"drivers_interface_sensor",

View File

@ -33,11 +33,18 @@ ohos_shared_library("libsensor_native") {
"$SUBSYSTEM_DIR/utils/ipc/include",
]
defines = sensor_default_defines
deps = [
"$SUBSYSTEM_DIR/utils/common:libsensor_utils",
"$SUBSYSTEM_DIR/utils/ipc:libsensor_ipc",
]
if (rust_socket_ipc) {
deps +=
[ "$SUBSYSTEM_DIR/rust/utils/socket_ipc_rust_ffi:sensor_rust_util_ffi" ]
}
external_deps = [
"c_utils:utils",
"eventhandler:libeventhandler",

View File

@ -53,13 +53,13 @@ public:
int32_t ResetSensors();
void ReceiveMessage(const char *buf, size_t size);
void Disconnect();
void HandleNetPacke(NetPacket &pkt);
private:
int32_t InitServiceClient();
void UpdateSensorInfoMap(int32_t sensorId, int64_t samplingPeriod, int64_t maxReportDelay);
void DeleteSensorInfoItem(int32_t sensorId);
int32_t CreateSocketChannel();
void HandleNetPacke(NetPacket &pkt);
std::mutex clientMutex_;
sptr<IRemoteObject::DeathRecipient> serviceDeathObserver_;
sptr<ISensorService> sensorServer_;

View File

@ -27,6 +27,7 @@
#include "sensor_service_proxy.h"
#include "sensors_errors.h"
#include "system_ability_definition.h"
#include "rust_binding.h"
namespace OHOS {
namespace Sensors {
@ -36,6 +37,18 @@ namespace {
constexpr HiLogLabel LABEL = { LOG_CORE, SENSOR_LOG_DOMAIN, "SensorServiceClient" };
constexpr int32_t GET_SERVICE_MAX_COUNT = 30;
constexpr uint32_t WAIT_MS = 200;
#ifdef OHOS_BUILD_ENABLE_RUST
extern "C" {
void ReadClientPackets(RustStreamBuffer*, OHOS::Sensors::SensorServiceClient*,
void(*)(OHOS::Sensors::SensorServiceClient*, RustNetPacket*));
void OnPacket(SensorServiceClient* object, RustNetPacket* cPkt)
{
NetPacket pkt(cPkt->msgId);
pkt.streamBufferPtr_.reset(cPkt->streamBuffer);
object->HandleNetPacke(pkt);
}
}
#endif // OHOS_BUILD_ENABLE_RUST
} // namespace
SensorServiceClient::~SensorServiceClient()
@ -357,7 +370,11 @@ void SensorServiceClient::ReceiveMessage(const char *buf, size_t size)
if (!circBuf_.Write(buf, size)) {
SEN_HILOGE("Write data failed. size:%{public}zu", size);
}
#ifdef OHOS_BUILD_ENABLE_RUST
ReadClientPackets(circBuf_.streamBufferPtr_.get(), this, OnPacket);
#else
OnReadPackets(circBuf_, std::bind(&SensorServiceClient::HandleNetPacke, this, std::placeholders::_1));
#endif // OHOS_BUILD_ENABLE_RUST
}
void SensorServiceClient::HandleNetPacke(NetPacket &pkt)
@ -370,7 +387,11 @@ void SensorServiceClient::HandleNetPacke(NetPacket &pkt)
SensorActiveInfo sensorActiveInfo;
pkt >> sensorActiveInfo.pid >> sensorActiveInfo.sensorId >> sensorActiveInfo.samplingPeriodNs >>
sensorActiveInfo.maxReportDelayNs;
#ifdef OHOS_BUILD_ENABLE_RUST
if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) {
#else
if (pkt.ChkRWError()) {
#endif // OHOS_BUILD_ENABLE_RUST
SEN_HILOGE("Packet read type failed");
return;
}
@ -385,13 +406,14 @@ void SensorServiceClient::HandleNetPacke(NetPacket &pkt)
void SensorServiceClient::Disconnect()
{
CALL_LOG_ENTER;
if (fd_ < 0) {
int32_t fd = GetFd();
if (fd < 0) {
return;
}
CHKPV(dataChannel_);
int32_t ret = dataChannel_->DelFdListener(fd_);
int32_t ret = dataChannel_->DelFdListener(fd);
if (ret != ERR_OK) {
SEN_HILOGE("Delete fd listener failed, fd:%{public}d, ret:%{public}d", fd_, ret);
SEN_HILOGE("Delete fd listener failed, fd:%{public}d, ret:%{public}d", fd, ret);
}
Close();
}
@ -414,12 +436,16 @@ int32_t SensorServiceClient::CreateSocketChannel()
SEN_HILOGE("Create socket channel failed, ret:%{public}d", ret);
return ret;
}
#ifdef OHOS_BUILD_ENABLE_RUST
StreamSocketSetFd(streamSocketPtr_.get(), clientFd);
#else
fd_ = clientFd;
if (dataChannel_->AddFdListener(fd_,
#endif // OHOS_BUILD_ENABLE_RUST
if (dataChannel_->AddFdListener(GetFd(),
std::bind(&SensorServiceClient::ReceiveMessage, this, std::placeholders::_1, std::placeholders::_2),
std::bind(&SensorServiceClient::Disconnect, this)) != ERR_OK) {
Close();
SEN_HILOGE("Add fd listener failed, fd:%{public}d", fd_);
SEN_HILOGE("Add fd listener failed, fd:%{public}d", GetFd());
return ERROR;
}
StartTrace(HITRACE_TAG_SENSORS, "EnableActiveInfoCB");

View File

@ -0,0 +1,31 @@
# Copyright (C) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//build/ohos.gni")
ohos_rust_shared_ffi("sensor_rust_util_ffi") {
sources = [ "src/lib.rs" ]
external_deps = [
"c_utils:utils",
"hiviewdfx_hilog_native:hilog_rust",
"hiviewdfx_hilog_native:libhilog",
]
crate_name = "sensor_rust_util_ffi"
crate_type = "cdylib"
install_images = [ system_base_dir ]
part_name = "sensor"
subsystem_name = "sensors"
}

View File

@ -0,0 +1,83 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/// provide C interface to C++ for calling
pub mod ffi;
use hilog_rust::{info, error, hilog, HiLogLabel, LogType};
use std::ffi::{CString, c_char};
use libc::c_int;
const ONCE_PROCESS_NETPACKET_LIMIT: i32 = 100;
const MAX_PACKET_BUF_SIZE: usize = 256;
const SEND_RETRY_SLEEP_TIME: u64 = 10000;
const SEND_RETRY_LIMIT: i32 = 32;
const RET_ERR: i32 = -1;
const RET_OK: i32 = 0;
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "EpollManager"
};
/// struct EpollManager
#[repr(C)]
pub struct EpollManager {
/// socket_fd
pub socket_fd: i32,
/// epoll_fd
pub epoll_fd: i32,
}
impl Default for EpollManager {
fn default() -> Self {
Self {
socket_fd: -1,
epoll_fd: -1,
}
}
}
impl EpollManager {
fn as_ref<'a>(object: *const Self) -> Option<&'a Self> {
// SAFETY: as_ref has already done no-null verification inside
unsafe {
object.as_ref()
}
}
fn as_mut<'a>(object: *mut Self) -> Option<&'a mut Self> {
// SAFETY: as_mut has already done no-null verification inside
unsafe {
object.as_mut()
}
}
fn socket_fd(&self) -> i32 {
self.socket_fd
}
fn socket_set_fd(&mut self, fd: i32) {
self.socket_fd = fd
}
fn socket_close(&mut self) {
if self.socket_fd >= RET_OK {
// safety: call unsafe function
let result = unsafe {
libc::close(self.socket_fd as c_int)
};
if result > RET_OK {
error!(LOG_LABEL, "Socket close failed result:{}", result);
}
self.socket_fd = RET_ERR;
}
}
}

View File

@ -0,0 +1,102 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use hilog_rust::{hilog, HiLogLabel, LogType};
use crate::{error::SocketStatusCode, epoll_manager::EpollManager};
use std::mem::drop;
use std::ffi::c_char;
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "stream_socket_ffi"
};
/// Create unique_ptr of StreamSocket for C++ code
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// If uninitialized memory requires special handling, please refer to std::mem::MaybeUninit.
/// The pointer needs to be aligned for access. If the memory pointed to by the pointer is a compact
/// memory layout and requires special consideration. Please refer to (#[repr(packed)]).
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSocketCreate() -> *mut EpollManager {
let epoll_manager: Box::<EpollManager> = Box::default();
Box::into_raw(epoll_manager)
}
/// Drop unique_ptr of StreamSocket for C++ code
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// If uninitialized memory requires special handling, please refer to std::mem::MaybeUninit.
/// The pointer needs to be aligned for access. If the memory pointed to by the pointer is a compact
/// memory layout and requires special consideration. Please refer to (#[repr(packed)]).
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSocketDelete(raw: *mut EpollManager) {
if !raw.is_null() {
drop(Box::from_raw(raw));
}
}
/// Obtain StreamSocket's fd
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSocketGetFd(object: *const EpollManager) -> i32 {
info!(LOG_LABEL, "enter StreamSocketGetFd");
if let Some(obj) = EpollManager::as_ref(object) {
obj.socket_fd()
} else {
SocketStatusCode::FdFail.into()
}
}
/// Close socket fd after Sending data.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSocketClose(object: *mut EpollManager) -> i32 {
info!(LOG_LABEL, "enter StreamSocketClose");
if let Some(obj) = EpollManager::as_mut(object) {
obj.socket_close();
SocketStatusCode::Ok.into()
} else {
SocketStatusCode::SocketCloseFail.into()
}
}
/// Set socket fd
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSocketSetFd(object: *mut EpollManager, fd: i32) -> i32 {
info!(LOG_LABEL, "enter StreamSocketSetFd");
if let Some(obj) = EpollManager::as_mut(object) {
obj.socket_set_fd(fd);
SocketStatusCode::Ok.into()
} else {
SocketStatusCode::SocketSetFdFail.into()
}
}

View File

@ -0,0 +1,144 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub enum SocketStatusCode {
Ok = 0,
Fail = -1,
FdFail = -2,
EpollFdFail = -3,
EpollCreateFail = -4,
EpollCtlFail = -5,
EpollWaitFail = -6,
EpollCloseFail = -7,
SocketCloseFail = -8,
SocketSetFdFail = -9,
}
pub enum BufferStatusCode {
Ok = 0,
Fail = -1,
ResetFail = -2,
CleanFail = -3,
UnreadSizeFail = -4,
IsEmptyFail = -5,
WriteStreamBufferFail = -6,
ReadStreamBufferFail = -7,
CheckRWErrorFail = -8,
CopyDataToBeginFail = -9,
ReadCharUsizeFail = -10,
ReadServerPacketsFail = -11,
ReadClientPacketsFail = -12,
SizeFail = -13,
RcountFail = -14,
WcountFail = -15,
WposFail = -16,
RposFail = -17,
SetRwErrStatusFail = -18,
SetRposFail = -19,
}
pub enum SessionStatusCode {
Ok = 0,
Fail = -1,
UidFail = -2,
PidFail = -3,
ModuleTypeFail = -4,
FdFail = -5,
SetTokenTypeFail = -6,
TokenTypeFail = -7,
CloseFail = -8,
SetUidFail = -9,
SetFdFail = -10,
SetPidFail = -11,
}
pub enum NetPacketStatusCode {
Ok = 0,
Fail = -1,
PacketLengthFail = -2,
}
impl From<SocketStatusCode> for i32 {
fn from(code: SocketStatusCode) -> i32 {
match code {
SocketStatusCode::Ok => 0,
SocketStatusCode::FdFail => -2,
SocketStatusCode::EpollFdFail => -3,
SocketStatusCode::EpollCreateFail => -4,
SocketStatusCode::EpollCtlFail => -5,
SocketStatusCode::EpollWaitFail => -6,
SocketStatusCode::EpollCloseFail => -7,
SocketStatusCode::SocketCloseFail => -8,
SocketStatusCode::SocketSetFdFail => -9,
_ => -1,
}
}
}
impl From<BufferStatusCode> for i32 {
fn from(code: BufferStatusCode) -> i32 {
match code {
BufferStatusCode::Ok => 0,
BufferStatusCode::ResetFail => -2,
BufferStatusCode::CleanFail => -3,
BufferStatusCode::UnreadSizeFail => -4,
BufferStatusCode::IsEmptyFail => -5,
BufferStatusCode::WriteStreamBufferFail => -6,
BufferStatusCode::ReadStreamBufferFail => -7,
BufferStatusCode::CheckRWErrorFail => -8,
BufferStatusCode::CopyDataToBeginFail => -9,
BufferStatusCode::ReadCharUsizeFail => -10,
BufferStatusCode::ReadServerPacketsFail => -11,
BufferStatusCode::ReadClientPacketsFail => -12,
BufferStatusCode::SizeFail => -13,
BufferStatusCode::RcountFail => -14,
BufferStatusCode::WcountFail => -15,
BufferStatusCode::WposFail => -16,
BufferStatusCode::RposFail => -17,
BufferStatusCode::SetRwErrStatusFail => -18,
BufferStatusCode::SetRposFail => -19,
_ => -1,
}
}
}
impl From<SessionStatusCode> for i32 {
fn from(code: SessionStatusCode) -> i32 {
match code {
SessionStatusCode::Ok => 0,
SessionStatusCode::UidFail => -2,
SessionStatusCode::PidFail => -3,
SessionStatusCode::ModuleTypeFail => -4,
SessionStatusCode::FdFail => -5,
SessionStatusCode::SetTokenTypeFail => -6,
SessionStatusCode::TokenTypeFail => -7,
SessionStatusCode::CloseFail => -8,
SessionStatusCode::SetUidFail => -9,
SessionStatusCode::SetFdFail => -10,
SessionStatusCode::SetPidFail => -11,
_ => -1,
}
}
}
impl From<NetPacketStatusCode> for i32 {
fn from(code: NetPacketStatusCode) -> i32 {
match code {
NetPacketStatusCode::Ok => 0,
NetPacketStatusCode::PacketLengthFail => -2,
_ => -1,
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//! Safe Rust interface to OHOS msdp
#![feature(rustc_private)]
#![allow(dead_code)]
extern crate libc;
mod epoll_manager;
mod stream_buffer;
mod stream_session;
mod error;
/// annotation
pub type Result<T> = std::result::Result<T, i32>;

View File

@ -0,0 +1,361 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/// provide C interface to C++ for calling
pub mod ffi;
pub(super) mod net_packet;
mod binding;
use hilog_rust::{error, hilog, debug, HiLogLabel, LogType};
use std::ffi::{CString, c_char};
use std::mem::size_of;
use binding::CSensorServiceClient;
use net_packet::{NetPacket, CNetPacket, PackHead};
type ErrorStatus = crate::stream_buffer::ErrStatus;
/// function pointer alias
pub type ClientPacketCallBackFun = unsafe extern "C" fn (
client: *const CSensorServiceClient,
pkt: *const CNetPacket,
);
const ONCE_PROCESS_NETPACKET_LIMIT: i32 = 100;
const MAX_STREAM_BUF_SIZE: usize = 256;
/// max buffer size of packet
pub const MAX_PACKET_BUF_SIZE: usize = 256;
const PARAM_INPUT_INVALID: i32 = 5;
const MEM_OUT_OF_BOUNDS: i32 = 3;
const MEMCPY_SEC_FUN_FAIL: i32 = 4;
const STREAM_BUF_READ_FAIL: i32 = 1;
const MAX_VECTOR_SIZE: i32 = 10;
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "StreamBuffer"
};
/// enum errstatus
#[derive(Copy, Clone, PartialEq)]
#[repr(C)]
pub enum ErrStatus {
/// status ok
Ok = 0,
/// readerror
Read = 1,
/// writeerror
Write = 2,
}
#[derive(Copy, Clone)]
#[repr(C)]
pub struct StreamBuffer {
rw_error_status: ErrorStatus,
r_count: usize,
w_count: usize,
r_pos: usize,
w_pos: usize,
sz_buff: [c_char; MAX_STREAM_BUF_SIZE + 1],
}
impl Default for StreamBuffer {
fn default() -> Self {
Self {
rw_error_status: ErrorStatus::Ok,
r_count: 0,
w_count: 0,
r_pos: 0,
w_pos: 0,
sz_buff: [0; MAX_STREAM_BUF_SIZE + 1],
}
}
}
impl StreamBuffer {
fn as_ref<'a>(object: *const Self) -> Option<&'a Self> {
// SAFETY: as_ref has already done no-null verification inside
unsafe {
object.as_ref()
}
}
fn as_mut<'a>(object: *mut Self) -> Option<&'a mut Self> {
// SAFETY: as_mut has already done no-null verification inside
unsafe {
object.as_mut()
}
}
fn write<T>(&mut self, data: T) {
let data: *const c_char = &data as *const T as *const c_char;
let size = size_of::<T>();
self.write_char_usize(data, size);
}
fn reset(&mut self) {
self.r_pos = 0;
self.w_pos = 0;
self.r_count = 0;
self.w_count = 0;
self.rw_error_status = ErrorStatus::Ok;
}
fn clean(&mut self) {
self.reset();
let size = MAX_STREAM_BUF_SIZE + 1;
let reference = &(self.sz_buff);
let pointer = reference as *const c_char;
// SAFETY: memset_s is the security function of the C library
let ret = unsafe {
binding::memset_s(pointer as *mut libc::c_void, size, 0, size)
};
if ret != 0 {
error!(LOG_LABEL, "Call memset_s fail");
}
}
fn seek_read_pos(&mut self, n: usize) -> bool {
let pos: usize = self.r_pos + n;
if pos > self.w_pos {
error!(LOG_LABEL, "The position in the calculation is not as expected. pos:{} [0, {}]",
pos, self.w_pos);
false
} else {
self.r_pos = pos;
true
}
}
fn unread_size(&self) -> usize {
if self.w_pos <= self.r_pos {
0
} else {
self.w_pos - self.r_pos
}
}
fn is_empty(&self) -> bool {
self.r_pos == self.w_pos
}
fn write_streambuffer(&mut self, buf: &Self) -> bool {
self.write_char_usize(buf.data(), buf.size())
}
fn read_streambuffer(&self, buf: &mut Self) -> bool {
buf.write_char_usize(self.data(), self.size())
}
pub(crate) fn data(&self) -> *const c_char {
&(self.sz_buff[0]) as *const c_char
}
pub(crate) fn size(&self) -> usize {
self.w_pos
}
pub(crate) fn chk_rwerror(&self) -> bool {
self.rw_error_status != ErrorStatus::Ok
}
fn get_available_buf_size(&self) -> usize {
if self.w_pos >= MAX_STREAM_BUF_SIZE {
0
} else {
MAX_STREAM_BUF_SIZE - self.w_pos
}
}
fn get_error_status_remark(&self) -> *const c_char {
let s: &[c_char] = match self.rw_error_status {
ErrorStatus::Ok => b"OK\0",
ErrorStatus::Read => b"READ_ERROR\0",
ErrorStatus::Write => b"WRITE_ERROR\0",
};
s.as_ptr()
}
fn read_buf(&self) -> *const c_char {
&(self.sz_buff[self.r_pos]) as *const c_char
}
fn write_char_usize(&mut self, buf: *const c_char, size: usize) -> bool {
if self.chk_rwerror() {
return false;
}
if buf.is_null() {
error!(LOG_LABEL, "Invalid input parameter buf=nullptr errCode:{}", PARAM_INPUT_INVALID);
self.rw_error_status = ErrorStatus::Write;
return false;
}
if size == 0 {
error!(LOG_LABEL, "Invalid input parameter size={} errCode:{}", size, PARAM_INPUT_INVALID);
self.rw_error_status = ErrorStatus::Write;
return false;
}
if (self.w_pos + size) > MAX_STREAM_BUF_SIZE {
error!(LOG_LABEL, "The write length exceeds buffer. wIdx:{} size:{} maxBufSize:{} errCode:{}",
self.w_pos, size, MAX_STREAM_BUF_SIZE, MEM_OUT_OF_BOUNDS);
self.rw_error_status = ErrorStatus::Write;
return false;
}
let pointer = &(self.sz_buff[0]) as *const c_char;
// SAFETY: memcpy_s is the security function of the C library
let ret = unsafe {
binding::memcpy_s(pointer.add(self.w_pos) as *mut libc::c_void, self.get_available_buf_size(),
buf as *mut libc::c_void, size)
};
if ret != 0 {
error!(LOG_LABEL, "Failed to call memcpy_s. ret:{}", ret);
self.rw_error_status = ErrorStatus::Write;
return false;
}
self.w_pos += size;
self.w_count += 1;
true
}
fn check_write(&mut self, size: usize) -> bool {
let buffer_size = size;
let mut avail_size = self.get_available_buf_size();
if buffer_size > avail_size && self.r_pos > 0 {
self.copy_data_to_begin();
avail_size = self.get_available_buf_size();
}
avail_size >= buffer_size
}
fn copy_data_to_begin(&mut self) {
let unread_size = self.unread_size();
if unread_size > 0 && self.r_pos > 0 {
for (index, value) in (self.r_pos..=self.w_pos).enumerate() {
self.sz_buff[index] = self.sz_buff[value];
}
}
debug!(LOG_LABEL, "unread_size:{} rPos:{} wPos:{}", unread_size, self.r_pos, self.w_pos);
self.r_pos = 0;
self.w_pos = unread_size;
}
fn read_char_usize(&mut self, buf: *const c_char, size: usize) -> bool {
if self.chk_rwerror() {
return false;
}
if buf.is_null() {
error!(LOG_LABEL, "Invalid input parameter buf=nullptr errCode:{}", PARAM_INPUT_INVALID);
self.rw_error_status = ErrorStatus::Read;
return false;
}
if size == 0 {
error!(LOG_LABEL, "Invalid input parameter size={} errCode:{}", size, PARAM_INPUT_INVALID);
self.rw_error_status = ErrorStatus::Read;
return false;
}
if (self.r_pos + size) > self.w_pos {
error!(LOG_LABEL, "Memory out of bounds on read... errCode:{}", MEM_OUT_OF_BOUNDS);
self.rw_error_status = ErrorStatus::Read;
return false;
}
// SAFETY: memcpy_s is the security function of the C library
let ret = unsafe {
binding::memcpy_s(buf as *mut libc::c_void, size, self.read_buf() as *const libc::c_void, size)
};
if ret != 0 {
error!(LOG_LABEL, "Failed to call memcpy_s. ret:{}", ret);
self.rw_error_status = ErrorStatus::Read;
return false;
}
self.r_pos += size;
self.r_count += 1;
true
}
fn circle_write(&mut self, buf: *const c_char, size: usize) -> bool {
if !self.check_write(size) {
error!(LOG_LABEL, "Out of buffer memory, availableSize:{}, size:{}, unreadSize:{}, rPos:{}, wPos:{}",
self.get_available_buf_size(), size, self.unread_size(),
self.r_pos, self.w_pos);
return false;
}
self.write_char_usize(buf, size)
}
pub unsafe fn read_client_packets(&mut self, client: *const CSensorServiceClient, callback_fun: ClientPacketCallBackFun) {
const HEAD_SIZE: usize = size_of::<PackHead>();
for _i in 0..ONCE_PROCESS_NETPACKET_LIMIT {
let unread_size = self.unread_size();
if unread_size < HEAD_SIZE {
break;
}
let data_size = unread_size - HEAD_SIZE;
let buf: *const c_char = self.read_buf();
if buf.is_null() {
error!(LOG_LABEL, "buf is null, skip then break");
break;
}
let head: *const PackHead = buf as *const PackHead;
if head.is_null() {
error!(LOG_LABEL, "head is null, skip then break");
break;
}
// SAFETY: head pointer should be not null certainly
let size = unsafe {
(*head).size
};
// SAFETY: head pointer should be not null certainly
let id_msg = unsafe {
(*head).id_msg
};
if !(0..=MAX_PACKET_BUF_SIZE).contains(&size) {
error!(LOG_LABEL, "Packet header parsing error, and this error cannot be recovered. \
The buffer will be reset. size:{}, unreadSize:{}", size, unread_size);
self.reset();
break;
}
if size > data_size {
break;
}
let mut pkt: NetPacket = NetPacket {
msg_id: id_msg,
..Default::default()
};
unsafe {
if size > 0 &&
!pkt.stream_buffer.write_char_usize(buf.add(HEAD_SIZE) as *const c_char, size) {
error!(LOG_LABEL, "Error writing data in the NetPacket. It will be retried next time. \
messageid:{}, size:{}", id_msg as i32, size);
break;
}
}
if !self.seek_read_pos(pkt.get_packet_length()) {
error!(LOG_LABEL, "Set read position error, and this error cannot be recovered, and the buffer \
will be reset. packetSize:{} unreadSize:{}", pkt.get_packet_length(), unread_size);
self.reset();
break;
}
let c_net_packet: CNetPacket = CNetPacket {
msg_id: pkt.msg_id,
stream_buffer_ptr: Box::into_raw(Box::new(pkt.stream_buffer))
};
unsafe {
callback_fun(client, &c_net_packet as *const CNetPacket);
}
if self.is_empty() {
self.reset();
break;
}
}
}
fn r_count(&self) -> usize {
self.r_count
}
fn w_count(&self) -> usize {
self.w_count
}
fn w_pos(&self) -> usize {
self.w_pos
}
fn r_pos(&self) -> usize {
self.r_pos
}
fn sz_buff(&self) -> *const c_char {
&self.sz_buff[0] as *const c_char
}
fn set_rw_error_status(&mut self, rw_error_status: ErrorStatus) {
self.rw_error_status = rw_error_status
}
fn set_r_pos(&mut self, r_pos: usize) {
self.r_pos = r_pos
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// C interface for socket core object
use libc::c_void;
/// C StreamServer struct pointer
#[repr(C)]
pub struct CStreamServer {
_private: [u8; 0],
}
/// C Client struct pointer
#[repr(C)]
pub struct CSensorServiceClient {
_private: [u8; 0],
}
extern "C" {
/// extern safe C function
pub fn memcpy_s(dest: *mut c_void, dest_size: libc::size_t, src: *const c_void, count: libc::size_t) -> i32;
/// extern safe C function
pub fn memset_s(dest: *mut c_void, dest_size: libc::size_t, ch: libc::c_int, count: libc::size_t) -> i32;
}

View File

@ -0,0 +1,389 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use hilog_rust::{info, hilog, HiLogLabel, LogType};
use crate::error::BufferStatusCode;
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "stream_buffer_ffi"
};
/// Create unique_ptr of stream_buffer for C++ code
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// If uninitialized memory requires special handling, please refer to std::mem::MaybeUninit.
/// The pointer needs to be aligned for access. If the memory pointed to by the pointer is a compact
/// memory layout and requires special consideration. Please refer to (#[repr(packed)]).
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferCreate() -> *mut StreamBuffer {
let stream_buffer: Box::<StreamBuffer> = Box::default();
Box::into_raw(stream_buffer)
}
/// Drop unique_ptr of stream_buffer for C++ code
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// If uninitialized memory requires special handling, please refer to std::mem::MaybeUninit.
/// The pointer needs to be aligned for access. If the memory pointed to by the pointer is a compact
/// memory layout and requires special consideration. Please refer to (#[repr(packed)]).
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferDelete(raw: *mut StreamBuffer) {
if !raw.is_null() {
drop(Box::from_raw(raw));
}
}
/// Obtain the first address of sz_buff
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferData(object: *const StreamBuffer) -> *const c_char {
info!(LOG_LABEL, "enter data");
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.data()
} else {
std::ptr::null()
}
}
/// Obtain position writen
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferSize(object: *const StreamBuffer) -> usize {
info!(LOG_LABEL, "enter size");
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.size()
} else {
0
}
}
/// Reset StreamBuffer value
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferReset(object: *mut StreamBuffer) -> i32 {
info!(LOG_LABEL, "enter StreamBufferReset");
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.reset();
BufferStatusCode::Ok.into()
} else {
BufferStatusCode::ResetFail.into()
}
}
/// Clean StreamBuffer value
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferClean(object: *mut StreamBuffer) -> i32 {
info!(LOG_LABEL, "enter clean");
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.clean();
BufferStatusCode::Ok.into()
} else {
BufferStatusCode::CleanFail.into()
}
}
/// Write object data into buf
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferWrite(object: *mut StreamBuffer, buf: *const StreamBuffer) -> bool {
info!(LOG_LABEL, "enter StreamBufferWrite");
if let Some(obj) = StreamBuffer::as_mut(object) {
if let Some(buffer) = StreamBuffer::as_ref(buf) {
obj.write_streambuffer(buffer)
} else {
false
}
} else {
false
}
}
/// Read object data into buf
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferRead(object: *const StreamBuffer, buf: *mut StreamBuffer) -> bool {
info!(LOG_LABEL, "enter StreamBufferRead");
if let Some(obj) = StreamBuffer::as_ref(object) {
if let Some(buffer) = StreamBuffer::as_mut(buf) {
obj.read_streambuffer(buffer)
} else {
false
}
} else {
false
}
}
/// Obtain status of reading or writing
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferChkRWError(object: *const StreamBuffer) -> bool {
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.chk_rwerror()
} else {
false
}
}
/// Obtain remarked string of status
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferGetErrorStatusRemark(object: *const StreamBuffer) -> *const c_char {
info!(LOG_LABEL, "enter StreamBufferGetErrorStatusRemark");
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.get_error_status_remark()
} else {
std::ptr::null()
}
}
/// Buf Bytes will be writen into streambuffer's sz_buff.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferWriteChar(object: *mut StreamBuffer, buf: *const c_char, size: usize) -> bool {
info!(LOG_LABEL, "enter StreamBufferWriteChar");
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.write_char_usize(buf, size)
} else {
false
}
}
/// Check whether the condition of writing could be satisfied or not.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferCheckWrite(object: *mut StreamBuffer, size: usize) -> bool {
info!(LOG_LABEL, "enter StreamBufferCheckWrite");
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.check_write(size)
} else {
false
}
}
/// CircleStreamBuffer will copy data to beginning.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn CircleStreamBufferCopyDataToBegin(object: *mut StreamBuffer) -> i32 {
info!(LOG_LABEL, "enter CircleStreamBufferCopyDataToBegin");
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.copy_data_to_begin();
BufferStatusCode::Ok.into()
} else {
BufferStatusCode::CopyDataToBeginFail.into()
}
}
/// Read sz_buf to buf.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferReadChar(object: *mut StreamBuffer, buf: *const c_char, size: usize) -> bool {
info!(LOG_LABEL, "enter StreamBufferReadChar");
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.read_char_usize(buf, size)
} else {
false
}
}
/// Write sz_buf to buf.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn CircleStreamBufferWrite(object: *mut StreamBuffer, buf: *const c_char, size: usize) -> bool {
info!(LOG_LABEL, "enter CircleStreamBufferWrite");
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.circle_write(buf, size)
} else {
false
}
}
/// read packets on client.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn ReadClientPackets(object: *mut StreamBuffer, stream_client: *const CSensorServiceClient,
callback_fun: ClientPacketCallBackFun) -> i32 {
info!(LOG_LABEL,"enter ReadClientPackets");
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.read_client_packets(stream_client, callback_fun);
BufferStatusCode::Ok.into()
} else {
BufferStatusCode::ReadClientPacketsFail.into()
}
}
/// StreamBufferReadBuf.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferReadBuf(object: *const StreamBuffer) -> *const c_char {
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.read_buf()
} else {
std::ptr::null()
}
}
/// obtain streambuffer's r_count field.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferGetRcount(object: *const StreamBuffer) -> i32 {
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.r_count() as i32
} else {
BufferStatusCode::RcountFail.into()
}
}
/// obtain streambuffer's w_count field.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferGetWcount(object: *const StreamBuffer) -> i32 {
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.w_count() as i32
} else {
BufferStatusCode::WcountFail.into()
}
}
/// obtain streambuffer's w_pos field.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferGetWpos(object: *const StreamBuffer) -> i32 {
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.w_pos() as i32
} else {
BufferStatusCode::WposFail.into()
}
}
/// obtain streambuffer's r_pos field.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferGetRpos(object: *const StreamBuffer) -> i32 {
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.r_pos() as i32
} else {
BufferStatusCode::RposFail.into()
}
}
/// obtain streambuffer's sz_buff field.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferGetSzBuff(object: *const StreamBuffer) -> *const c_char {
if let Some(obj) = StreamBuffer::as_ref(object) {
obj.sz_buff()
} else {
std::ptr::null()
}
}
/// obtain streambuffer's rw_err_status field.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferSetRwErrStatus(object: *mut StreamBuffer, rw_error_status: ErrorStatus) -> i32 {
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.set_rw_error_status(rw_error_status);
BufferStatusCode::Ok.into()
} else {
BufferStatusCode::SetRwErrStatusFail.into()
}
}
/// set streambuffer's r_pos field.
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamBufferSetRpos(object: *mut StreamBuffer, r_pos: i32) -> i32 {
if let Some(obj) = StreamBuffer::as_mut(object) {
obj.set_r_pos(r_pos as usize);
BufferStatusCode::Ok.into()
} else {
BufferStatusCode::SetRposFail.into()
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::ffi::{CString, c_char};
use hilog_rust::{hilog, error, HiLogLabel, LogType};
use std::mem::size_of;
use crate::stream_buffer::StreamBuffer;
const STREAM_BUF_WRITE_FAIL: i32 = 2;
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "NetPacket"
};
#[repr(packed(1))]
#[repr(C)]
pub(crate) struct PackHead {
pub(crate) id_msg: MessageId,
pub(crate) size: usize,
}
#[derive(Copy, Clone)]
#[repr(C)]
pub(crate) enum MessageId {
Invalid = 0,
Device,
DeviceIds,
DeviceSupportKeys,
AddDeviceListener,
DeviceKeyboardType,
DisplayInfo,
NoticeAnr,
MarkProcess,
OnSubscribeKey,
OnKeyEvent,
OnPointerEvent,
ReportKeyEvent,
ReportPointerEvent,
OnDeviceAdded,
OnDeviceRemoved,
CoordinationAddListener,
CoordinationMessage,
CoordinationGetState,
DragNotifyResult,
DragStateListener,
}
#[derive(Copy, Clone)]
#[repr(C)]
pub(crate) struct NetPacket {
pub(crate) msg_id: MessageId,
pub(crate) stream_buffer: StreamBuffer,
}
#[derive(Copy, Clone)]
#[repr(C)]
pub struct CNetPacket {
pub(super) msg_id: MessageId,
pub(super) stream_buffer_ptr: *const StreamBuffer,
}
impl Default for NetPacket {
fn default() -> Self {
Self {
msg_id: MessageId::Invalid,
stream_buffer: Default::default(),
}
}
}
impl NetPacket {
fn as_ref<'a>(object: *const Self) -> Option<&'a Self> {
// SAFETY: as_ref has already done no-null verification inside
unsafe {
object.as_ref()
}
}
fn as_mut<'a>(object: *mut Self) -> Option<&'a mut Self> {
// SAFETY: as_mut has already done no-null verification inside
unsafe {
object.as_mut()
}
}
fn size(&self) -> usize {
self.stream_buffer.size()
}
pub(super) fn get_packet_length(&self) -> usize {
size_of::<PackHead>() + self.stream_buffer.w_pos
}
pub(crate) fn make_data(&self, buf: &mut StreamBuffer) {
let head = PackHead {
id_msg: self.msg_id,
size: self.stream_buffer.w_pos,
};
buf.write(head);
if self.stream_buffer.w_pos > 0 && !buf.write_char_usize(&self.stream_buffer.sz_buff[0] as *const c_char,
self.stream_buffer.w_pos) {
error!(LOG_LABEL, "Write data to stream failed, errCode:{}", STREAM_BUF_WRITE_FAIL);
}
}
}

View File

@ -0,0 +1,162 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/// provide C interface to C++ for calling
pub mod ffi;
use hilog_rust::{debug, error, hilog, HiLogLabel, LogType};
use libc::c_int;
use std::{ffi::{CString, c_char}, thread::sleep, time::Duration};
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "StreamSession"
};
const MAX_PACKET_BUF_SIZE: usize = 256;
const SEND_RETRY_LIMIT: i32 = 32;
const SEND_RETRY_SLEEP_TIME: u64 = 10000;
const RET_ERR: i32 = -1;
#[repr(C)]
pub struct StreamSession {
module_type: i32,
fd: i32,
uid : i32,
pid: i32,
token_type: i32,
}
impl Default for StreamSession {
fn default() -> Self {
Self {
module_type: RET_ERR,
fd: RET_ERR,
uid: RET_ERR,
pid: RET_ERR,
token_type: RET_ERR,
}
}
}
impl StreamSession {
fn as_ref<'a>(object: *const Self) -> Option<&'a Self> {
// SAFETY: as_ref has already done no-null verification inside
unsafe {
object.as_ref()
}
}
fn as_mut<'a>(object: *mut Self) -> Option<&'a mut Self> {
// SAFETY: as_mut has already done no-null verification inside
unsafe {
object.as_mut()
}
}
fn uid(&self) -> i32 {
self.uid
}
fn pid(&self) -> i32 {
self.pid
}
fn module_type(&self) -> i32 {
self.module_type
}
fn session_fd(&self) -> i32 {
self.fd
}
fn set_token_type(&mut self, style: i32) {
self.token_type = style
}
fn set_uid(&mut self, uid: i32) {
self.uid = uid
}
fn set_pid(&mut self, pid: i32) {
self.pid = pid
}
fn set_fd(&mut self, fd: i32) {
self.fd = fd
}
fn token_type(&self) -> i32 {
self.token_type
}
fn session_close(&mut self) {
debug!(LOG_LABEL, "Enter fd_:{}.", self.fd);
if self.fd >= 0 {
// SAFETY: call unsafe function
unsafe {
libc::close(self.fd as c_int);
}
self.fd = RET_ERR;
}
}
fn session_send_msg(&self, buf: *const c_char, size: usize) -> bool {
if buf.is_null() {
error!(LOG_LABEL, "buf is null");
return false;
}
if size == 0 || size > MAX_PACKET_BUF_SIZE {
error!(LOG_LABEL, "size is either equal to 0 or greater than MAX_PACKET_BUF_SIZE, size: {}", size);
return false;
}
if self.fd < 0 {
error!(LOG_LABEL, "The fd is less than 0, fd: {}", self.fd);
return false;
}
let mut idx: usize = 0;
let mut retry_count: i32 = 0;
let buf_size = size;
let mut rem_size = buf_size;
while rem_size > 0 && retry_count < SEND_RETRY_LIMIT {
retry_count += 1;
// SAFETY: call extern libc library function
let count = unsafe {
libc::send(self.fd as c_int, buf.add(idx) as *const libc::c_void, rem_size,
libc::MSG_DONTWAIT | libc::MSG_NOSIGNAL)
};
// SAFETY: call extern libc library function
let errno = unsafe {
*libc::__errno_location()
};
if count < 0 {
if errno == libc::EAGAIN || errno == libc::EINTR || errno == libc::EWOULDBLOCK {
sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME));
error!(LOG_LABEL, "Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:{}", errno);
continue;
}
error!(LOG_LABEL, "Send return failed,error:{} fd:{}", errno, self.fd);
return false;
}
idx += count as usize;
rem_size -= count as usize;
if rem_size > 0 {
sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME));
}
}
if retry_count >= SEND_RETRY_LIMIT || rem_size != 0 {
error!(LOG_LABEL, "Send too many times:{}/{},size:{}/{} fd:{}",
retry_count, SEND_RETRY_LIMIT, idx, buf_size, self.fd);
return false;
}
true
}
}

View File

@ -0,0 +1,224 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use crate::error::SessionStatusCode;
use hilog_rust::{info, hilog, HiLogLabel, LogType};
const LOG_LABEL: HiLogLabel = HiLogLabel {
log_type: LogType::LogCore,
domain: 0xD002220,
tag: "stream_session_ffi"
};
/// Create unique_ptr of StreamSession for C++ code
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// If uninitialized memory requires special handling, please refer to std::mem::MaybeUninit.
/// The pointer needs to be aligned for access. If the memory pointed to by the pointer is a compact
/// memory layout and requires special consideration. Please refer to (#[repr(packed)]).
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionCreate() -> *mut StreamSession {
let stream_session: Box::<StreamSession> = Box::default();
Box::into_raw(stream_session)
}
/// Drop unique_ptr of StreamSession for C++ code
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// If uninitialized memory requires special handling, please refer to std::mem::MaybeUninit.
/// The pointer needs to be aligned for access. If the memory pointed to by the pointer is a compact
/// memory layout and requires special consideration. Please refer to (#[repr(packed)]).
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionDelete(raw: *mut StreamSession) {
if !raw.is_null() {
drop(Box::from_raw(raw));
}
}
/// Set StreamSession's uid
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionSetUid(object: *mut StreamSession, uid: i32) -> i32 {
info!(LOG_LABEL, "enter StreamSessionSetUid");
if let Some(obj) = StreamSession::as_mut(object) {
obj.set_uid(uid);
SessionStatusCode::Ok.into()
} else {
SessionStatusCode::SetUidFail.into()
}
}
/// Set StreamSession's fd
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionSetFd(object: *mut StreamSession, fd: i32) -> i32 {
info!(LOG_LABEL, "enter StreamSessionSetFd");
if let Some(obj) = StreamSession::as_mut(object) {
obj.set_fd(fd);
SessionStatusCode::Ok.into()
} else {
SessionStatusCode::SetFdFail.into()
}
}
/// Set StreamSession's pid
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionSetPid(object: *mut StreamSession, pid: i32) -> i32 {
info!(LOG_LABEL, "enter StreamSessionSetPid");
if let Some(obj) = StreamSession::as_mut(object) {
obj.set_pid(pid);
SessionStatusCode::Ok.into()
} else {
SessionStatusCode::SetPidFail.into()
}
}
/// Obtain StreamSession's uid
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionGetUid(object: *const StreamSession) -> i32 {
info!(LOG_LABEL, "enter StreamSessionGetUid");
if let Some(obj) = StreamSession::as_ref(object) {
obj.uid()
} else {
SessionStatusCode::UidFail.into()
}
}
/// Obtain StreamSession's pid
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionGetPid(object: *const StreamSession) -> i32 {
info!(LOG_LABEL, "enter StreamSessionGetPid");
if let Some(obj) = StreamSession::as_ref(object) {
obj.pid()
} else {
SessionStatusCode::PidFail.into()
}
}
/// Obtain StreamSession's fd
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionGetFd(object: *const StreamSession) -> i32 {
info!(LOG_LABEL, "enter StreamSessionGetFd");
if let Some(obj) = StreamSession::as_ref(object) {
obj.session_fd()
} else {
SessionStatusCode::FdFail.into()
}
}
/// Set StreamSession's tokentype
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionSetTokenType(object: *mut StreamSession, style: i32) -> i32 {
info!(LOG_LABEL, "enter StreamSessionSetTokenType");
if let Some(obj) = StreamSession::as_mut(object) {
obj.set_token_type(style);
SessionStatusCode::Ok.into()
} else {
SessionStatusCode::SetTokenTypeFail.into()
}
}
/// Obtain StreamSession's tokentype
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionGetTokenType(object: *const StreamSession) -> i32 {
info!(LOG_LABEL, "enter StreamSessionGetTokenType");
if let Some(obj) = StreamSession::as_ref(object) {
obj.token_type()
} else {
SessionStatusCode::TokenTypeFail.into()
}
}
/// Obtain StreamSession's moduletype
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionGetModuleType(object: *const StreamSession) -> i32 {
info!(LOG_LABEL, "enter StreamSessionGetModuleType");
if let Some(obj) = StreamSession::as_ref(object) {
obj.module_type()
} else {
SessionStatusCode::ModuleTypeFail.into()
}
}
/// Close StreamSession's fd
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionClose(object: *mut StreamSession) -> i32 {
info!(LOG_LABEL, "enter StreamSessionClose");
if let Some(obj) = StreamSession::as_mut(object) {
obj.session_close();
SessionStatusCode::Ok.into()
} else {
SessionStatusCode::CloseFail.into()
}
}
/// Send message via StreamSessions
///
/// # Safety
///
/// The pointer which pointed the memory already initialized must be valid.
/// Makesure the memory shouldn't be dropped while whose pointer is being used.
#[no_mangle]
pub unsafe extern "C" fn StreamSessionSendMsg(object: *const StreamSession, buf: *const c_char, size: usize) -> bool {
info!(LOG_LABEL, "enter StreamSessionSendMsg");
if let Some(obj) = StreamSession::as_ref(object) {
obj.session_send_msg(buf, size)
} else {
false
}
}

View File

@ -13,4 +13,14 @@
import("//build/ohos.gni")
declare_args() {
rust_socket_ipc = false
}
SUBSYSTEM_DIR = "//base/sensors/sensor"
sensor_default_defines = []
if (rust_socket_ipc) {
sensor_default_defines += [ "OHOS_BUILD_ENABLE_RUST" ]
}

View File

@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import("//base/sensors/sensor/sensor.gni")
import("//build/ohos.gni")
import("./../../sensor.gni")
@ -44,11 +45,17 @@ ohos_shared_library("libsensor_service") {
"$SUBSYSTEM_DIR/utils/ipc/include",
]
defines = sensor_default_defines
deps = [
"$SUBSYSTEM_DIR/utils/common:libsensor_utils",
"$SUBSYSTEM_DIR/utils/ipc:libsensor_ipc",
]
if (rust_socket_ipc) {
deps += [ "$SUBSYSTEM_DIR/rust/utils/socket_ipc_rust_ffi:sensor_rust_util_ffi" ]
}
external_deps = [
"access_token:libaccesstoken_sdk",
"c_utils:utils",
@ -95,11 +102,17 @@ ohos_shared_library("libsensor_service_static") {
"$SUBSYSTEM_DIR/utils/ipc/include",
]
defines = sensor_default_defines
deps = [
"$SUBSYSTEM_DIR/utils/common:libsensor_utils",
"$SUBSYSTEM_DIR/utils/ipc:libsensor_ipc",
]
if (rust_socket_ipc) {
deps += [ "$SUBSYSTEM_DIR/rust/utils/socket_ipc_rust_ffi:sensor_rust_util_ffi" ]
}
external_deps = [
"access_token:libaccesstoken_sdk",
"c_utils:utils",

View File

@ -17,6 +17,9 @@
#include "sensor.h"
#include "sensor_agent_type.h"
#ifdef OHOS_BUILD_ENABLE_RUST
#include "rust_binding.h"
#endif // OHOS_BUILD_ENABLE_RUST
namespace OHOS {
namespace Sensors {
@ -227,7 +230,11 @@ void SensorPowerPolicy::ReportActiveInfo(const ActiveInfo &activeInfo,
NetPacket pkt(MessageId::ACTIVE_INFO);
pkt << activeInfo.GetPid() << activeInfo.GetSensorId() <<
activeInfo.GetSamplingPeriodNs() << activeInfo.GetMaxReportDelayNs();
#ifdef OHOS_BUILD_ENABLE_RUST
if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) {
#else
if (pkt.ChkRWError()) {
#endif // OHOS_BUILD_ENABLE_RUST
SEN_HILOGE("Packet write data failed");
return;
}

View File

@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import("//base/sensors/sensor/sensor.gni")
import("//build/ohos.gni")
import("./../../sensor.gni")
@ -28,6 +29,12 @@ ohos_shared_library("libsensor_ipc") {
"$SUBSYSTEM_DIR/utils/ipc/include",
]
defines = sensor_default_defines
if (rust_socket_ipc) {
deps = [ "$SUBSYSTEM_DIR/rust/utils/socket_ipc_rust_ffi:sensor_rust_util_ffi" ]
}
external_deps = [
"access_token:libaccesstoken_sdk",
"c_utils:utils",

View File

@ -0,0 +1,71 @@
/*
* Copyright (C) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef RUST_BINDING_H
#define RUST_BINDING_H
#include <stdint.h>
#include <string.h>
#include "proto.h"
extern "C" {
struct RustStreamSocket;
struct RustStreamSession;
struct RustStreamBuffer;
struct RustNetPacket {
OHOS::Sensors::MessageId msgId { OHOS::Sensors::MessageId::INVALID };
struct RustStreamBuffer* streamBuffer;
};
RustStreamSocket* StreamSocketCreate(void);
void StreamSocketDelete(RustStreamSocket* raw);
int32_t StreamSocketGetFd(const RustStreamSocket* rustStreamSocket);
int32_t StreamSocketClose(RustStreamSocket* rustStreamSocket);
int32_t StreamSocketSetFd(RustStreamSocket* rustStreamSocket, int32_t fd);
RustStreamSession* StreamSessionCreate(void);
void StreamSessionDelete(RustStreamSession* raw);
void StreamSessionSetUid(RustStreamSession* rustStreamSession, int32_t uid);
void StreamSessionSetPid(RustStreamSession* rustStreamSession, int32_t pid);
void StreamSessionSetFd(RustStreamSession* rustStreamSession, int32_t fd);
void StreamSessionClose(RustStreamSession* rustStreamSession);
bool StreamSessionSendMsg(const RustStreamSession* rustStreamSession, const char* buf, size_t size);
int32_t StreamSessionGetUid(const RustStreamSession* rustStreamSession);
int32_t StreamSessionGetPid(const RustStreamSession* rustStreamSession);
int32_t StreamSessionGetFd(const RustStreamSession* rustStreamSession);
void StreamSessionSetTokenType(RustStreamSession* rustStreamSession, int32_t type);
int32_t StreamSessionGetTokenType(const RustStreamSession* rustStreamSession);
int32_t StreamSessionGetModuleType(const RustStreamSession* rustStreamSession);
RustStreamBuffer* StreamBufferCreate(void);
void StreamBufferDelete(RustStreamBuffer* raw);
int32_t StreamBufferGetWcount(const RustStreamBuffer* rustStreamBuffer);
int32_t StreamBufferGetRcount(const RustStreamBuffer* rustStreamBuffer);
int32_t StreamBufferGetWpos(const RustStreamBuffer* rustStreamBuffer);
int32_t StreamBufferGetRpos(const RustStreamBuffer* rustStreamBuffer);
const char* StreamBufferGetSzBuff(const RustStreamBuffer* rustStreamBuffer);
int32_t StreamBufferSetRwErrStatus(RustStreamBuffer* rustStreamBuffer, int32_t rwErrStatus);
int32_t StreamBufferSetRpos(RustStreamBuffer* rustStreamBuffer, int32_t rPos);
void StreamBufferReset(RustStreamBuffer* rustStreamBuffer);
void StreamBufferClean(RustStreamBuffer* rustStreamBuffer);
bool StreamBufferRead(RustStreamBuffer* rustStreamBuffer1, RustStreamBuffer* rustStreamBuffer2);
bool StreamBufferWrite(RustStreamBuffer* rustStreamBuffer1, const RustStreamBuffer* rustStreamBuffer2);
const char* StreamBufferReadBuf(const RustStreamBuffer* rustStreamBuffer);
bool StreamBufferReadChar(RustStreamBuffer* rustStreamBuffer, char* buf, size_t size);
bool StreamBufferWriteChar(RustStreamBuffer* rustStreamBuffer, const char* buf, size_t size);
const char* StreamBufferData(const RustStreamBuffer* rustStreamBuffer);
size_t StreamBufferSize(const RustStreamBuffer* rustStreamBuffer);
const char* StreamBufferGetErrorStatusRemark(const RustStreamBuffer* rustStreamBuffer);
bool StreamBufferChkRWError(const RustStreamBuffer* rustStreamBuffer);
bool StreamBufferCheckWrite(RustStreamBuffer* rustStreamBuffer, size_t size);
bool CircleStreamBufferWrite(RustStreamBuffer* rustStreamBuffer, const char* buf, size_t size);
void CircleStreamBufferCopyDataToBegin(RustStreamBuffer* rustStreamBuffer);
}
#endif // RUST_BINDING_H

View File

@ -25,6 +25,10 @@
#include "proto.h"
#include "sensors_errors.h"
#ifdef OHOS_BUILD_ENABLE_RUST
#include "rust_binding.h"
#endif // OHOS_BUILD_ENABLE_RUST
namespace OHOS {
namespace Sensors {
class StreamBuffer {
@ -36,21 +40,23 @@ public:
virtual ~StreamBuffer() = default;
void Reset();
void Clean();
bool SeekReadPos(size_t n);
bool Read(std::string &buf);
bool Read(StreamBuffer &buf);
bool Read(char *buf, size_t size);
bool Write(const std::string &buf);
bool Write(const StreamBuffer &buf);
virtual bool Write(const char *buf, size_t size);
bool IsEmpty() const;
const std::string& GetErrorStatusRemark() const;
bool ChkRWError() const;
#ifndef OHOS_BUILD_ENABLE_RUST
bool SeekReadPos(size_t n);
bool IsEmpty() const;
size_t Size() const;
size_t UnreadSize() const;
size_t GetAvailableBufSize() const;
const std::string& GetErrorStatusRemark() const;
const char* Data() const;
const char *WriteBuf() const;
#endif // OHOS_BUILD_ENABLE_RUST
template<typename T>
bool Read(T &data);
template<typename T>
@ -60,7 +66,7 @@ public:
template<typename T>
bool Write(const std::vector<T> &data);
const char *ReadBuf() const;
const char *WriteBuf() const;
template<typename T>
StreamBuffer &operator >> (T &data);
template<typename T>
@ -69,6 +75,11 @@ public:
protected:
bool Clone(const StreamBuffer &buf);
#ifdef OHOS_BUILD_ENABLE_RUST
public:
std::unique_ptr<RustStreamBuffer, void(*)(RustStreamBuffer*)> streamBufferPtr_
{ StreamBufferCreate(), StreamBufferDelete };
#else
enum class ErrorStatus {
ERROR_STATUS_OK,
ERROR_STATUS_READ,
@ -80,14 +91,22 @@ protected:
size_t rPos_ { 0 };
size_t wPos_ { 0 };
char szBuff_[MAX_STREAM_BUF_SIZE + 1] = {};
#endif // OHOS_BUILD_ENABLE_RUST
};
template<typename T>
bool StreamBuffer::Read(T &data)
{
if (!Read(reinterpret_cast<char *>(&data), sizeof(data))) {
#ifdef OHOS_BUILD_ENABLE_RUST
const char* s = StreamBufferGetErrorStatusRemark(streamBufferPtr_.get());
SEN_HILOGE("[%{public}s] size:%{public}zu count:%{public}d",
s, sizeof(data), StreamBufferGetRcount(streamBufferPtr_.get()) + 1);
#else
SEN_HILOGE("%{public}s, size:%{public}zu, count:%{public}zu",
GetErrorStatusRemark().c_str(), sizeof(data), rCount_ + 1);
#endif // OHOS_BUILD_ENABLE_RUST
return false;
}
return true;
@ -97,8 +116,14 @@ template<typename T>
bool StreamBuffer::Write(const T &data)
{
if (!Write(reinterpret_cast<const char *>(&data), sizeof(data))) {
#ifdef OHOS_BUILD_ENABLE_RUST
const char* s = StreamBufferGetErrorStatusRemark(streamBufferPtr_.get());
SEN_HILOGE("[%{public}s] size:%{public}zu,count:%{public}d",
s, sizeof(data), StreamBufferGetWcount(streamBufferPtr_.get()) + 1);
#else
SEN_HILOGE("%{public}s, size:%{public}zu, count:%{public}zu",
GetErrorStatusRemark().c_str(), sizeof(data), wCount_ + 1);
#endif // OHOS_BUILD_ENABLE_RUST
return false;
}
return true;

View File

@ -23,6 +23,10 @@
#include "net_packet.h"
#include "proto.h"
#ifdef OHOS_BUILD_ENABLE_RUST
#include "rust_binding.h"
#endif // OHOS_BUILD_ENABLE_RUST
namespace OHOS {
namespace Sensors {
@ -56,10 +60,15 @@ protected:
std::map<int32_t, std::vector<EventTime>> events_;
std::string descript_;
const std::string programName_;
#ifdef OHOS_BUILD_ENABLE_RUST
std::unique_ptr<RustStreamSession, void(*)(RustStreamSession*)> streamSessionPtr_
{ StreamSessionCreate(), StreamSessionDelete };
#else
int32_t fd_ { -1 };
const int32_t uid_ { -1 };
const int32_t pid_ { -1 };
int32_t tokenType_ { ATokenTypeEnum::TOKEN_INVALID };
#endif // OHOS_BUILD_ENABLE_RUST
};
} // namespace Sensors
} // namespace OHOS

View File

@ -25,23 +25,32 @@
#include "circle_stream_buffer.h"
#include "net_packet.h"
#ifdef OHOS_BUILD_ENABLE_RUST
#include "rust_binding.h"
#endif // OHOS_BUILD_ENABLE_RUST
namespace OHOS {
namespace Sensors {
class StreamSocket {
public:
using PacketCallBackFun = std::function<void(NetPacket&)>;
using PacketCallBackFun = std::function<void(NetPacket &)>;
StreamSocket();
virtual ~StreamSocket();
void OnReadPackets(CircleStreamBuffer &buf, PacketCallBackFun callbackFun);
void Close();
int32_t GetFd() const;
#ifndef OHOS_BUILD_ENABLE_RUST
void OnReadPackets(CircleStreamBuffer &buf, PacketCallBackFun callbackFun);
#endif // OHOS_BUILD_ENABLE_RUST
DISALLOW_COPY_AND_MOVE(StreamSocket);
protected:
#ifdef OHOS_BUILD_ENABLE_RUST
std::unique_ptr<struct RustStreamSocket, void(*)(RustStreamSocket*)> streamSocketPtr_
{ StreamSocketCreate(), StreamSocketDelete };
#else
int32_t fd_ { -1 };
int32_t epollFd_ { -1 };
#endif // OHOS_BUILD_ENABLE_RUST
};
} // namespace Sensors
} // namespace OHOS
#endif // STREAM_SOCKET_H
#endif // STREAM_SOCKET_H

View File

@ -1,57 +1,69 @@
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "circle_stream_buffer.h"
#include "sensors_errors.h"
namespace OHOS {
namespace Sensors {
bool CircleStreamBuffer::CheckWrite(size_t size)
{
size_t availSize = GetAvailableBufSize();
if (size > availSize && rPos_ > 0) {
CopyDataToBegin();
availSize = GetAvailableBufSize();
}
return (availSize >= size);
}
bool CircleStreamBuffer::Write(const char *buf, size_t size)
{
if (!CheckWrite(size)) {
SEN_HILOGE("Buffer is overflow, availableSize:%{public}zu, size:%{public}zu,"
"unreadSize:%{public}zu, rPos:%{public}zu, wPos:%{public}zu",
GetAvailableBufSize(), size, UnreadSize(), rPos_, wPos_);
return false;
}
return StreamBuffer::Write(buf, size);
}
void CircleStreamBuffer::CopyDataToBegin()
{
size_t unreadSize = UnreadSize();
if (unreadSize > 0 && rPos_ > 0) {
size_t pos = 0;
for (size_t i = rPos_; i <= wPos_;) {
szBuff_[pos++] = szBuff_[i++];
}
}
SEN_HILOGD("UnreadSize:%{public}zu, rPos:%{public}zu, wPos:%{public}zu", unreadSize, rPos_, wPos_);
rPos_ = 0;
wPos_ = unreadSize;
}
} // namespace Sensors
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "circle_stream_buffer.h"
#include "sensors_errors.h"
namespace OHOS {
namespace Sensors {
bool CircleStreamBuffer::CheckWrite(size_t size)
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferCheckWrite(streamBufferPtr_.get(), size);
#else
size_t availSize = GetAvailableBufSize();
if (size > availSize && rPos_ > 0) {
CopyDataToBegin();
availSize = GetAvailableBufSize();
}
return (availSize >= size);
#endif // OHOS_BUILD_ENABLE_RUST
}
bool CircleStreamBuffer::Write(const char *buf, size_t size)
{
#ifdef OHOS_BUILD_ENABLE_RUST
return CircleStreamBufferWrite(streamBufferPtr_.get(), buf, size);
#else
if (!CheckWrite(size)) {
SEN_HILOGE("Buffer is overflow, availableSize:%{public}zu, size:%{public}zu,"
"unreadSize:%{public}zu, rPos:%{public}zu, wPos:%{public}zu",
GetAvailableBufSize(), size, UnreadSize(), rPos_, wPos_);
return false;
}
return StreamBuffer::Write(buf, size);
#endif // OHOS_BUILD_ENABLE_RUST
}
void CircleStreamBuffer::CopyDataToBegin()
{
#ifdef OHOS_BUILD_ENABLE_RUST
CircleStreamBufferCopyDataToBegin(streamBufferPtr_.get());
#else
size_t unreadSize = UnreadSize();
if (unreadSize > 0 && rPos_ > 0) {
size_t pos = 0;
for (size_t i = rPos_; i <= wPos_;) {
szBuff_[pos++] = szBuff_[i++];
}
}
SEN_HILOGD("UnreadSize:%{public}zu, rPos:%{public}zu, wPos:%{public}zu", unreadSize, rPos_, wPos_);
rPos_ = 0;
wPos_ = unreadSize;
#endif // OHOS_BUILD_ENABLE_RUST
}
} // namespace Sensors
} // namespace OHOS

View File

@ -1,62 +1,85 @@
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "net_packet.h"
#include "sensors_errors.h"
namespace OHOS {
namespace Sensors {
NetPacket::NetPacket(MessageId msgId) : msgId_(msgId)
{}
NetPacket::NetPacket(const NetPacket &pkt) : NetPacket(pkt.GetMsgId())
{
Clone(pkt);
}
void NetPacket::MakeData(StreamBuffer &buf) const
{
PACKHEAD head = {msgId_, wPos_};
buf << head;
if (wPos_ > 0) {
if (!buf.Write(&szBuff_[0], wPos_)) {
SEN_HILOGE("Write data to stream failed");
return;
}
}
}
size_t NetPacket::GetSize() const
{
return Size();
}
size_t NetPacket::GetPacketLength() const
{
return sizeof(PackHead) + wPos_;
}
const char* NetPacket::GetData() const
{
return Data();
}
MessageId NetPacket::GetMsgId() const
{
return msgId_;
}
} // namespace Sensors
} // namespace OHOS
/*
* Copyright (c) 2023 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "net_packet.h"
#include "sensors_errors.h"
namespace OHOS {
namespace Sensors {
NetPacket::NetPacket(MessageId msgId) : msgId_(msgId)
{}
NetPacket::NetPacket(const NetPacket &pkt) : NetPacket(pkt.GetMsgId())
{
Clone(pkt);
}
void NetPacket::MakeData(StreamBuffer &buf) const
{
#ifdef OHOS_BUILD_ENABLE_RUST
PACKHEAD head = {msgId_, StreamBufferGetWpos(streamBufferPtr_.get())};
buf << head;
if (StreamBufferGetWpos(streamBufferPtr_.get()) > 0) {
if (!buf.Write(StreamBufferGetSzBuff(streamBufferPtr_.get()), StreamBufferGetWpos(streamBufferPtr_.get()))) {
SEN_HILOGE("Write data to stream failed");
return;
}
}
#else
PACKHEAD head = {msgId_, wPos_};
buf << head;
if (wPos_ > 0) {
if (!buf.Write(&szBuff_[0], wPos_)) {
SEN_HILOGE("Write data to stream failed");
return;
}
}
#endif // OHOS_BUILD_ENABLE_RUST
}
size_t NetPacket::GetSize() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferSize(streamBufferPtr_.get());
#else
return Size();
#endif // OHOS_BUILD_ENABLE_RUST
}
size_t NetPacket::GetPacketLength() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return (static_cast<int32_t>(sizeof(PackHead)) + StreamBufferGetWpos(streamBufferPtr_.get()));
#else
return sizeof(PackHead) + wPos_;
#endif // OHOS_BUILD_ENABLE_RUST
}
const char* NetPacket::GetData() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferData(streamBufferPtr_.get());
#else
return Data();
#endif // OHOS_BUILD_ENABLE_RUST
}
MessageId NetPacket::GetMsgId() const
{
return msgId_;
}
} // namespace Sensors
} // namespace OHOS

View File

@ -30,36 +30,45 @@ StreamBuffer &StreamBuffer::operator=(const StreamBuffer &other)
void StreamBuffer::Reset()
{
#ifdef OHOS_BUILD_ENABLE_RUST
StreamBufferReset(streamBufferPtr_.get());
#else
rPos_ = 0;
wPos_ = 0;
rCount_ = 0;
wCount_ = 0;
rwErrorStatus_ = ErrorStatus::ERROR_STATUS_OK;
#endif // OHOS_BUILD_ENABLE_RUST
}
void StreamBuffer::Clean()
{
#ifdef OHOS_BUILD_ENABLE_RUST
StreamBufferClean(streamBufferPtr_.get());
#else
Reset();
errno_t ret = memset_sp(&szBuff_, sizeof(szBuff_), 0, sizeof(szBuff_));
if (ret != EOK) {
SEN_HILOGE("Call memset_s fail");
return;
}
}
bool StreamBuffer::SeekReadPos(size_t n)
{
size_t pos = rPos_ + n;
if (pos > wPos_) {
SEN_HILOGE("The position in the calculation is not as expected. pos:%{public}zu, [0, %{public}zu]", pos, wPos_);
return false;
}
rPos_ = pos;
return true;
#endif // OHOS_BUILD_ENABLE_RUST
}
bool StreamBuffer::Read(std::string &buf)
{
#ifdef OHOS_BUILD_ENABLE_RUST
const int32_t ERROR_STATUS_READ = 1;
if (StreamBufferGetRpos(streamBufferPtr_.get()) == StreamBufferGetWpos(streamBufferPtr_.get())) {
SEN_HILOGE("Not enough memory to read, errCode:%{public}d", STREAM_BUF_READ_FAIL);
StreamBufferSetRwErrStatus(streamBufferPtr_.get(), ERROR_STATUS_READ);
return false;
}
buf = ReadBuf();
StreamBufferSetRpos(streamBufferPtr_.get(),
StreamBufferGetRpos(streamBufferPtr_.get()) + static_cast<int32_t>(buf.length()) + 1);
return (buf.length() > 0);
#else
if (rPos_ == wPos_) {
SEN_HILOGE("Not enough memory to read");
rwErrorStatus_ = ErrorStatus::ERROR_STATUS_READ;
@ -68,25 +77,37 @@ bool StreamBuffer::Read(std::string &buf)
buf = ReadBuf();
rPos_ += buf.length() + 1;
return (buf.length() > 0);
#endif // OHOS_BUILD_ENABLE_RUST
}
bool StreamBuffer::Write(const std::string &buf)
{
return Write(buf.c_str(), buf.length() + 1);
return Write(buf.c_str(), buf.length()+1);
}
bool StreamBuffer::Read(StreamBuffer &buf)
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferRead(streamBufferPtr_.get(), buf.streamBufferPtr_.get());
#else
return buf.Write(Data(), Size());
#endif // OHOS_BUILD_ENABLE_RUST
}
bool StreamBuffer::Write(const StreamBuffer &buf)
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferWrite(streamBufferPtr_.get(), buf.streamBufferPtr_.get());
#else
return Write(buf.Data(), buf.Size());
#endif // OHOS_BUILD_ENABLE_RUST
}
bool StreamBuffer::Read(char *buf, size_t size)
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferReadChar(streamBufferPtr_.get(), buf, size);
#else
if (ChkRWError()) {
return false;
}
@ -114,10 +135,15 @@ bool StreamBuffer::Read(char *buf, size_t size)
rPos_ += size;
++rCount_;
return true;
#endif // OHOS_BUILD_ENABLE_RUST
}
bool StreamBuffer::Write(const char *buf, size_t size)
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferWriteChar(streamBufferPtr_.get(), buf, size);
#else
if (ChkRWError()) {
return false;
}
@ -146,6 +172,43 @@ bool StreamBuffer::Write(const char *buf, size_t size)
wPos_ += size;
++wCount_;
return true;
#endif // OHOS_BUILD_ENABLE_RUST
}
const char *StreamBuffer::ReadBuf() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferReadBuf(streamBufferPtr_.get());
#else
return &szBuff_[rPos_];
#endif // OHOS_BUILD_ENABLE_RUST
}
bool StreamBuffer::Clone(const StreamBuffer &buf)
{
Clean();
#ifdef OHOS_BUILD_ENABLE_RUST
return Write(StreamBufferData(buf.streamBufferPtr_.get()), StreamBufferSize(buf.streamBufferPtr_.get()));
#else
return Write(buf.Data(), buf.Size());
#endif // OHOS_BUILD_ENABLE_RUST
}
#ifndef OHOS_BUILD_ENABLE_RUST
bool StreamBuffer::ChkRWError() const
{
return (rwErrorStatus_ != ErrorStatus::ERROR_STATUS_OK);
}
bool StreamBuffer::SeekReadPos(size_t n)
{
size_t pos = rPos_ + n;
if (pos > wPos_) {
SEN_HILOGE("The position in the calculation is not as expected. pos:%{public}zu, [0, %{public}zu]", pos, wPos_);
return false;
}
rPos_ = pos;
return true;
}
bool StreamBuffer::IsEmpty() const
@ -155,7 +218,11 @@ bool StreamBuffer::IsEmpty() const
size_t StreamBuffer::Size() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferSize(&rustStreamBuffer_);
#else
return wPos_;
#endif // OHOS_BUILD_ENABLE_RUST
}
size_t StreamBuffer::UnreadSize() const
@ -168,13 +235,11 @@ size_t StreamBuffer::GetAvailableBufSize() const
return ((wPos_ >= MAX_STREAM_BUF_SIZE) ? 0 : (MAX_STREAM_BUF_SIZE - wPos_));
}
bool StreamBuffer::ChkRWError() const
{
return (rwErrorStatus_ != ErrorStatus::ERROR_STATUS_OK);
}
const std::string &StreamBuffer::GetErrorStatusRemark() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferGetErrorStatusRemark(streamBufferPtr_.get());
#else
static const std::vector<std::pair<ErrorStatus, std::string>> remark {
{ErrorStatus::ERROR_STATUS_OK, "OK"},
{ErrorStatus::ERROR_STATUS_READ, "READ_ERROR"},
@ -185,27 +250,22 @@ const std::string &StreamBuffer::GetErrorStatusRemark() const
return (item.first == rwErrorStatus_);
});
return (tIter != remark.cend() ? tIter->second : invalidStatus);
#endif // OHOS_BUILD_ENABLE_RUST
}
const char *StreamBuffer::Data() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamBufferData(&rustStreamBuffer_);
#else
return &szBuff_[0];
}
const char *StreamBuffer::ReadBuf() const
{
return &szBuff_[rPos_];
#endif // OHOS_BUILD_ENABLE_RUST
}
const char *StreamBuffer::WriteBuf() const
{
return &szBuff_[wPos_];
}
bool StreamBuffer::Clone(const StreamBuffer &buf)
{
Clean();
return Write(buf.Data(), buf.Size());
}
#endif // OHOS_BUILD_ENABLE_RUST
} // namespace Sensors
} // namespace OHOS

View File

@ -29,16 +29,30 @@ constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, SENSOR_LOG_DOMAIN, "St
}
StreamSession::StreamSession(const std::string &programName, const int32_t fd, const int32_t uid, const int32_t pid)
: programName_(programName),
: programName_(programName)
#ifdef OHOS_BUILD_ENABLE_RUST
{
StreamSessionSetFd(streamSessionPtr_.get(), fd);
StreamSessionSetUid(streamSessionPtr_.get(), uid);
StreamSessionSetPid(streamSessionPtr_.get(), pid);
UpdateDescript();
}
#else
,
fd_(fd),
uid_(uid),
pid_(pid)
{
UpdateDescript();
}
#endif // OHOS_BUILD_ENABLE_RUST
bool StreamSession::SendMsg(const char *buf, size_t size) const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamSessionSendMsg(streamSessionPtr_.get(), buf, size);
#else
CHKPF(buf);
if ((size == 0) || (size > MAX_PACKET_BUF_SIZE)) {
SEN_HILOGE("buf size:%{public}zu", size);
@ -56,7 +70,11 @@ bool StreamSession::SendMsg(const char *buf, size_t size) const
auto count = send(fd_, &buf[idx], remSize, MSG_DONTWAIT | MSG_NOSIGNAL);
if (count < 0) {
if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) {
#ifdef OHOS_BUILD_ENABLE_RUST
sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME));
#else
usleep(SEND_RETRY_SLEEP_TIME);
#endif
SEN_HILOGW("Continue for errno EAGAIN|EINTR|EWOULDBLOCK, errno:%{public}d", errno);
continue;
}
@ -66,7 +84,11 @@ bool StreamSession::SendMsg(const char *buf, size_t size) const
idx += static_cast<size_t>(count);
remSize -= static_cast<size_t>(count);
if (remSize > 0) {
#ifdef OHOS_BUILD_ENABLE_RUST
sleep(Duration::from_micros(SEND_RETRY_SLEEP_TIME));
#else
usleep(SEND_RETRY_SLEEP_TIME);
#endif
}
}
if (retryCount >= SEND_RETRY_LIMIT || remSize != 0) {
@ -75,19 +97,37 @@ bool StreamSession::SendMsg(const char *buf, size_t size) const
return false;
}
return true;
#endif // OHOS_BUILD_ENABLE_RUST
}
void StreamSession::Close()
{
#ifdef OHOS_BUILD_ENABLE_RUST
StreamSessionClose(streamSessionPtr_.get());
UpdateDescript();
#else
if (fd_ >= 0) {
close(fd_);
fd_ = -1;
UpdateDescript();
}
#endif // OHOS_BUILD_ENABLE_RUST
}
void StreamSession::UpdateDescript()
{
#ifdef OHOS_BUILD_ENABLE_RUST
std::ostringstream oss;
oss << "fd = " << StreamSessionGetFd(streamSessionPtr_.get())
<< ", programName = " << programName_
<< ", moduleType = " << StreamSessionGetModuleType(streamSessionPtr_.get())
<< ((StreamSessionGetFd(streamSessionPtr_.get()) < 0) ? ", closed" : ", opened")
<< ", uid = " << StreamSessionGetUid(streamSessionPtr_.get())
<< ", pid = " << StreamSessionGetPid(streamSessionPtr_.get())
<< ", tokenType = " << StreamSessionGetTokenType(streamSessionPtr_.get())
<< std::endl;
descript_ = oss.str().c_str();
#else
std::ostringstream oss;
oss << "fd = " << fd_
<< ", programName = " << programName_
@ -97,10 +137,20 @@ void StreamSession::UpdateDescript()
<< ", tokenType = " << tokenType_
<< std::endl;
descript_ = oss.str().c_str();
#endif // OHOS_BUILD_ENABLE_RUST
}
bool StreamSession::SendMsg(const NetPacket &pkt) const
{
#ifdef OHOS_BUILD_ENABLE_RUST
if (StreamBufferChkRWError(pkt.streamBufferPtr_.get())) {
SEN_HILOGE("Read and write status is error");
return false;
}
StreamBuffer buf;
pkt.MakeData(buf);
return SendMsg(StreamBufferData(buf.streamBufferPtr_.get()), StreamBufferSize(buf.streamBufferPtr_.get()));
#else
if (pkt.ChkRWError()) {
SEN_HILOGE("Read and write status failed");
return false;
@ -108,16 +158,25 @@ bool StreamSession::SendMsg(const NetPacket &pkt) const
StreamBuffer buf;
pkt.MakeData(buf);
return SendMsg(buf.Data(), buf.Size());
#endif // OHOS_BUILD_ENABLE_RUST
}
int32_t StreamSession::GetUid() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamSessionGetUid(streamSessionPtr_.get());
#else
return uid_;
#endif // OHOS_BUILD_ENABLE_RUST
}
int32_t StreamSession::GetPid() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamSessionGetPid(streamSessionPtr_.get());
#else
return pid_;
#endif // OHOS_BUILD_ENABLE_RUST
}
SessionPtr StreamSession::GetSharedPtr()
@ -127,7 +186,11 @@ SessionPtr StreamSession::GetSharedPtr()
int32_t StreamSession::GetFd() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamSessionGetFd(streamSessionPtr_.get());
#else
return fd_;
#endif // OHOS_BUILD_ENABLE_RUST
}
const std::string& StreamSession::GetDescript() const
@ -142,12 +205,20 @@ const std::string StreamSession::GetProgramName() const
void StreamSession::SetTokenType(int32_t type)
{
#ifdef OHOS_BUILD_ENABLE_RUST
StreamSessionSetTokenType(streamSessionPtr_.get(), type);
#else
tokenType_ = type;
#endif // OHOS_BUILD_ENABLE_RUST
}
int32_t StreamSession::GetTokenType() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamSessionGetTokenType(streamSessionPtr_.get());
#else
return tokenType_;
#endif // OHOS_BUILD_ENABLE_RUST
}
} // namespace Sensors
} // namespace OHOS

View File

@ -21,17 +21,24 @@
namespace OHOS {
namespace Sensors {
#ifndef OHOS_BUILD_ENABLE_RUST
namespace {
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, SENSOR_LOG_DOMAIN, "StreamSocket" };
} // namespace
#endif // OHOS_BUILD_ENABLE_RUST
StreamSocket::StreamSocket() {}
StreamSocket::~StreamSocket()
{
#ifdef OHOS_BUILD_ENABLE_RUST
StreamSocketClose(streamSocketPtr_.get());
#else
Close();
#endif // OHOS_BUILD_ENABLE_RUST
}
#ifndef OHOS_BUILD_ENABLE_RUST
void StreamSocket::OnReadPackets(CircleStreamBuffer &circBuf, StreamSocket::PacketCallBackFun callbackFun)
{
constexpr size_t headSize = sizeof(PackHead);
@ -73,9 +80,13 @@ void StreamSocket::OnReadPackets(CircleStreamBuffer &circBuf, StreamSocket::Pack
}
}
}
#endif // OHOS_BUILD_ENABLE_RUST
void StreamSocket::Close()
{
#ifdef OHOS_BUILD_ENABLE_RUST
StreamSocketClose(streamSocketPtr_.get());
#else
if (fd_ >= 0) {
auto rf = close(fd_);
if (rf != 0) {
@ -83,11 +94,16 @@ void StreamSocket::Close()
}
}
fd_ = -1;
#endif // OHOS_BUILD_ENABLE_RUST
}
int32_t StreamSocket::GetFd() const
{
#ifdef OHOS_BUILD_ENABLE_RUST
return StreamSocketGetFd(streamSocketPtr_.get());
#else
return fd_;
#endif // OHOS_BUILD_ENABLE_RUST
}
} // namespace Sensors
} // namespace OHOS