mirror of
https://gitee.com/openharmony/startup_init
synced 2024-11-23 16:20:00 +00:00
新增loop系统测试用例
Signed-off-by: ohxianzhi <duxianzhi@huawei.com>
This commit is contained in:
parent
939de4d1a4
commit
1c7195027d
@ -17,8 +17,142 @@
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
|
||||
#include <linux/in.h>
|
||||
#include <linux/socket.h>
|
||||
#include <linux/tcp.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/un.h>
|
||||
|
||||
#include "le_streamtask.c"
|
||||
#include "loop_systest.h"
|
||||
#include "loop_event.h"
|
||||
#include "le_socket.h"
|
||||
#include "le_task.h"
|
||||
#include "list.h"
|
||||
|
||||
#define TASKINFO \
|
||||
uint32_t flags; \
|
||||
union { \
|
||||
int fd; \
|
||||
} taskId
|
||||
|
||||
typedef struct {
|
||||
TASKINFO;
|
||||
} TaskId;
|
||||
|
||||
typedef struct LiteTask_ {
|
||||
TASKINFO;
|
||||
HashNode hashNode;
|
||||
LE_Close close;
|
||||
DumpTaskInfo dumpTaskInfo;
|
||||
HandleTaskEvent handleEvent;
|
||||
HandleTaskClose innerClose;
|
||||
uint16_t userDataOffset;
|
||||
uint16_t userDataSize;
|
||||
} BaseTask;
|
||||
|
||||
typedef struct {
|
||||
BaseTask base;
|
||||
LE_IncommingConnect incommingConnect;
|
||||
char server[0];
|
||||
} StreamServerTask;
|
||||
|
||||
typedef struct MyTask_ {
|
||||
TaskHandle task;
|
||||
struct ListNode item;
|
||||
int ptyFd;
|
||||
pid_t pid;
|
||||
} MyTask;
|
||||
|
||||
static LoopHandle g_loop = NULL;
|
||||
|
||||
static int HandleServerEvent(const LoopHandle loopHandle, const TaskHandle serverTask, uint32_t oper)
|
||||
{
|
||||
printf("HandleServerEvent_ fd %d oper 0x%x \n", GetSocketFd(serverTask), oper);
|
||||
if (!LE_TEST_FLAGS(oper, EVENT_READ)) {
|
||||
return LE_FAILURE;
|
||||
}
|
||||
|
||||
StreamServerTask *server = (StreamServerTask *)serverTask;
|
||||
if (server->incommingConnect == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ret = server->incommingConnect(loopHandle, serverTask);
|
||||
if (ret != 0) {
|
||||
printf("HandleServerEvent_ fd %d do not accept socket \n", GetSocketFd(serverTask));
|
||||
}
|
||||
|
||||
EventLoop *loop = (EventLoop *)loopHandle;
|
||||
loop->modEvent(loop, (const BaseTask *)serverTask, EVENT_READ);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void HandleStreamTaskClose(const LoopHandle loopHandle, const TaskHandle taskHandle)
|
||||
{
|
||||
BaseTask *task = (BaseTask *)taskHandle;
|
||||
DelTask((EventLoop *)loopHandle, task);
|
||||
CloseTask(loopHandle, task);
|
||||
if (task->taskId.fd > 0) {
|
||||
printf("HandleStreamTaskClose::close fd:%d \n", task->taskId.fd);
|
||||
close(task->taskId.fd);
|
||||
}
|
||||
}
|
||||
|
||||
static void DumpStreamServerTaskInfo(const TaskHandle task)
|
||||
{
|
||||
if (task == nullptr) {
|
||||
printf("Dump empty task \n");
|
||||
return;
|
||||
}
|
||||
BaseTask *baseTask = dynamic_cast<BaseTask *>(task);
|
||||
StreamServerTask *serverTask = dynamic_cast<StreamServerTask *>(baseTask);
|
||||
|
||||
printf("\tfd: %d \n", serverTask->base.taskId.fd);
|
||||
printf("\t TaskType: %s \n", "ServerTask");
|
||||
if (strlen(serverTask->server) > 0) {
|
||||
printf("\t Server socket:%s \n", serverTask->server);
|
||||
} else {
|
||||
printf("\t Server socket:%s \n", "NULL");
|
||||
}
|
||||
}
|
||||
|
||||
static int IncommingConnect(const LoopHandle loop, const TaskHandle server)
|
||||
{
|
||||
TaskHandle client = NULL;
|
||||
LE_StreamInfo info = {};
|
||||
#ifndef STARTUP_INIT_TEST
|
||||
info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_CONNECT;
|
||||
#else
|
||||
info.baseInfo.flags = TASK_STREAM | TASK_PIPE | TASK_CONNECT | TASK_TEST;
|
||||
#endif
|
||||
info.baseInfo.close = OnClose;
|
||||
info.baseInfo.userDataSize = sizeof(MyTask);
|
||||
info.disConnectComplete = NULL;
|
||||
info.sendMessageComplete = NULL;
|
||||
info.recvMessage = CmdOnRecvMessage;
|
||||
int ret = LE_AcceptStreamClient(g_loop, server, &client, &info);
|
||||
if (ret != 0) {
|
||||
printf("Failed accept stream \n");
|
||||
return -1;
|
||||
}
|
||||
MyTask *agent = (MyTask *)LE_GetUserData(client);
|
||||
if (agent == nullptr) {
|
||||
printf("Invalid agent \n");
|
||||
return -1;
|
||||
}
|
||||
agent->task = client;
|
||||
OH_ListInit(&agent->item);
|
||||
ret = SendMessage(g_loop, agent->task, "connect success.");
|
||||
if (ret != 0) {
|
||||
printf("Failed send msg \n");
|
||||
return -1;
|
||||
}
|
||||
OH_ListAddTail(&g_cmdService.head, &agent->item);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
@ -48,6 +182,29 @@ int main()
|
||||
system("pause");
|
||||
return 0;
|
||||
}
|
||||
LoopHandle loopHandle = LE_GetDefaultLoop();
|
||||
EventLoop *loop = (EventLoop *)loopHandle;
|
||||
StreamServerTask *task = (StreamServerTask *)CreateTask(loopHandle, fd, &baseInfo,
|
||||
sizeof(StreamServerTask) + strlen(server) + 1);
|
||||
if (task == nullptr) {
|
||||
printf("Failed to create task \n");
|
||||
close(fd);
|
||||
system("pause");
|
||||
return 0;
|
||||
}
|
||||
|
||||
EventLoop *loop = (EventLoop *)LE_GetDefaultLoop();
|
||||
+ TaskHandle *taskHandle = NULL;
|
||||
task->base.handleEvent = HandleServerEvent;
|
||||
task->base.innerClose = HandleStreamTaskClose;
|
||||
task->base.dumpTaskInfo = DumpStreamServerTaskInfo;
|
||||
task->incommingConnect = incommingConnect;
|
||||
loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
|
||||
int ret = memcpy_s(task->server, strlen(server) + 1, server, strlen(server) + 1);
|
||||
if (ret != 0) {
|
||||
printf("Failed to copy server name %s", server);
|
||||
system("pause");
|
||||
return 0;
|
||||
}
|
||||
|
||||
*taskHandle = (TaskHandle)task;
|
||||
}
|
@ -16,8 +16,8 @@
|
||||
#ifndef LOOP_SYSTEST_H
|
||||
#define LOOP_SYSTEST_H
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <fcntl.h>
|
||||
|
||||
#include "loop_event.h"
|
||||
@ -30,6 +30,7 @@
|
||||
#endif
|
||||
|
||||
#define SOCKET_NAME "looptest"
|
||||
#define INIT_CONTROL_SOCKET_PATH "/dev/unix/socket/init_control_fd"
|
||||
#if defined(__MUSL__)
|
||||
#define SOCKET_DIR BASEDIR "/dev/unix/socket"
|
||||
#else
|
||||
@ -137,4 +138,50 @@ typedef struct {
|
||||
struct ListNode msgBlocks; // 保存实际的消息数据
|
||||
} ReqMsgNode;
|
||||
|
||||
typedef struct SpawnTime {
|
||||
int minTime;
|
||||
int maxTime;
|
||||
} SpawnTime;
|
||||
|
||||
typedef enum {
|
||||
MODE_FOR_APP_SPAWN,
|
||||
MODE_FOR_NWEB_SPAWN,
|
||||
MODE_FOR_APP_COLD_RUN,
|
||||
MODE_FOR_NWEB_COLD_RUN,
|
||||
MODE_FOR_NATIVE_SPAWN,
|
||||
MODE_FOR_CJAPP_SPAWN,
|
||||
MODE_INVALID
|
||||
} RunMode;
|
||||
|
||||
typedef struct {
|
||||
char *longProcName;
|
||||
uint32_t longProcNameLen;
|
||||
uint32_t sandboxNsFlags;
|
||||
int wdgOpened;
|
||||
RunMode mode;
|
||||
#ifndef OHOS_LITE
|
||||
int32_t preforkFd[2];
|
||||
int32_t parentToChildFd[2];
|
||||
char *propertyBuffer;
|
||||
int isPrefork;
|
||||
pid_t reservedPid;
|
||||
int enablePerfork;
|
||||
#endif
|
||||
} MgrContent;
|
||||
|
||||
typedef struct {
|
||||
MgrContent content;
|
||||
TaskHandle server;
|
||||
SignalHandle sigHandler;
|
||||
pid_t servicePid;
|
||||
struct ListNode appQueue; // save app pid and name
|
||||
uint32_t diedAppCount;
|
||||
uint32_t flags;
|
||||
struct ListNode diedQueue; // save app pid and name
|
||||
struct timespec perLoadStart;
|
||||
struct timespec perLoadEnd;
|
||||
struct ListNode extData;
|
||||
struct SpawnTime spawnTime;
|
||||
} AppMgr;
|
||||
|
||||
#endif
|
@ -36,9 +36,24 @@
|
||||
#include "le_task.h"
|
||||
#include "list.h"
|
||||
|
||||
#define MAX_MSG_LEN 128
|
||||
#define RETRY_TIME (200 * 1000) // 200 * 1000 wait 200ms CONNECT_RETRY_DELAY = 200 * 1000
|
||||
#define MAX_RETRY_SEND_COUNT 2 // 2 max retry count CONNECT_RETRY_MAX_TIMES = 2;
|
||||
|
||||
typedef struct Message_ {
|
||||
uint32_t magic;
|
||||
uint32_t msgType;
|
||||
uint32_t msgLen;
|
||||
uint32_t msgId;
|
||||
uint32_t tlvCount;
|
||||
char buffer[MAX_MSG_LEN];
|
||||
} Message;
|
||||
|
||||
typedef struct {
|
||||
Message msgHdr;
|
||||
Result result;
|
||||
} ResponseMsg;
|
||||
|
||||
typedef Agent_ {
|
||||
TaskHandle task;
|
||||
WatcherHandle input;
|
||||
|
@ -24,11 +24,30 @@
|
||||
#include "le_task.h"
|
||||
#include "list.h"
|
||||
|
||||
#define SLEEP_DURATION 3000 // us
|
||||
#define EXIT_TIMEOUT 1000000 // us
|
||||
#define APP_STATE_IDLE 1
|
||||
#define APP_STATE_SPAWNING 2
|
||||
#define APP_MAX_TIME 3000000
|
||||
|
||||
typedef void (* CallbackControlProcess)(uint16_t type, const char *serviceCmd, const void *context);
|
||||
|
||||
static Message *g_message = NULL;
|
||||
CallbackControlProcess g_controlFunc = NULL;
|
||||
|
||||
typedef struct {
|
||||
uint16_t tlvLen;
|
||||
uint16_t tlvType;
|
||||
} Tlv;
|
||||
|
||||
typedef enum {
|
||||
ACTION_SANDBOX = 0,
|
||||
ACTION_DUMP,
|
||||
ACTION_MODULEMGR,
|
||||
ACTION_SPAWNTIME,
|
||||
ACTION_MAX
|
||||
} ActionType;
|
||||
|
||||
typedef struct {
|
||||
uint16_t tlvLen; // 对齐后的长度
|
||||
uint16_t tlvType;
|
||||
@ -69,6 +88,7 @@ typedef struct MsgNode_ {
|
||||
} MsgNode;
|
||||
|
||||
static MyService g_service = NULL;
|
||||
static AppMgr *g_appMgr = NULL;
|
||||
|
||||
int MakeDirRec(const char *path, mode_t mode, int lastPath)
|
||||
{
|
||||
@ -172,14 +192,39 @@ static int CreateTcpServer(TaskHandle *server, const char *name)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void DeleteMsg(MsgNode *msgNode)
|
||||
{
|
||||
if (msgNode == NULL) {
|
||||
return;
|
||||
}
|
||||
if (msgNode->buffer) {
|
||||
free(msgNode->buffer);
|
||||
msgNode->buffer = NULL;
|
||||
}
|
||||
if (msgNode->tlvOffset) {
|
||||
free(msgNode->tlvOffset);
|
||||
msgNode->tlvOffset = NULL;
|
||||
}
|
||||
free(msgNode);
|
||||
}
|
||||
|
||||
static void WaitMsgCompleteTimeOut(const TimerHandle taskHandle, void *context)
|
||||
{
|
||||
MyTask *task = (MyTask *)context;
|
||||
printf("Long time no msg complete so close connectionId: %u \n", connection->connectionId);
|
||||
DeleteMsg(connection->receiverCtx.incompleteMsg);
|
||||
connection->receiverCtx.incompleteMsg = NULL;
|
||||
LE_CloseStreamTask(LE_GetDefaultLoop(), connection->stream);
|
||||
}
|
||||
|
||||
static inline int StartTimerForCheckMsg(MyTask *task)
|
||||
{
|
||||
if (connection->receiverCtx.timer != NULL) {
|
||||
if (task->receiverCtx.timer != NULL) {
|
||||
return 0;
|
||||
}
|
||||
int ret = LE_CreateTimer(LE_GetDefaultLoop(), &connection->receiverCtx.timer, WaitMsgCompleteTimeOut, connection);
|
||||
int ret = LE_CreateTimer(LE_GetDefaultLoop(), &task->receiverCtx.timer, WaitMsgCompleteTimeOut, task);
|
||||
if (ret == 0) {
|
||||
ret = LE_StartTimer(LE_GetDefaultLoop(), connection->receiverCtx.timer, MAX_WAIT_MSG_COMPLETE, 1);
|
||||
ret = LE_StartTimer(LE_GetDefaultLoop(), task->receiverCtx.timer, MAX_WAIT_MSG_COMPLETE, 1);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -337,6 +382,45 @@ static int MsgRebuild(MsgNode *message, const Message *msg)
|
||||
return 0;
|
||||
}
|
||||
|
||||
AppMgr *CreateMessage()
|
||||
{
|
||||
if (g_appMgr != NULL) {
|
||||
return g_appMgr;
|
||||
}
|
||||
AppMgr *appMgr = (AppMgr *)calloc(1, sizeof(AppMgr));
|
||||
if (appMgr == NULL) {
|
||||
printf("Failed to alloc memory \n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
appMgr->content.longProcName = NULL;
|
||||
appMgr->content.longProcNameLen = 0;
|
||||
appMgr->content.mode = mode;
|
||||
appMgr->content.sandboxNsFlags = 0;
|
||||
appMgr->content.wdgOpened = 0;
|
||||
appMgr->servicePid = getpid();
|
||||
appMgr->server = NULL;
|
||||
appMgr->sigHandler = NULL;
|
||||
OH_ListInit(&appMgr->appQueue);
|
||||
OH_ListInit(&appMgr->diedQueue);
|
||||
appMgr->diedAppCount = 0;
|
||||
OH_ListInit(&appMgr->extData);
|
||||
g_appMgr = appMgr;
|
||||
g_appMgr->spawnTime.minTime = APP_MAX_TIME;
|
||||
g_appMgr->spawnTime.maxTime = 0;
|
||||
return appMgr;
|
||||
}
|
||||
|
||||
AppMgr *GetAppMgr(void)
|
||||
{
|
||||
return g_appMgr;
|
||||
}
|
||||
|
||||
MgrContent *GetMgrContent(void)
|
||||
{
|
||||
return g_appMgr == NULL ? NULL : &g_appMgr->content;
|
||||
}
|
||||
|
||||
int GetMsgFromBuffer(const uint8_t *buffer, uint32_t bufferLen,
|
||||
Message **outMsg, uint32_t *msgRecvLen, uint32_t *reminder)
|
||||
{
|
||||
@ -401,14 +485,29 @@ int DecodeMsg(Message * message)
|
||||
return 0;
|
||||
}
|
||||
|
||||
inline int IsNWebMode(const AppMgr *content)
|
||||
{
|
||||
return (content != NULL) &&
|
||||
(content->content.mode == MODE_FOR_NWEB_SPAWN || content->content.mode == MODE_FOR_NWEB_COLD_RUN);
|
||||
}
|
||||
|
||||
int ProcessTerminationStatusMsg(const MsgNode *message, Result *result)
|
||||
{
|
||||
if (message == NULL || result == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!IsNWebMode(g_appMgr)) {
|
||||
return -1;
|
||||
}
|
||||
result->result = -1;
|
||||
result->pid = 0;
|
||||
pid_t *pid = (pid_t *)GetMsgInfo(message, TLV_RENDER_TERMINATION_INFO);
|
||||
if (pid == NULL) {
|
||||
return -1;
|
||||
}
|
||||
// get render process termination status, only nwebspawn need this logic.
|
||||
result->pid = *pid;
|
||||
result->result = GetProcessTerminationStatus(*pid);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -523,6 +622,19 @@ void *GetMsgExtInfo(const MsgNode *message, const char *name, uint32_t *len)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
MsgNode *CreateMsg(void)
|
||||
{
|
||||
MsgNode *message = (MsgNode *)calloc(1, sizeof(MsgNode));
|
||||
if (message == NULL) {
|
||||
printf("Failed to create message \n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
message->buffer = NULL;
|
||||
message->tlvOffset = NULL;
|
||||
return message;
|
||||
}
|
||||
|
||||
MsgNode *RebuildMsgNode(MsgNode *message, Process *info)
|
||||
{
|
||||
#ifdef DEBUG_BEGETCTL_BOOT
|
||||
@ -719,6 +831,98 @@ static void OnReceiveRequest(const TaskHandle taskHandle, const uint8_t *buffer,
|
||||
}
|
||||
}
|
||||
|
||||
void ServiceInit(const char *socketPath, CallbackControlProcess func, LoopHandle loop)
|
||||
{
|
||||
if ((socketPath == NULL) || (func == NULL) || (loop == NULL)) {
|
||||
BEGET_LOGE("[control_fd] Invalid parameter");
|
||||
return;
|
||||
}
|
||||
OH_ListInit(&g_cmdService.head);
|
||||
LE_StreamServerInfo info = {};
|
||||
info.baseInfo.flags = TASK_STREAM | TASK_SERVER | TASK_PIPE;
|
||||
info.server = (char *)socketPath;
|
||||
info.socketId = -1;
|
||||
info.baseInfo.close = NULL;
|
||||
info.disConnectComplete = NULL;
|
||||
info.incommingConnect = CmdOnIncommingConnect;
|
||||
info.sendMessageComplete = NULL;
|
||||
info.recvMessage = NULL;
|
||||
g_controlFunc = func;
|
||||
if (g_controlFdLoop == NULL) {
|
||||
g_controlFdLoop = loop;
|
||||
}
|
||||
(void)LE_CreateStreamServer(g_controlFdLoop, &g_cmdService.serverTask, &info);
|
||||
}
|
||||
|
||||
void ProcessControl(uint16_t type, const char *serviceCmd, const void *context)
|
||||
{
|
||||
if ((type >= ACTION_MAX) || (serviceCmd == NULL)) {
|
||||
return;
|
||||
}
|
||||
switch (type) {
|
||||
case ACTION_SANDBOX :
|
||||
ProcessSandboxControlFd(type, serviceCmd);
|
||||
break;
|
||||
case ACTION_DUMP :
|
||||
ProcessDumpServiceControlFd(type, serviceCmd);
|
||||
break;
|
||||
case ACTION_MODULEMGR :
|
||||
ProcessModuleMgrControlFd(type, serviceCmd);
|
||||
break;
|
||||
default :
|
||||
printf("Unknown control fd type. \n")
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void Init(const char *socketPath)
|
||||
{
|
||||
ServiceInit(socketPath, ProcessControl, LE_GetDefaultLoop());
|
||||
return;
|
||||
}
|
||||
|
||||
void TestApp()
|
||||
{
|
||||
int testNum = 0;
|
||||
int ret = scanf_s("%d", &testNum);
|
||||
if (ret <= 0) {
|
||||
printf("input error \n");
|
||||
return;
|
||||
}
|
||||
|
||||
char name[128];
|
||||
char context[128];
|
||||
for (int i = 0; i < testNum; ++i) {
|
||||
printf("请输入要测试的应用名称:(sandbox, dump, moudlemgr) \n");
|
||||
ret = scanf_s("%s", name, sizeof(name));
|
||||
if (ret <= 0) {
|
||||
printf("input error \n");
|
||||
return;
|
||||
}
|
||||
|
||||
int app = -1;
|
||||
if (strcmp(name, "sandbox") == 0) {
|
||||
app = ACTION_SANDBOX;
|
||||
} else if (strcmp(name, "dump") == 0) {
|
||||
app = ACTION_DUMP;
|
||||
} else if (strcmp(name, "modulemgr") == 0) {
|
||||
app = ACTION_MODULEMGR;
|
||||
} else {
|
||||
printf("input error \n");
|
||||
return;
|
||||
}
|
||||
|
||||
printf("请输入对应的应用输入参数:\n");
|
||||
ret = scanf_s("%s", context, sizeof(context));
|
||||
if (ret <= 0) {
|
||||
printf("input error \n");
|
||||
return;
|
||||
}
|
||||
|
||||
g_controlFunc(app, context, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char *const argv[])
|
||||
{
|
||||
printf("main argc: %d \n", argc);
|
||||
@ -748,5 +952,8 @@ int main(int argc, char *const argv[])
|
||||
return 0;
|
||||
}
|
||||
|
||||
Init(INIT_CONTROL_SOCKET_PATH);
|
||||
TestApp();
|
||||
|
||||
return 0;
|
||||
}
|
@ -17,9 +17,187 @@
|
||||
#include <string>
|
||||
#include <iostream>
|
||||
|
||||
#include <linux/in.h>
|
||||
#include <linux/socket.h>
|
||||
#include <linux/tcp.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/un.h>
|
||||
|
||||
#include "le_streamtask.c"
|
||||
#include "loop_systest.h"
|
||||
#include "loop_event.h"
|
||||
#include "le_socket.h"
|
||||
#include "le_task.h"
|
||||
#include "list.h"
|
||||
|
||||
static LE_STATUS HandleEvent(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
|
||||
{
|
||||
StreamConnectTask *task = (StreamConnectTask *)handle;
|
||||
printf("HandleEvent fd:%d oper 0x%x \n", GetSocketFd(handle), oper);
|
||||
|
||||
LE_STATUS status = LE_SUCCESS;
|
||||
if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
|
||||
status = HandleSendMsg(loopHandle, handle, task->sendMessageComplete);
|
||||
}
|
||||
if (LE_TEST_FLAGS(oper, EVENT_READ)) {
|
||||
status = HandleRecvMsg(loopHandle, handle, task->recvMessage, task->handleRecvMsg);
|
||||
}
|
||||
if (LE_TEST_FLAGS(oper, EVENT_ERROR)) {
|
||||
if (task->disConnectComplete) {
|
||||
task->disConnectComplete(handle);
|
||||
}
|
||||
LE_CloseStreamTask(loopHandle, handle);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
static LE_STATUS HandleSendMsg(const LoopHandle loopHandle,
|
||||
const TaskHandle taskHandle, const LE_SendMessageComplete complete)
|
||||
{
|
||||
EventLoop *loop = (EventLoop *)loopHandle;
|
||||
StreamTask *task = (StreamTask *)taskHandle;
|
||||
LE_Buffer *buffer = GetFirstBuffer(task);
|
||||
while (buffer) {
|
||||
int ret = write(GetSocketFd(taskHandle), buffer->data, buffer->dataSize);
|
||||
if (ret < buffer->dataSize) {
|
||||
printf("fd:%d size %d, err:%d \n", GetSocketFd(taskHandle), buffer->dataSize, errno);
|
||||
}
|
||||
printf("HandleSendMsg fd:%d send data size %d %d \n", GetSocketFd(taskHandle), buffer->dataSize, ret);
|
||||
buffer->result = (ret == (int)buffer->dataSize) ? 0 : errno;
|
||||
if (complete != NULL) {
|
||||
complete(taskHandle, buffer);
|
||||
}
|
||||
FreeBuffer(loopHandle, task, buffer);
|
||||
buffer = GetFirstBuffer(task);
|
||||
}
|
||||
if (IsBufferEmpty(task)) {
|
||||
printf("HandleSendMsg fd:%d empty wait read \n", GetSocketFd(taskHandle));
|
||||
loop->modEvent(loop, (const BaseTask *)taskHandle, EVENT_READ);
|
||||
return LE_SUCCESS;
|
||||
}
|
||||
return LE_SUCCESS;
|
||||
}
|
||||
|
||||
static LE_STATUS HandleClientEvent(const LoopHandle loopHandle, const TaskHandle handle, uint32_t oper)
|
||||
{
|
||||
StreamClientTask *client = (StreamClientTask *)handle;
|
||||
printf("HandleClientEvent fd:%d oper 0x%x \n", GetSocketFd(handle), oper);
|
||||
|
||||
LE_STATUS status = LE_SUCCESS;
|
||||
if (LE_TEST_FLAGS(oper, EVENT_WRITE)) {
|
||||
if (client->connected == 0 && client->connectComplete) {
|
||||
client->connectComplete(handle);
|
||||
}
|
||||
client->connected = 1;
|
||||
status = HandleSendMsg(loopHandle, handle, client->sendMessageComplete);
|
||||
}
|
||||
if (LE_TEST_FLAGS(oper, EVENT_READ)) {
|
||||
status = HandleRecvMsg(loopHandle, handle, client->recvMessage, client->handleRecvMsg);
|
||||
}
|
||||
if (status == LE_DIS_CONNECTED) {
|
||||
if (client->disConnectComplete) {
|
||||
client->disConnectComplete(handle);
|
||||
}
|
||||
client->connected = 0;
|
||||
LE_CloseStreamTask(loopHandle, handle);
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
static void HandleTaskClose(const LoopHandle loopHandle, const TaskHandle taskHandle)
|
||||
{
|
||||
BaseTask *baseTask = (BaseTask *)taskHandle;
|
||||
printf("HandleTaskClose::DelTask \n");
|
||||
DelTask((EventLoop *)loopHandle, baseTask);
|
||||
|
||||
printf("HandleTaskClose::DCloseTask \n");
|
||||
CloseTask(loopHandle, task);
|
||||
|
||||
if (baseTask->taskId.fd > 0) {
|
||||
printf("HandleTaskClose fd: %d \n", ask->taskId.fd);
|
||||
close(baseTask->taskId.fd);
|
||||
}
|
||||
}
|
||||
|
||||
static void DumpConnectTaskInfo(const TaskHandle task)
|
||||
{
|
||||
if (task == NULL) {
|
||||
printf("task is null \n");
|
||||
return;
|
||||
}
|
||||
|
||||
BaseTask *baseTask = (BaseTask *)task;
|
||||
StreamConnectTask *connectTask = (StreamConnectTask *)baseTask;
|
||||
TaskHandle taskHandle = (TaskHandle)connectTask;
|
||||
printf("\tfd: %d \n", connectTask->stream.base.taskId.fd);
|
||||
printf("\t TaskType: %s \n", "ConnectTask");
|
||||
printf("\t ServiceInfo: \n");
|
||||
|
||||
struct ucred cred = {-1, -1, -1};
|
||||
socklen_t credSize = sizeof(struct ucred);
|
||||
if (getsockopt(LE_GetSocketFd(taskHandle), SOL_SOCKET, SO_PEERCRED, &cred, &credSize) == 0) {
|
||||
printf("\t Service Pid: %d \n", cred.pid);
|
||||
printf("\t Service Uid: %d \n", cred.uid);
|
||||
printf("\t Service Gid: %d \n", cred.gid);
|
||||
} else {
|
||||
printf("\t Service Pid: %s \n", "NULL");
|
||||
printf("\t Service Uid: %s \n", "NULL");
|
||||
printf("\t Service Gid: %s \n", "NULL");
|
||||
}
|
||||
}
|
||||
|
||||
int AcceptClient(const LoopHandle loopHandle, const TaskHandle server,
|
||||
TaskHandle *taskHandle, const LE_StreamInfo *info)
|
||||
{
|
||||
if (loopHandle == NULL || info == NULL) {
|
||||
printf("Invalid parameters \n");
|
||||
return -1;
|
||||
}
|
||||
if (server == NULL || taskHandle == NULL) {
|
||||
printf("Invalid parameters \n");
|
||||
return -1;
|
||||
}
|
||||
if (info->recvMessage == NULL) {
|
||||
printf("Invalid parameters recvMessage \n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
int fd = -1;
|
||||
if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
|
||||
fd = AcceptSocket(GetSocketFd(server), info->baseInfo.flags);
|
||||
if (fd <= 0) {
|
||||
printf("Failed to accept socket %d \n", GetSocketFd(server));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
StreamConnectTask *task = (StreamConnectTask *)CreateTask(
|
||||
loopHandle, fd, &info->baseInfo, sizeof(StreamConnectTask));
|
||||
if (task == NULL) {
|
||||
printf("Failed to create task %d \n", GetSocketFd(server));
|
||||
close(fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
task->stream.base.handleEvent = HandleEvent;
|
||||
task->stream.base.innerClose = HandleTaskClose;
|
||||
task->stream.base.dumpTaskInfo = DumpConnectTaskInfo;
|
||||
task->disConnectComplete = info->disConnectComplete;
|
||||
task->sendMessageComplete = info->sendMessageComplete;
|
||||
task->serverTask = (StreamServerTask *)server;
|
||||
task->handleRecvMsg = info->handleRecvMsg;
|
||||
task->recvMessage = info->recvMessage;
|
||||
|
||||
OH_ListInit(&task->stream.buffHead);
|
||||
LoopMutexInit(&task->stream.mutex);
|
||||
if ((info->baseInfo.flags & TASK_TEST) != TASK_TEST) {
|
||||
EventLoop *loop = (EventLoop *)loopHandle;
|
||||
loop->addEvent(loop, (const BaseTask *)task, EVENT_READ);
|
||||
}
|
||||
*taskHandle = (TaskHandle)task;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
@ -30,10 +208,10 @@ int main()
|
||||
int type;
|
||||
std::string path;
|
||||
if (socket_type == "pipe") {
|
||||
type = TASK_STREAM | TASK_PIPE |TASK_SERVER | TASK_TEST;
|
||||
type = TASK_STREAM | TASK_PIPE |TASK_SERVER;
|
||||
path = "/data/testpipe";
|
||||
} else if (socket_type == "tcp") {
|
||||
type = TASK_STREAM | TASK_TCP |TASK_SERVER | TASK_TEST;
|
||||
type = TASK_STREAM | TASK_TCP |TASK_SERVER;
|
||||
path = "127.0.0.1:7777";
|
||||
} else {
|
||||
printf("输入有误,请输入pipe或者tcp!");
|
||||
|
Loading…
Reference in New Issue
Block a user