!149 RPC 代码优化

Merge pull request !149 from pilipala195/master
This commit is contained in:
openharmony_ci 2022-02-26 09:21:26 +00:00 committed by Gitee
commit 35dcbe41fa
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
10 changed files with 42 additions and 37 deletions

View File

@ -50,6 +50,7 @@ enum {
#define MAX_DEATH_CALLBACK_NUM 4 #define MAX_DEATH_CALLBACK_NUM 4
#define RPC_DEFAULT_SEND_WAIT_TIME 4 #define RPC_DEFAULT_SEND_WAIT_TIME 4
#define RPC_MAX_SEND_WAIT_TIME 3000
#if defined(__LITEOS_M__) #if defined(__LITEOS_M__)
#define IF_PROT_DEFAULT IF_PROT_DATABUS #define IF_PROT_DEFAULT IF_PROT_DATABUS

View File

@ -31,7 +31,7 @@ void DeleteRpcInvoker(RemoteInvoker *remoteInvoker);
void RpcStopWorkThread(void); void RpcStopWorkThread(void);
int32_t OnReceiveNewConnection(int sessionId); int32_t OnReceiveNewConnection(int sessionId);
void OnDatabusSessionClosed(int sessionId); void OnDatabusSessionClosed(int sessionId);
void OnMessageAvailable(int sessionId, const void *data, unsigned int len); void OnMessageAvailable(int sessionId, const void *data, uint32_t len);
void UpdateClientSession(int32_t handle, HandleSessionList *sessionObject, void UpdateClientSession(int32_t handle, HandleSessionList *sessionObject,
const char *serviceName, const char *deviceId); const char *serviceName, const char *deviceId);
int32_t CreateTransServer(const char *sessionName); int32_t CreateTransServer(const char *sessionName);

View File

@ -129,7 +129,7 @@ uint64_t ProcessGetSeqNumber(void);
int32_t AttachHandleToIndex(HandleToIndexList *handleToIndex); int32_t AttachHandleToIndex(HandleToIndexList *handleToIndex);
void DetachHandleToIndex(HandleToIndexList *handleToIndex); void DetachHandleToIndex(HandleToIndexList *handleToIndex);
HandleToIndexList *QueryHandleToIndex(uint32_t handle); HandleToIndexList *QueryHandleToIndex(uint32_t handle);
int32_t AddSendThreadInWait(uint64_t seqNumber, ThreadMessageInfo *messageInfo, int userWaitTime); int32_t AddSendThreadInWait(uint64_t seqNumber, ThreadMessageInfo *messageInfo, uint32_t userWaitTime);
void EraseThreadBySeqNumber(ThreadMessageInfo *messageInfo); void EraseThreadBySeqNumber(ThreadMessageInfo *messageInfo);
ThreadMessageInfo *QueryThreadBySeqNumber(uint64_t seqNumber); ThreadMessageInfo *QueryThreadBySeqNumber(uint64_t seqNumber);
void WakeUpThreadBySeqNumber(uint64_t seqNumber, uint32_t handle); void WakeUpThreadBySeqNumber(uint64_t seqNumber, uint32_t handle);

View File

@ -25,7 +25,7 @@
extern "C" { extern "C" {
#endif #endif
int32_t InvokerListenThreadStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option, OnRemoteRequest *func); int32_t InvokerListenThreadStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option, OnRemoteRequest func);
int32_t GetPidAndUidInfoStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option); int32_t GetPidAndUidInfoStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option);
int32_t GrantDataBusNameStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option); int32_t GrantDataBusNameStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option);

View File

@ -17,7 +17,7 @@
#include "rpc_errno.h" #include "rpc_errno.h"
int32_t InvokerListenThreadStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option, OnRemoteRequest *func) int32_t InvokerListenThreadStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option, OnRemoteRequest func)
{ {
return ERR_NONE; return ERR_NONE;
} }

View File

@ -113,7 +113,7 @@ static int32_t InvokerDataBusThread(IpcIo *data, IpcIo *reply, OnRemoteRequest f
return MakeStubCached(reply, func, sessionName, deviceID); return MakeStubCached(reply, func, sessionName, deviceID);
} }
int32_t InvokerListenThreadStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option, OnRemoteRequest *func) int32_t InvokerListenThreadStub(uint32_t code, IpcIo *data, IpcIo *reply, MessageOption option, OnRemoteRequest func)
{ {
uint16_t type; uint16_t type;
if (!ReadUint16(data, &type)) { if (!ReadUint16(data, &type)) {

View File

@ -80,7 +80,7 @@ static void ToTransData(const IpcIo *data, dbinder_transaction_data *buf)
buf->buffer_size = (data == NULL) ? 0 : (data->bufferCur - data->bufferBase); buf->buffer_size = (data == NULL) ? 0 : (data->bufferCur - data->bufferBase);
buf->offsets = buf->buffer_size; buf->offsets = buf->buffer_size;
buf->offsets_size = (data == NULL) ? 0 : buf->offsets_size = (data == NULL) ? 0 :
((size_t)(data->offsetsCur - data->offsetsBase) * sizeof(size_t)); (uint64_t)((data->offsetsCur - data->offsetsBase) * sizeof(size_t));
} }
static void ToIpcData(const dbinder_transaction_data *tr, IpcIo *data) static void ToIpcData(const dbinder_transaction_data *tr, IpcIo *data)
@ -159,7 +159,7 @@ static int32_t MoveTransData2Buffer(HandleSessionList *sessionObject, dbinder_tr
RPC_LOG_ERROR("sessionObject buffer malloc failed"); RPC_LOG_ERROR("sessionObject buffer malloc failed");
return ERR_FAILED; return ERR_FAILED;
} }
sessionObject->len = (uint32_t)transData->sizeOfSelf; sessionObject->len = transData->sizeOfSelf;
if (memcpy_s(sessionObject->buffer, sizeof(dbinder_transaction_data), if (memcpy_s(sessionObject->buffer, sizeof(dbinder_transaction_data),
transData, sizeof(dbinder_transaction_data)) != EOK) { transData, sizeof(dbinder_transaction_data)) != EOK) {
@ -178,7 +178,7 @@ static int32_t MoveTransData2Buffer(HandleSessionList *sessionObject, dbinder_tr
return ERR_NONE; return ERR_NONE;
} }
static HandleSessionList *WriteTransaction(int cmd, MessageOption option, int32_t handle, static HandleSessionList *WriteTransaction(int32_t cmd, MessageOption option, int32_t handle,
int32_t sessionId, uint32_t code, IpcIo *data, uint64_t *seqNumber, int status) int32_t sessionId, uint32_t code, IpcIo *data, uint64_t *seqNumber, int status)
{ {
HandleSessionList *sessionObject = GetSessionObject(handle, sessionId); HandleSessionList *sessionObject = GetSessionObject(handle, sessionId);
@ -238,7 +238,7 @@ static int32_t OnSendMessage(HandleSessionList *sessionOfPeer)
} }
int32_t ret = rpcSkeleton->rpcTrans->Send((int)sessionOfPeer->sessionId, int32_t ret = rpcSkeleton->rpcTrans->Send((int)sessionOfPeer->sessionId,
(void *)sessionOfPeer->buffer, (unsigned int)sessionOfPeer->len); (void *)sessionOfPeer->buffer, (uint32_t)sessionOfPeer->len);
free(sessionOfPeer->buffer); free(sessionOfPeer->buffer);
return ret; return ret;
@ -290,7 +290,7 @@ static int32_t HandleReply(uint64_t seqNumber, IpcIo *reply, uintptr_t *buffer)
return ERR_NONE; return ERR_NONE;
} }
static int32_t WaitForReply(uint64_t seqNumber, IpcIo *reply, uint32_t handle, int userWaitTime, uintptr_t *buffer) static int32_t WaitForReply(uint64_t seqNumber, IpcIo *reply, uint32_t handle, uint32_t userWaitTime, uintptr_t *buffer)
{ {
if (reply == NULL || userWaitTime == 0) { if (reply == NULL || userWaitTime == 0) {
return ERR_NONE; return ERR_NONE;
@ -313,7 +313,7 @@ static int32_t WaitForReply(uint64_t seqNumber, IpcIo *reply, uint32_t handle, i
return result; return result;
} }
static int32_t SendOrWaitForCompletion(int userWaitTime, uint64_t seqNumber, static int32_t SendOrWaitForCompletion(uint32_t userWaitTime, uint64_t seqNumber,
HandleSessionList *sessionOfPeer, IpcIo *reply, uintptr_t *buffer) HandleSessionList *sessionOfPeer, IpcIo *reply, uintptr_t *buffer)
{ {
if (seqNumber == 0) { if (seqNumber == 0) {
@ -594,7 +594,7 @@ void OnDatabusSessionClosed(int sessionId)
} }
} }
static uint32_t HasCompletePackage(const char *data, uint32_t readCursor, unsigned int len) static uint32_t HasCompletePackage(const char *data, uint32_t readCursor, uint32_t len)
{ {
const dbinder_transaction_data *tr = (const dbinder_transaction_data *)(data + readCursor); const dbinder_transaction_data *tr = (const dbinder_transaction_data *)(data + readCursor);
if ((tr->magic == DBINDER_MAGICWORD) && if ((tr->magic == DBINDER_MAGICWORD) &&
@ -605,7 +605,7 @@ static uint32_t HasCompletePackage(const char *data, uint32_t readCursor, unsign
return 0; return 0;
} }
void OnMessageAvailable(int sessionId, const void *data, unsigned int len) void OnMessageAvailable(int sessionId, const void *data, uint32_t len)
{ {
if (sessionId < 0 || data == NULL || len < sizeof(dbinder_transaction_data)) { if (sessionId < 0 || data == NULL || len < sizeof(dbinder_transaction_data)) {
RPC_LOG_ERROR("session has wrong inputs"); RPC_LOG_ERROR("session has wrong inputs");
@ -701,7 +701,13 @@ static int32_t RpcInvokerSendRequest(SvcIdentity target, uint32_t code, IpcIo *d
RPC_LOG_INFO("RPCInvokerSendRequest called"); RPC_LOG_INFO("RPCInvokerSendRequest called");
int32_t result = ERR_NONE; int32_t result = ERR_NONE;
uint64_t seqNumber = 0; uint64_t seqNumber = 0;
int userWaitTime = RPC_DEFAULT_SEND_WAIT_TIME;
uint32_t userWaitTime = option.waitTime;
if (userWaitTime < 0) {
userWaitTime = RPC_DEFAULT_SEND_WAIT_TIME;
} else if (userWaitTime > RPC_MAX_SEND_WAIT_TIME) {
userWaitTime = RPC_MAX_SEND_WAIT_TIME;
}
HandleSessionList *sessinoObject = WriteTransaction(BC_TRANSACTION, option, target.handle, HandleSessionList *sessinoObject = WriteTransaction(BC_TRANSACTION, option, target.handle,
0, code, data, &seqNumber, 0); 0, code, data, &seqNumber, 0);

View File

@ -365,7 +365,7 @@ static int32_t AddThreadBySeqNumber(ThreadMessageInfo *messageInfo)
return ERR_NONE; return ERR_NONE;
} }
int32_t AddSendThreadInWait(uint64_t seqNumber, ThreadMessageInfo *messageInfo, int userWaitTime) int32_t AddSendThreadInWait(uint64_t seqNumber, ThreadMessageInfo *messageInfo, uint32_t userWaitTime)
{ {
if (AddThreadBySeqNumber(messageInfo) != ERR_NONE) { if (AddThreadBySeqNumber(messageInfo) != ERR_NONE) {
RPC_LOG_ERROR("add seqNumber = %llu failed", seqNumber); RPC_LOG_ERROR("add seqNumber = %llu failed", seqNumber);
@ -393,25 +393,22 @@ int32_t AddSendThreadInWait(uint64_t seqNumber, ThreadMessageInfo *messageInfo,
} }
pthread_mutex_lock(&threadLockInfo->mutex); pthread_mutex_lock(&threadLockInfo->mutex);
if (userWaitTime < 0) {
pthread_cond_wait(&threadLockInfo->condition, &threadLockInfo->mutex);
} else {
struct timespec waitTime;
struct timeval now;
if (gettimeofday(&now, NULL) != 0) {
RPC_LOG_ERROR("gettimeofday failed");
pthread_mutex_unlock(&threadLockInfo->mutex);
return ERR_FAILED;
}
waitTime.tv_sec = now.tv_sec + RPC_DEFAULT_SEND_WAIT_TIME; struct timespec waitTime;
waitTime.tv_nsec = now.tv_usec * USECTONSEC; struct timeval now;
int ret = pthread_cond_timedwait(&threadLockInfo->condition, &threadLockInfo->mutex, &waitTime); if (gettimeofday(&now, NULL) != 0) {
RPC_LOG_ERROR("gettimeofday failed");
pthread_mutex_unlock(&threadLockInfo->mutex); pthread_mutex_unlock(&threadLockInfo->mutex);
if (ret == ETIMEDOUT) { return ERR_FAILED;
RPC_LOG_ERROR("send thread wait for reply timeout"); }
return ERR_FAILED;
} waitTime.tv_sec = now.tv_sec + userWaitTime;
waitTime.tv_nsec = now.tv_usec * USECTONSEC;
int ret = pthread_cond_timedwait(&threadLockInfo->condition, &threadLockInfo->mutex, &waitTime);
pthread_mutex_unlock(&threadLockInfo->mutex);
if (ret == ETIMEDOUT) {
RPC_LOG_ERROR("send thread wait for reply timeout");
return ERR_FAILED;
} }
return ERR_NONE; return ERR_NONE;

View File

@ -36,7 +36,8 @@ void ServerDead1(void *args)
} }
MessageOption option = { MessageOption option = {
.flags = TF_OP_SYNC .flags = TF_OP_SYNC,
.waitTime = RPC_DEFAULT_SEND_WAIT_TIME
}; };
SvcIdentity sid; SvcIdentity sid;
char deviceId[DEVICEID_LENGTH]; char deviceId[DEVICEID_LENGTH];

View File

@ -88,14 +88,14 @@ typedef struct {
uint32_t sizeOfSelf; uint32_t sizeOfSelf;
uint32_t magic; uint32_t magic;
uint32_t version; uint32_t version;
int cmd; int32_t cmd;
uint32_t code; uint32_t code;
uint32_t flags; uint32_t flags;
uint64_t cookie; uint64_t cookie;
uint64_t seqNumber; uint64_t seqNumber;
size_t buffer_size; uint64_t buffer_size;
size_t offsets_size; uint64_t offsets_size;
uintptr_t offsets; uint64_t offsets;
char *buffer; char *buffer;
} dbinder_transaction_data; } dbinder_transaction_data;