添加自动拆分采集数据的功能

Signed-off-by: cs1111 <chenshi51@huawei.com>
This commit is contained in:
cs1111 2022-09-05 11:17:49 +08:00
parent 1f0880097a
commit 7612eaea52
48 changed files with 572 additions and 79 deletions

View File

@ -39,6 +39,7 @@ public:
virtual bool StartPluginSession(const std::vector<uint32_t>& pluginIds,
const std::vector<ProfilerPluginConfig>& config) = 0;
virtual bool StopPluginSession(const std::vector<uint32_t>& pluginIds) = 0;
virtual bool ReportPluginBasicData(const std::vector<uint32_t>& pluginIds) = 0;
virtual bool CreateWriter(std::string pluginName, uint32_t bufferSize, int smbFd, int eventFd) = 0;
virtual bool ResetWriter(uint32_t pluginId) = 0;

View File

@ -23,6 +23,11 @@ public:
virtual ~Writer() {}
virtual long Write(const void* data, size_t size) = 0;
virtual bool Flush() = 0;
virtual bool Clear()
{
return true;
}
};
#endif // !WRITER_H

View File

@ -101,3 +101,12 @@ bool BufferWriter::Flush()
bytesPending_ = 0;
return true;
}
bool BufferWriter::Clear()
{
if (shareMemoryBlock_ == nullptr) {
return false;
}
shareMemoryBlock_->ClearShareMemoryBlock();
return true;
}

View File

@ -34,6 +34,7 @@ public:
~BufferWriter();
long Write(const void* data, size_t size) override;
bool Flush() override;
bool Clear() override;
bool WriteMessage(const google::protobuf::Message& pmsg, const std::string& pluginName);
private:

View File

@ -153,6 +153,27 @@ bool CommandPoller::OnStopSessionCmd(const StopSessionCmd& cmd) const
return true;
}
bool CommandPoller::OnReportBasicDataCmd(const RefreshSessionCmd& cmd) const
{
HILOG_DEBUG(LOG_CORE, "%s:proc", __func__);
if (cmd.plugin_ids().size() == 0) {
HILOG_ERROR(LOG_CORE, "%s:cmd invalid!", __func__);
return false;
}
std::vector<uint32_t> pluginIds;
pluginIds.push_back(cmd.plugin_ids(0));
auto pluginManager = pluginManager_.lock(); // promote to shared_ptr
CHECK_NOTNULL(pluginManager, false, "promote FAILED!");
if (!pluginManager->ReportPluginBasicData(pluginIds)) {
HILOG_ERROR(LOG_CORE, "%s:report basic data failed!", __func__);
return false;
}
HILOG_INFO(LOG_CORE, "%s:ok", __func__);
return true;
}
bool CommandPoller::OnGetCommandResponse(SocketContext& context, ::GetCommandResponse& response)
{
HILOG_DEBUG(LOG_CORE, "%s:proc", __func__);
@ -187,6 +208,9 @@ bool CommandPoller::OnGetCommandResponse(SocketContext& context, ::GetCommandRes
} else {
status->set_state(ProfilerPluginState::IN_SESSION);
}
} else if (response.has_refresh_session_cmd()) {
OnReportBasicDataCmd(response.refresh_session_cmd());
status->set_state(ProfilerPluginState::IN_SESSION);
} else {
HILOG_DEBUG(LOG_CORE, "%s:command Response failed!", __func__);
return false;

View File

@ -38,6 +38,7 @@ public:
bool OnDestroySessionCmd(const DestroySessionCmd& cmd) const;
bool OnStartSessionCmd(const StartSessionCmd& cmd) const;
bool OnStopSessionCmd(const StopSessionCmd& cmd) const;
bool OnReportBasicDataCmd(const RefreshSessionCmd& cmd) const;
uint32_t GetRequestId();
bool OnConnect();

View File

@ -63,7 +63,14 @@ std::string ComputeFileSha256(const std::string& path)
}
} // namespace
PluginManager::~PluginManager() {}
PluginManager::~PluginManager()
{
for (size_t i = 0; i < updateThreadVec_.size(); i++) {
if (updateThreadVec_[i].joinable()) {
updateThreadVec_[i].join();
}
}
}
void PluginManager::SetCommandPoller(const CommandPollerPtr& p)
{
@ -101,12 +108,23 @@ bool PluginManager::AddPlugin(const std::string& pluginPath)
return false;
}
return RegisterPlugin(plugin, pluginPath, pluginName);
}
bool PluginManager::RegisterPlugin(const PluginModulePtr& plugin,
const std::string& pluginPath, const std::string& pluginName)
{
std::string outputFileName = "";
plugin->GetOutFileName(outputFileName);
RegisterPluginRequest request;
request.set_request_id(commandPoller_->GetRequestId());
request.set_path(pluginPath);
request.set_sha256(ComputeFileSha256(pluginPath));
request.set_name(pluginName);
request.set_buffer_size_hint(0);
request.set_is_standalone_data(plugin->GetStandaloneFileData());
request.set_out_file_name(outputFileName);
RegisterPluginResponse response;
if (commandPoller_->RegisterPlugin(request, response)) {
@ -284,6 +302,7 @@ bool PluginManager::StartPluginSession(const std::vector<uint32_t>& pluginIds,
if (!plugin->StartSession(reinterpret_cast<const uint8_t*>(cfgData.c_str()), cfgData.size())) {
return false;
}
if (plugin->GetSampleMode() == PluginModule::SampleMode::POLLING) {
if (idx > config.size()) {
HILOG_WARN(LOG_CORE, "%s:idx %zu out of size %zu", __func__, idx, config.size());
@ -304,12 +323,28 @@ bool PluginManager::StartPluginSession(const std::vector<uint32_t>& pluginIds,
}
}
// need update standalone plugin info
if (plugin->GetStandaloneFileData()) {
auto updateThread_ = std::thread(&PluginManager::UpdatePluginInfo, this, plugin);
updateThreadVec_.push_back(std::move(updateThread_));
}
idx++;
}
return true;
}
void PluginManager::UpdatePluginInfo(const PluginModulePtr& plugin)
{
// update plugin info
auto name = plugin.get()->GetPluginName();
auto path = plugin.get()->GetPath();
if (!RegisterPlugin(plugin, path, name)) {
HILOG_ERROR(LOG_CORE, "%s failed, name:%s, path:%s", __func__, name.c_str(), path.c_str());
}
}
bool PluginManager::StopPluginSession(const std::vector<uint32_t>& pluginIds)
{
HILOG_INFO(LOG_CORE, "%s:ready!", __func__);
@ -335,6 +370,23 @@ bool PluginManager::StopPluginSession(const std::vector<uint32_t>& pluginIds)
return true;
}
bool PluginManager::ReportPluginBasicData(const std::vector<uint32_t>& pluginIds)
{
HILOG_INFO(LOG_CORE, "%s:ready!", __func__);
for (uint32_t id : pluginIds) {
if (pluginModules_.find(id) == pluginModules_.end()) {
HILOG_ERROR(LOG_CORE, "%s:plugin not find", __func__);
return false;
}
// notify plugin to report basic data
if (!pluginModules_[id]->ReportBasicData()) {
HILOG_ERROR(LOG_CORE, "%s:report basic data failed", __func__);
return false;
}
}
return true;
}
bool PluginManager::SubmitResult(const PluginResult& pluginResult)
{
HILOG_INFO(LOG_CORE, "%s:ready!", __func__);

View File

@ -30,6 +30,7 @@ class PluginResult;
class CommandPoller;
using CommandPollerPtr = STD_PTR(shared, CommandPoller);
using PluginModulePtr = STD_PTR(shared, PluginModule);
class PluginManager : public ManagerInterface {
public:
@ -46,6 +47,7 @@ public:
bool DestroyPluginSession(const std::vector<uint32_t>& pluginIds);
bool StartPluginSession(const std::vector<uint32_t>& pluginIds, const std::vector<ProfilerPluginConfig>& config);
bool StopPluginSession(const std::vector<uint32_t>& pluginIds);
bool ReportPluginBasicData(const std::vector<uint32_t>& pluginIds);
// call the 'PluginModule::ReportResult' and 'PluginManager::SubmitResult' according to 'pluginId'
// creat PluginResult for current plug-in inside
@ -56,6 +58,8 @@ public:
bool CreateWriter(std::string pluginName, uint32_t bufferSize, int smbFd, int eventFd);
bool ResetWriter(uint32_t pluginId);
void SetCommandPoller(const CommandPollerPtr& p);
bool RegisterPlugin(const PluginModulePtr& plugin, const std::string& pluginPath, const std::string& pluginName);
void UpdatePluginInfo(const PluginModulePtr& pluginIds);
private:
std::map<uint32_t, std::shared_ptr<PluginModule>> pluginModules_;
@ -63,6 +67,7 @@ private:
CommandPollerPtr commandPoller_;
ScheduleTaskManager scheduleTaskManager_;
std::map<std::string, std::string> pluginPathAndNameMap_;
std::vector<std::thread> updateThreadVec_;
};
#endif // PLUGIN_MANAGER_H

View File

@ -75,6 +75,8 @@ bool PluginModule::GetInfo(PluginModuleInfo& info)
}
info.bufferSizeHint = structPtr_->resultBufferSizeHint;
info.name.assign(structPtr_->name);
info.isStandaloneFileData = structPtr_->isStandaloneFileData;
info.outFileName.assign(structPtr_->outFileName);
return true;
}
return false;
@ -126,6 +128,47 @@ bool PluginModule::GetBufferSizeHint(uint32_t& bufferSizeHint)
return false;
}
bool PluginModule::GetStandaloneFileData()
{
if (handle_ != nullptr) {
if (structPtr_ == nullptr) {
return false;
}
return structPtr_->isStandaloneFileData;
}
return false;
}
bool PluginModule::GetOutFileName(std::string& outFileName)
{
if (handle_ != nullptr) {
if (structPtr_ == nullptr) {
return false;
}
outFileName.assign(structPtr_->outFileName);
return true;
}
return false;
}
std::string PluginModule::GetPath()
{
return path_;
}
std::string PluginModule::GetPluginName()
{
if (pluginName_ == "") {
if (handle_ != nullptr) {
if (structPtr_ == nullptr) {
return "";
}
pluginName_.assign(structPtr_->name);
}
}
return pluginName_;
}
bool PluginModule::IsLoaded()
{
return (handle_ != nullptr);
@ -197,6 +240,24 @@ bool PluginModule::StopSession()
return false;
}
bool PluginModule::ReportBasicData()
{
HILOG_INFO(LOG_CORE, "%s:report basic data ready!", __func__);
if (handle_ == nullptr) {
HILOG_ERROR(LOG_CORE, "%s:plugin not load", __func__);
return false;
}
if (structPtr_ != nullptr && structPtr_->callbacks != nullptr) {
if (structPtr_->callbacks->onReportBasicDataCallback != nullptr) {
auto write = GetWriter();
CHECK_NOTNULL(write, false, "%s:get write falied!", __func__);
write->Clear(); // clear share memory block
return (structPtr_->callbacks->onReportBasicDataCallback() == 0);
}
}
return false;
}
int32_t PluginModule::ReportResult(uint8_t* buffer, uint32_t size)
{
if (handle_ == nullptr) {

View File

@ -26,6 +26,8 @@
struct PluginModuleInfo {
std::string name;
uint32_t bufferSizeHint = 0;
bool isStandaloneFileData = false;
std::string outFileName = "";
};
struct PluginModuleStruct;
@ -51,6 +53,10 @@ public:
bool GetInfo(PluginModuleInfo& info);
bool GetPluginName(std::string& pluginName);
bool GetBufferSizeHint(uint32_t& bufferSizeHint);
bool GetStandaloneFileData();
bool GetOutFileName(std::string& outFileName);
std::string GetPath();
std::string GetPluginName();
bool IsRunning();
bool IsLoaded();
bool BindFunctions();
@ -58,6 +64,7 @@ public:
bool StartSession(const uint8_t* buffer, uint32_t size);
bool StopSession();
int32_t ReportResult(uint8_t* buffer, uint32_t size);
bool ReportBasicData();
bool RegisterWriter(const BufferWriterPtr writer);
WriterPtr GetWriter();

View File

@ -110,6 +110,11 @@ public:
{
this->commandPoller_ = p;
}
virtual bool ReportPluginBasicData(const std::vector<uint32_t>& pluginIds)
{
return true;
}
private:
CommandPollerPtr commandPoller_;
};

View File

@ -45,6 +45,7 @@ public:
int StartCapture(void);
int StopCapture(void);
bool ParseBasicData(void);
private:
DISALLOW_COPY_AND_MOVE(FlowController);

View File

@ -61,6 +61,7 @@ private:
WriterStructPtr writer_ = nullptr;
std::vector<char> buffer_;
std::mutex mutex_;
};
FTRACE_NS_END
#endif // RESULT_TRANSPORTER_H

View File

@ -183,14 +183,8 @@ bool FlowController::CreateRawDataCaches()
return true;
}
int FlowController::StartCapture(void)
bool FlowController::ParseBasicData()
{
CHECK_TRUE(ftraceSupported_, -1, "current kernel not support ftrace!");
CHECK_NOTNULL(ftraceParser_, -1, "create FtraceParser FAILED!");
CHECK_NOTNULL(ksymsParser_, -1, "create KernelSymbolsParser FAILED!");
CHECK_NOTNULL(tansporter_, -1, "crated ResultTransporter FAILED!");
CHECK_NOTNULL(traceOps_, -1, "create TraceOps FAILED!");
// get clock times
if (getClockTimes_) {
CHECK_TRUE(ReportClockTimes(), -1, "report clock times FAILED!");
@ -204,6 +198,19 @@ int FlowController::StartCapture(void)
// parse per cpu stats
CHECK_TRUE(ParsePerCpuStatus(TRACE_START), -1, "parse TRACE_START stats failed!");
return 0;
}
int FlowController::StartCapture(void)
{
CHECK_TRUE(ftraceSupported_, -1, "current kernel not support ftrace!");
CHECK_NOTNULL(ftraceParser_, -1, "create FtraceParser FAILED!");
CHECK_NOTNULL(ksymsParser_, -1, "create KernelSymbolsParser FAILED!");
CHECK_NOTNULL(tansporter_, -1, "crated ResultTransporter FAILED!");
CHECK_NOTNULL(traceOps_, -1, "create TraceOps FAILED!");
CHECK_TRUE(ParseBasicData() == 0, -1, "parse basic data failed!");
// create memory pool, and raw data readers, buffers, caches.
CHECK_TRUE(CreatePagedMemoryPool(), -1, "create paged memory pool failed!");
CHECK_TRUE(CreateRawDataReaders(), -1, "create raw data readers failed!");

View File

@ -54,6 +54,17 @@ int TracePluginStartSession(const uint8_t configData[], const uint32_t configSiz
return result;
}
int TracePluginReportBasicData()
{
std::unique_lock<std::mutex> lock(g_mutex);
CHECK_NOTNULL(g_mainController, -1, "no FlowController created!");
HILOG_INFO(LOG_CORE, "%s: %d", __func__, __LINE__);
int result = g_mainController->ParseBasicData();
HILOG_INFO(LOG_CORE, "%s: %d", __func__, __LINE__);
return result;
}
int TracePluginStopSession()
{
std::unique_lock<std::mutex> lock(g_mutex);
@ -72,6 +83,7 @@ static PluginModuleCallbacks moduleCallbacks = {
.onPluginReportResult = 0,
.onPluginSessionStop = TracePluginStopSession,
.onRegisterWriterStruct = TracePluginRegisterWriter,
.onReportBasicDataCallback = TracePluginReportBasicData, // report ftrace_plugin basic data
};
PluginModuleStruct g_pluginModule = {

View File

@ -101,14 +101,20 @@ void ResultTransporter::Flush()
bool ResultTransporter::Submit(ResultPtr&& packet)
{
std::unique_lock<std::mutex> lock(mutex_);
long nbytes = Write(std::move(packet));
CHECK_TRUE(nbytes >= 0, false, "send result FAILED!");
if (nbytes < 0) {
HILOG_ERROR(LOG_CORE, "send result FAILED!");
lock.unlock();
return false;
}
bytesCount_ += nbytes;
bytesPending_ += nbytes;
if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
Flush();
}
lock.unlock();
return true;
}
FTRACE_NS_END

View File

@ -100,6 +100,7 @@ int HilogPlugin::Start(const uint8_t* configData, uint32_t configSize)
g_id = 1;
std::unique_lock<std::mutex> locker(mutex_);
running_ = true;
locker.unlock();
workThread_ = std::thread(&HilogPlugin::Run, this);
return 0;

View File

@ -59,6 +59,12 @@ bool ParseConfigToCmd(const HiperfPluginConfig& config, std::vector<std::string>
prepareCmd += HIPERF_CMD + g_logLevel + HIPERF_RECORD_CMD + HIPERF_RECORD_PREPARE;
if (!config.outfile_name().empty()) {
prepareCmd += " -o " + config.outfile_name();
size_t fileSize = sizeof(g_pluginModule.outFileName);
int ret = strncpy_s(g_pluginModule.outFileName, fileSize, config.outfile_name().c_str(), fileSize - 1);
if (ret != EOK) {
HILOG_ERROR(LOG_CORE, "strncpy_s error! outfile is %s", config.outfile_name().c_str());
return false;
}
}
if (!config.record_args().empty()) {
prepareCmd += " " + config.record_args();
@ -136,4 +142,4 @@ static PluginModuleCallbacks g_callbacks = {
HiperfRegisterWriterStruct,
};
PluginModuleStruct g_pluginModule = {&g_callbacks, "hiperf-plugin", MAX_BUFFER_SIZE};
PluginModuleStruct g_pluginModule = {&g_callbacks, "hiperf-plugin", MAX_BUFFER_SIZE, true, "/data/local/tmp/perf.data"};

View File

@ -62,6 +62,7 @@ public:
bool StartPluginSession(const std::vector<uint32_t>& pluginIds,
const std::vector<ProfilerPluginConfig>& config) override;
bool StopPluginSession(const std::vector<uint32_t>& pluginIds) override;
bool ReportPluginBasicData(const std::vector<uint32_t>& pluginIds) override;
bool CreateWriter(std::string pluginName, uint32_t bufferSize, int smbFd, int eventFd) override;
bool ResetWriter(uint32_t pluginId) override;

View File

@ -357,6 +357,11 @@ bool HookManager::StopPluginSession(const std::vector<uint32_t>& pluginIds)
return true;
}
bool HookManager::ReportPluginBasicData(const std::vector<uint32_t>& pluginIds)
{
return true;
}
bool HookManager::CreateWriter(std::string pluginName, uint32_t bufferSize, int smbFd, int eventFd)
{
HILOG_DEBUG(LOG_CORE, "agentIndex_ %d", agentIndex_);

View File

@ -36,6 +36,7 @@ public:
long WriteTimeout(const void* data, size_t size);
long WriteWithPayloadTimeout(const void* data, size_t size, const void* payload, size_t payloadSize);
bool Flush() override;
bool Clear() override;
private:
void DoStats(long bytes);

View File

@ -88,3 +88,12 @@ bool StackWriter::Flush()
bytesPending_ = 0;
return true;
}
bool StackWriter::Clear()
{
if (shareMemoryBlock_ == nullptr) {
return false;
}
shareMemoryBlock_->ClearShareMemoryBlock();
return true;
}

View File

@ -48,6 +48,8 @@ struct PluginInfo {
std::string path;
std::string sha256;
uint32_t bufferSizeHint;
bool isStandaloneFileData = false;
std::string outFileName = "";
SocketContext* context = nullptr;
};
@ -56,6 +58,8 @@ struct PluginContext {
std::string path;
std::string sha256;
uint32_t bufferSizeHint;
bool isStandaloneFileData = false;
std::string outFileName = "";
SocketContext* context = nullptr;
ProfilerPluginConfig config;
ProfilerDataRepeaterPtr profilerDataRepeater;
@ -80,6 +84,7 @@ public:
bool StartPluginSession(const ProfilerPluginConfig& config);
bool StopPluginSession(const std::string& pluginName);
bool DestroyPluginSession(const std::string& pluginName);
bool RefreshPluginSession(const std::string& pluginName);
bool AddPluginInfo(const PluginInfo& pluginInfo);
bool GetPluginInfo(const std::string& pluginName, PluginInfo& pluginInfo);
@ -93,13 +98,13 @@ public:
void SetPluginSessionManager(const PluginSessionManagerPtr& pluginSessionManager);
void SetProfilerSessionConfig(const ProfilerSessionConfig& profilerSessionConfig);
std::pair<uint32_t, PluginContextPtr> GetPluginContext(const std::string& pluginName);
private:
bool StartService(const std::string& unixSocketName);
SemaphorePtr GetSemaphore(uint32_t) const;
void ReadShareMemory(PluginContext&);
std::pair<uint32_t, PluginContextPtr> GetPluginContext(const std::string& pluginName);
PluginContextPtr GetPluginContextById(uint32_t id);
bool RemovePluginSessionCtx(const std::string& pluginName);

View File

@ -41,6 +41,7 @@ public:
bool Start();
bool Stop();
bool Refresh();
bool IsAvailable() const;

View File

@ -40,6 +40,7 @@ public:
bool StartPluginSessions(const std::vector<std::string>& nameList);
bool StopPluginSessions(const std::vector<std::string>& nameList);
bool RefreshPluginSession();
bool CheckStatus(const std::vector<std::string>& nameList, PluginSession::State state);
std::vector<PluginSession::State> GetStatus(const std::vector<std::string>& nameList);
@ -59,6 +60,7 @@ private:
// plugin session data:
std::map<std::string, PluginSessionPtr> pluginSessions_;
std::vector<std::string> pluginNameList_;
};
#endif // PLUGIN_SESSION_MANAGER_H

View File

@ -88,6 +88,21 @@ GetCommandResponsePtr PluginCommandBuilder::BuildStopSessionCmd(uint32_t pluginI
return cmd;
}
GetCommandResponsePtr PluginCommandBuilder::BuildRefreshSessionCmd(uint32_t pluginId)
{
auto cmd = std::make_shared<GetCommandResponse>();
cmd->set_status(ResponseStatus::OK);
cmd->set_has_more(false);
cmd->set_command_id(cmdIdAutoIncrease_);
RefreshSessionCmd* rsc = cmd->mutable_refresh_session_cmd();
rsc->add_plugin_ids(pluginId);
commandHistory_[cmdIdAutoIncrease_] = cmd;
cmdIdAutoIncrease_++;
return cmd;
}
bool PluginCommandBuilder::GetedCommandResponse(uint32_t cmdId)
{
if (commandHistory_.find(cmdId) == commandHistory_.end()) {

View File

@ -32,6 +32,7 @@ public:
GetCommandResponsePtr BuildDestroySessionCmd(uint32_t pluginId);
GetCommandResponsePtr BuildStartSessionCmd(const ProfilerPluginConfig& config, uint32_t pluginId);
GetCommandResponsePtr BuildStopSessionCmd(uint32_t pluginId);
GetCommandResponsePtr BuildRefreshSessionCmd(uint32_t pluginId);
bool GetedCommandResponse(uint32_t cmdId);

View File

@ -236,6 +236,21 @@ bool PluginService::DestroyPluginSession(const std::string& pluginName)
return true;
}
bool PluginService::RefreshPluginSession(const std::string& pluginName)
{
uint32_t pluginId = 0;
PluginContextPtr pluginCtx = nullptr;
std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName);
CHECK_NOTNULL(pluginCtx, false, "get PluginContext failed!");
auto cmd = pluginCommandBuilder_->BuildRefreshSessionCmd(pluginId);
CHECK_TRUE(cmd != nullptr, false, "RefreshPluginSession BuildRefreshSessionCmd FAIL %s", pluginName.c_str());
pluginServiceImpl_->PushCommand(*pluginCtx->context, cmd);
HILOG_INFO(LOG_CORE, "RefreshPluginSession %s done!", pluginName.c_str());
return true;
}
bool PluginService::RemovePluginSessionCtx(const std::string& pluginName)
{
PluginContextPtr pluginCtx = GetPluginContext(pluginName).second;
@ -295,6 +310,8 @@ bool PluginService::AddPluginInfo(const PluginInfo& pluginInfo)
pluginCtx->profilerPluginState.set_state(ProfilerPluginState::REGISTERED);
pluginCtx->sha256 = pluginInfo.sha256;
pluginCtx->bufferSizeHint = pluginInfo.bufferSizeHint;
pluginCtx->isStandaloneFileData = pluginInfo.isStandaloneFileData;
pluginCtx->outFileName = pluginInfo.outFileName;
uint32_t pluginId = ++pluginIdCounter_;
std::unique_lock<std::mutex> lock(mutex_);
@ -314,6 +331,12 @@ bool PluginService::AddPluginInfo(const PluginInfo& pluginInfo)
if (pluginInfo.bufferSizeHint != 0) {
pluginCtx->bufferSizeHint = pluginInfo.bufferSizeHint;
}
if (pluginInfo.isStandaloneFileData != false) {
pluginCtx->isStandaloneFileData = pluginInfo.isStandaloneFileData;
}
if (pluginInfo.outFileName != "") {
pluginCtx->outFileName = pluginInfo.outFileName;
}
}
HILOG_DEBUG(LOG_CORE, "AddPluginInfo for %s done!", pluginInfo.name.c_str());

View File

@ -33,9 +33,16 @@ bool PluginServiceImpl::RegisterPlugin(SocketContext& context,
pluginInfo.path = request.path();
pluginInfo.sha256 = request.sha256();
pluginInfo.bufferSizeHint = request.buffer_size_hint();
pluginInfo.isStandaloneFileData = request.is_standalone_data();
pluginInfo.outFileName = request.out_file_name();
pluginInfo.context = &context;
int pluginId = pluginService->GetPluginIdByName(pluginInfo.name);
if (pluginService->AddPluginInfo(pluginInfo)) {
if (pluginId != 0) { // update plugin not need reply response
HILOG_DEBUG(LOG_CORE, "UpdatePlugin OK");
return false;
}
response.set_status(ResponseStatus::OK);
response.set_plugin_id(pluginService->GetPluginIdByName(pluginInfo.name));
HILOG_DEBUG(LOG_CORE, "RegisterPlugin OK");

View File

@ -132,6 +132,22 @@ bool PluginSession::Start()
return retval;
}
bool PluginSession::Refresh()
{
std::unique_lock<std::mutex> lock(mutex_);
HILOG_INFO(LOG_CORE, "Refresh for %s...", pluginConfig_.name().c_str());
CHECK_TRUE(state_ == STARTED, false, "plugin state %d invalid!", state_);
auto pluginService = pluginService_.lock();
CHECK_NOTNULL(pluginService, false, "PluginSession::%s pluginService promote failed!", __func__);
bool retval = pluginService->RefreshPluginSession(pluginConfig_.name());
HILOG_INFO(LOG_CORE, "RefreshPluginSession for %s %s!", pluginConfig_.name().c_str(), retval ? "OK" : "FAIL");
CHECK_TRUE(retval, false, "call PluginService::RefreshPluginSession failed!");
return retval;
}
bool PluginSession::Stop()
{
std::unique_lock<std::mutex> lock(mutex_);

View File

@ -152,6 +152,7 @@ bool PluginSessionManager::InvalidatePluginSessions(const std::vector<std::strin
bool PluginSessionManager::StartPluginSessions(const std::vector<std::string>& nameList)
{
CHECK_TRUE(nameList.size() > 0, false, "nameList empty!");
pluginNameList_ = std::move(nameList);
// start each plugin sessions
size_t failureCount = 0;
std::unique_lock<std::mutex> lock(mutex_);
@ -186,6 +187,23 @@ bool PluginSessionManager::StopPluginSessions(const std::vector<std::string>& na
return failureCount == 0;
}
bool PluginSessionManager::RefreshPluginSession()
{
size_t failureCount = 0;
std::unique_lock<std::mutex> lock(mutex_);
for (auto& name : pluginNameList_) {
auto it = pluginSessions_.find(name);
if (it == pluginSessions_.end()) {
continue;
}
if (!it->second->Refresh()) {
HILOG_INFO(LOG_CORE, "refresh data %s FAILED!", it->first.c_str());
failureCount++;
}
}
return failureCount == 0;
}
bool PluginSessionManager::CheckStatus(const std::vector<std::string>& nameList, PluginSession::State state)
{
std::unique_lock<std::mutex> lock(mutex_);

View File

@ -23,6 +23,7 @@ config("profiler_service_config") {
"src",
"${OHOS_PROFILER_DIR}/device/services/ipc/include",
"${OHOS_PROFILER_DIR}/device/base/include",
"${OHOS_PROFILER_DIR}/interfaces/kits",
]
if (current_toolchain != host_toolchain) {
cflags = [ "-DHAVE_HILOG" ]

View File

@ -42,6 +42,8 @@ public:
size_t Size();
void ClearQueue();
private:
std::mutex mutex_;
std::condition_variable slotCondVar_;

View File

@ -123,7 +123,8 @@ private:
bool RemoveSessionContext(uint32_t sessionId);
void MergeHiperfFile(const SessionContextPtr& sessionCtx);
void MergeStandaloneFile(const std::string& resultFile,
const std::string& pluginName, const std::string& outputFile);
private:
mutable std::mutex sessionContextMutex_ = {};

View File

@ -51,6 +51,12 @@ void ProfilerDataRepeater::Close()
bool ProfilerDataRepeater::PutPluginData(const ProfilerPluginDataPtr& pluginData)
{
std::unique_lock<std::mutex> lock(mutex_);
if ((pluginData == nullptr) && (dataQueue_.size() > 0)) {
HILOG_INFO(LOG_CORE, "no need put nullptr if queue has data, dataQueue_.size() = %zu", dataQueue_.size());
return true;
}
while (dataQueue_.size() >= maxSize_ && !closed_) {
slotCondVar_.wait(lock);
}
@ -105,3 +111,9 @@ int ProfilerDataRepeater::TakePluginData(std::vector<ProfilerPluginDataPtr>& plu
slotCondVar_.notify_one();
return count;
}
void ProfilerDataRepeater::ClearQueue()
{
std::unique_lock<std::mutex> lock(mutex_);
dataQueue_.clear();
}

View File

@ -26,6 +26,7 @@
#include "trace_file_writer.h"
using namespace ::grpc;
using PluginContextPtr = std::shared_ptr<PluginContext>;
#define CHECK_REQUEST_RESPONSE(context, request, response) \
do { \
@ -269,7 +270,7 @@ Status ProfilerService::CreateSession(ServerContext* context,
CHECK_POINTER_NOTNULL(dataRepeater, "alloc ProfilerDataRepeater failed!");
// create ResultDemuxer
auto resultDemuxer = std::make_shared<ResultDemuxer>(dataRepeater);
auto resultDemuxer = std::make_shared<ResultDemuxer>(dataRepeater, pluginSessionManager_);
CHECK_POINTER_NOTNULL(resultDemuxer, "alloc ResultDemuxer failed!");
// create TraceFileWriter for offline mode
@ -277,7 +278,8 @@ Status ProfilerService::CreateSession(ServerContext* context,
if (sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
auto resultFile = sessionConfig.result_file();
CHECK_EXPRESSION_TRUE(resultFile.size() > 0, "result_file empty!");
traceWriter = std::make_shared<TraceFileWriter>(resultFile);
traceWriter = std::make_shared<TraceFileWriter>(resultFile, sessionConfig.split_file(),
sessionConfig.single_file_max_size_mb());
CHECK_POINTER_NOTNULL(traceWriter, "alloc TraceFileWriter failed!");
resultDemuxer->SetTraceWriter(traceWriter);
for (int i = 0; i < nConfigs; i++) {
@ -290,6 +292,7 @@ Status ProfilerService::CreateSession(ServerContext* context,
}
traceWriter->SetPluginConfig(msgData.data(), msgData.size());
}
traceWriter->Flush();
}
// create session context
@ -370,60 +373,48 @@ bool ProfilerService::RemoveSessionContext(uint32_t sessionId)
return false;
}
void ProfilerService::MergeHiperfFile(const SessionContextPtr& sessionCtx)
void ProfilerService::MergeStandaloneFile(const std::string& resultFile,
const std::string& pluginName, const std::string& outputFile)
{
auto it = std::find(sessionCtx->pluginNames.begin(), sessionCtx->pluginNames.end(), "hiperf-plugin");
if (it == sessionCtx->pluginNames.end()) {
return;
} else if (sessionCtx->sessionConfig.session_mode() != ProfilerSessionConfig::OFFLINE) {
if (pluginName.empty() || outputFile.empty()) {
HILOG_ERROR(LOG_CORE, "pluginName(%s) didn't set output file(%s)", pluginName.c_str(), outputFile.c_str());
return;
}
// get hiperf plugin ouput file
std::string hiperfFile;
for (auto &config : sessionCtx->pluginConfigs) {
if (config.name() != "hiperf-plugin") {
continue;
}
HiperfPluginConfig hiperfConfig;
if (!hiperfConfig.ParseFromArray(config.config_data().data(), config.config_data().size())) {
HILOG_ERROR(LOG_CORE, "parse hiperf config failed");
return;
}
hiperfFile = hiperfConfig.outfile_name();
break;
}
if (hiperfFile.empty()) {
HILOG_ERROR(LOG_CORE, "didn't set hiperf output file");
std::ifstream fsFile {}; // read from output file
fsFile.open(outputFile, std::ios_base::in | std::ios_base::binary);
if (!fsFile.good()) {
HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", outputFile.c_str(), fsFile.rdstate());
return;
}
std::ifstream fsHiperf {}; // read from hiperf output file
fsHiperf.open(hiperfFile, std::ios_base::in | std::ios_base::binary);
if (!fsHiperf.good()) {
HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", hiperfFile.c_str(), fsHiperf.rdstate());
return;
}
std::string targetFile = sessionCtx->sessionConfig.result_file();
std::ofstream fsTarget {}; // write to profiler ouput file
fsTarget.open(targetFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
fsTarget.open(resultFile, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
if (!fsTarget.good()) {
HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", targetFile.c_str(), fsTarget.rdstate());
HILOG_ERROR(LOG_CORE, "open file(%s) failed: %d", resultFile.c_str(), fsTarget.rdstate());
return;
}
fsTarget.seekp(0, std::ios_base::end);
int posHiperf = fsTarget.tellp(); // for update sha256
int posFile = fsTarget.tellp(); // for update sha256
TraceFileHeader header {};
header.data_.dataType = DataType::HIPERF_DATA;
fsHiperf.seekg(0, std::ios_base::end);
uint64_t fileSize = (uint64_t)(fsHiperf.tellg());
if (pluginName == "hiperf-plugin") {
header.data_.dataType = DataType::HIPERF_DATA;
} else {
header.data_.dataType = DataType::STANDALONE_DATA;
}
fsFile.seekg(0, std::ios_base::end);
uint64_t fileSize = (uint64_t)(fsFile.tellg());
header.data_.length += fileSize;
size_t pluginSize = sizeof(header.data_.standalonePluginName);
int ret = strncpy_s(header.data_.standalonePluginName, pluginSize, pluginName.c_str(), pluginSize - 1);
if (ret != EOK) {
HILOG_ERROR(LOG_CORE, "strncpy_s error! pluginName is %s", pluginName.c_str());
return;
}
fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
if (!fsTarget.good()) {
HILOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", targetFile.c_str(), fsTarget.rdstate());
HILOG_ERROR(LOG_CORE, "write file(%s) header failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
return;
}
@ -432,12 +423,12 @@ void ProfilerService::MergeHiperfFile(const SessionContextPtr& sessionCtx)
constexpr uint64_t bufSize = 4 * 1024 * 1024;
std::vector<char> buf(bufSize);
uint64_t readSize = 0;
fsHiperf.seekg(0);
fsFile.seekg(0);
while ((readSize = std::min(bufSize, fileSize)) > 0) {
fsHiperf.read(buf.data(), readSize);
fsFile.read(buf.data(), readSize);
fsTarget.write(buf.data(), readSize);
if (!fsTarget.good()) {
HILOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", targetFile.c_str(), fsTarget.rdstate());
HILOG_ERROR(LOG_CORE, "write file(%s) failed: %d\n", resultFile.c_str(), fsTarget.rdstate());
return;
}
fileSize -= readSize;
@ -445,13 +436,13 @@ void ProfilerService::MergeHiperfFile(const SessionContextPtr& sessionCtx)
SHA256_Update(&sha256Ctx, buf.data(), readSize);
}
SHA256_Final(header.data_.sha256, &sha256Ctx);
fsTarget.seekp(posHiperf, std::ios_base::beg);
fsTarget.seekp(posFile, std::ios_base::beg);
fsTarget.write(reinterpret_cast<char*>(&header), sizeof(header));
fsHiperf.close();
fsFile.close();
fsTarget.close();
HILOG_INFO(LOG_CORE, "write hiperf file done");
HILOG_INFO(LOG_CORE, "write standalone(%s) to result(%s) done", outputFile.c_str(), resultFile.c_str());
}
Status ProfilerService::StartSession(ServerContext* context,
@ -577,7 +568,10 @@ Status ProfilerService::StopSession(ServerContext* context,
auto ctx = GetSessionContext(sessionId);
CHECK_POINTER_NOTNULL(ctx, "session_id invalid!");
if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
CHECK_POINTER_NOTNULL(ctx->traceFileWriter, "traceFileWriter invalid!");
ctx->traceFileWriter.get()->SetStopSplitFile(true);
}
CHECK_EXPRESSION_TRUE(ctx->StopPluginSessions(), "stop plugin sessions failed!");
HILOG_INFO(LOG_CORE, "StopSession %d %u done!", request->request_id(), sessionId);
return Status::OK;
@ -601,7 +595,21 @@ Status ProfilerService::DestroySession(ServerContext* context,
"remove plugin session FAILED!");
HILOG_INFO(LOG_CORE, "DestroySession %d %u done!", request->request_id(), sessionId);
MergeHiperfFile(ctx);
if (ctx->sessionConfig.session_mode() == ProfilerSessionConfig::OFFLINE) {
uint32_t pluginId = 0;
PluginContextPtr pluginCtx = nullptr;
for (size_t i = 0; i < ctx->pluginNames.size(); i++) {
auto pluginName = ctx->pluginNames[i];
std::tie(pluginId, pluginCtx) = pluginService_->GetPluginContext(pluginName);
if (pluginCtx->isStandaloneFileData == true) {
std::string file = ctx->sessionConfig.result_file();
if (ctx->sessionConfig.split_file() && ctx->sessionConfig.single_file_max_size_mb() > 0) {
file = ctx->traceFileWriter.get()->Path();
}
MergeStandaloneFile(file, pluginName, pluginCtx->outFileName);
}
}
}
return Status::OK;
}

View File

@ -15,15 +15,18 @@
#include "result_demuxer.h"
#include <unistd.h>
#include "logging.h"
#include "trace_file_header.h"
namespace {
constexpr auto DEFAULT_FLUSH_INTERVAL = std::chrono::milliseconds(1000);
} // namespace
ResultDemuxer::ResultDemuxer(const ProfilerDataRepeaterPtr& dataRepeater)
ResultDemuxer::ResultDemuxer(const ProfilerDataRepeaterPtr& dataRepeater, PluginSessionManagerPtr pluginSessionManager)
: dataRepeater_(dataRepeater), flushInterval_(DEFAULT_FLUSH_INTERVAL)
{
pluginSessionManager_ = pluginSessionManager;
}
ResultDemuxer::~ResultDemuxer()
@ -74,7 +77,7 @@ bool ResultDemuxer::StopTakeResults()
void ResultDemuxer::TakeResults()
{
if (!dataRepeater_) {
if (!dataRepeater_ || !pluginSessionManager_) {
return;
}
@ -87,7 +90,12 @@ void ResultDemuxer::TakeResults()
}
if (traceWriter_) {
traceWriter_->Write(*pluginData);
int ret = traceWriter_->Write(*pluginData);
if (ret == -1) {
HILOG_DEBUG(LOG_CORE, "need to clear queue and report the basic data");
dataRepeater_->ClearQueue();
pluginSessionManager_->RefreshPluginSession();
}
auto currentTime = std::chrono::steady_clock::now();
auto elapsedTime = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - lastFlushTime_);
if (elapsedTime >= flushInterval_) {
@ -99,6 +107,5 @@ void ResultDemuxer::TakeResults()
}
}
traceWriter_->Finish();
traceWriter_->Flush();
HILOG_INFO(LOG_CORE, "TakeResults thread %d, exit!", gettid());
}

View File

@ -20,13 +20,16 @@
#include "logging.h"
#include "nocopyable.h"
#include "plugin_session_manager.h"
#include "profiler_data_repeater.h"
#include "profiler_service.grpc.pb.h"
#include "trace_file_writer.h"
using PluginSessionManagerPtr = STD_PTR(shared, PluginSessionManager);
class ResultDemuxer {
public:
explicit ResultDemuxer(const ProfilerDataRepeaterPtr& dataRepeater);
explicit ResultDemuxer(const ProfilerDataRepeaterPtr& dataRepeater, PluginSessionManagerPtr pluginSessionManager);
~ResultDemuxer();
@ -48,6 +51,7 @@ private:
std::chrono::steady_clock::time_point lastFlushTime_ {};
std::thread demuxerThread_ {};
bool isStopTakeData_ = false;
PluginSessionManagerPtr pluginSessionManager_ = nullptr;
DISALLOW_COPY_AND_MOVE(ResultDemuxer);
};

View File

@ -15,9 +15,12 @@
#ifndef TRACE_FILE_HEADER_H
#define TRACE_FILE_HEADER_H
#include "plugin_module_api.h"
enum DataType {
HIPROFILER_PROTOBUF_BIN = 0,
HIPERF_DATA,
STANDALONE_DATA = 1000,
};
struct TraceFileHeader {
@ -42,8 +45,10 @@ struct TraceFileHeader {
uint64_t monotonic = 0;
uint64_t monotonicCoarse = 0;
uint64_t monotonicRaw = 0;
char standalonePluginName[PLUGIN_MODULE_NAME_MAX + 1] = "";
} __attribute__((packed));
HeaderData data_ = {};
static_assert(sizeof(data_) < HEADER_SIZE, "The max length of the reserved header is 1024 bytes");
uint8_t padding_[HEADER_SIZE - sizeof(data_)] = {};
};

View File

@ -22,9 +22,22 @@
using CharPtr = std::unique_ptr<char>::pointer;
using ConstCharPtr = std::unique_ptr<const char>::pointer;
TraceFileWriter::TraceFileWriter(const std::string& path) : path_(path), writeBytes_(0)
namespace {
const int MB_TO_BYTE = 1024 * 1024;
const int MIN_BYTE = 200;
} // namespace
TraceFileWriter::TraceFileWriter(const std::string& path, bool splitFile, uint32_t SingleFileMaxSizeMb)
: path_(path), writeBytes_(0)
{
Open(path);
isSplitFile_ = splitFile;
SingleFileMaxSize_ = (SingleFileMaxSizeMb < MIN_BYTE) ? (MIN_BYTE * MB_TO_BYTE) :
(SingleFileMaxSizeMb * MB_TO_BYTE);
oldPath_ = path;
fileNum_ = 1;
WriteHeader();
Flush();
}
TraceFileWriter::~TraceFileWriter()
@ -42,6 +55,13 @@ std::string TraceFileWriter::Path() const
bool TraceFileWriter::SetPluginConfig(const void* data, size_t size)
{
if (isSplitFile_) {
std::vector<char> configVec;
auto configData = reinterpret_cast<ConstCharPtr>(data);
configVec.insert(configVec.end(), configData, configData + size);
pluginConfigsData_.push_back(std::move(configVec));
}
Write(data, size);
return true;
}
@ -70,15 +90,37 @@ void TraceFileWriter::SetTimeStamp()
static_cast<uint64_t>(ts.tv_nsec);
}
bool TraceFileWriter::Open(const std::string& path)
static std::string GetCurrentTime()
{
stream_.open(path, std::ios_base::out | std::ios_base::binary);
CHECK_TRUE(stream_.is_open(), false, "open %s failed, %d!", path.c_str(), errno);
const int usMs = 1000;
struct timeval tv;
gettimeofday(&tv, NULL);
return std::to_string(tv.tv_sec * usMs + tv.tv_usec / usMs);
}
bool TraceFileWriter::WriteHeader()
{
if (isSplitFile_) {
std::string timeStr = GetCurrentTime();
int pos = (int)(oldPath_.find_last_of('.'));
if (pos != 0) {
path_ = oldPath_.substr(0, pos) + "_" + timeStr + "_" + std::to_string(fileNum_) +
oldPath_.substr(pos, oldPath_.size());
} else {
path_ = oldPath_ + "_" + timeStr + "_" + std::to_string(fileNum_);
}
}
stream_.open(path_, std::ios_base::out | std::ios_base::binary);
CHECK_TRUE(stream_.is_open(), false, "open %s failed, %d!", path_.c_str(), errno);
// write initial header, makes file write position move forward
helper_ = {};
header_ = {};
stream_.write(reinterpret_cast<CharPtr>(&header_), sizeof(header_));
CHECK_TRUE(stream_, false, "write initial header to %s failed!", path_.c_str());
path_ = path;
dataSize_ = header_.HEADER_SIZE;
HILOG_INFO(LOG_CORE, "write file(%s) header end", path_.c_str());
return true;
}
@ -105,10 +147,43 @@ long TraceFileWriter::Write(const void* data, size_t size)
return nbytes;
}
bool TraceFileWriter::IsSplitFile(uint32_t size)
{
dataSize_ += sizeof(uint32_t) + size;
if (dataSize_ >= SingleFileMaxSize_) {
HILOG_INFO(LOG_CORE, "need to split the file(%s), data size:%d, size: %d, SingleFileMaxSize_:%d",
path_.c_str(), dataSize_, size, SingleFileMaxSize_);
// update old file header
Finish();
if (stream_.is_open()) {
stream_.close();
}
fileNum_++;
// write header of the new file
WriteHeader();
// write the plugin config of the new file
for (size_t i = 0; i < pluginConfigsData_.size(); i++) {
Write(pluginConfigsData_[i].data(), pluginConfigsData_[i].size());
}
Flush();
return true;
}
return false;
}
long TraceFileWriter::Write(const MessageLite& message)
{
auto size = message.ByteSizeLong();
if (isSplitFile_ && !isStop_) {
if (IsSplitFile(size)) {
return -1;
}
}
// serialize message to bytes array
std::vector<char> msgData(message.ByteSizeLong());
std::vector<char> msgData(size);
CHECK_TRUE(message.SerializeToArray(msgData.data(), msgData.size()), 0, "SerializeToArray failed!");
return Write(msgData.data(), msgData.size());
@ -120,13 +195,16 @@ bool TraceFileWriter::Finish()
helper_.Update(header_);
// move write position to begin of file
CHECK_TRUE(stream_.is_open(), false, "binary file %s not open or open failed!", path_.c_str());
stream_.seekp(0);
CHECK_TRUE(stream_, false, "seek write position to head for %s failed!", path_.c_str());
SetTimeStamp(); // add timestamp in header
// write final header
stream_.write(reinterpret_cast<CharPtr>(&header_), sizeof(header_));
CHECK_TRUE(stream_, false, "write final header to %s failed!", path_.c_str());
CHECK_TRUE(stream_.flush(), false, "binary file %s flush failed!", path_.c_str());
return true;
}
@ -137,3 +215,8 @@ bool TraceFileWriter::Flush()
HILOG_INFO(LOG_CORE, "flush: %s, bytes: %" PRIu64 ", count: %" PRIu64, path_.c_str(), writeBytes_, writeCount_);
return true;
}
void TraceFileWriter::SetStopSplitFile(bool isStop)
{
isStop_ = isStop;
}

View File

@ -18,6 +18,7 @@
#include <cstdint>
#include <fstream>
#include <google/protobuf/message_lite.h>
#include <mutex>
#include <string>
#include "logging.h"
@ -29,7 +30,7 @@ using google::protobuf::MessageLite;
class TraceFileWriter : public Writer {
public:
explicit TraceFileWriter(const std::string& path);
explicit TraceFileWriter(const std::string& path, bool splitFile = false, uint32_t SingleFileMaxSizeMb = 0);
~TraceFileWriter();
@ -37,7 +38,7 @@ public:
bool SetPluginConfig(const void* data, size_t size);
bool Open(const std::string& path);
bool WriteHeader();
long Write(const MessageLite& message);
@ -47,16 +48,27 @@ public:
bool Finish();
bool IsSplitFile(uint32_t size);
void SetStopSplitFile(bool isStop);
private:
void SetTimeStamp();
private:
std::string path_ {};
std::string oldPath_ {};
std::ofstream stream_ {};
uint64_t writeBytes_ = 0;
uint64_t writeCount_ = 0;
TraceFileHeader header_ {};
TraceFileHelper helper_ {};
uint32_t dataSize_;
bool isSplitFile_ = false;
uint32_t SingleFileMaxSize_;
std::vector<std::vector<char>> pluginConfigsData_;
bool isStop_ = false;
int fileNum_;
DISALLOW_COPY_AND_MOVE(TraceFileWriter);
};

View File

@ -197,6 +197,15 @@ bool PluginService::DestroyPluginSession(const std::string& pluginName)
return true;
}
bool PluginService::RefreshPluginSession(const std::string& pluginName)
{
uint32_t pluginId = 0;
PluginContextPtr pluginCtx = nullptr;
std::tie(pluginId, pluginCtx) = GetPluginContext(pluginName);
HILOG_INFO(LOG_CORE, "RefreshPluginSession %s done!", pluginName.c_str());
return true;
}
std::pair<uint32_t, PluginContextPtr> PluginService::GetPluginContext(const std::string& pluginName)
{
std::unique_lock<std::mutex> lock(mutex_);

View File

@ -28,6 +28,7 @@ class ResultDemuxerTest : public ::testing::Test {
protected:
std::string path = "demux.bin";
ProfilerDataRepeaterPtr repeater;
PluginSessionManagerPtr pluginSessionManager;
static void SetUpTestCase() {}
static void TearDownTestCase() {}
@ -35,6 +36,7 @@ protected:
void SetUp() override
{
repeater = std::make_shared<ProfilerDataRepeater>(DATA_MAX_SIZE);
pluginSessionManager = std::make_shared<PluginSessionManager>(std::make_shared<PluginService>());
}
void TearDown() override {}
@ -47,7 +49,7 @@ protected:
*/
HWTEST_F(ResultDemuxerTest, CtorDtor, TestSize.Level1)
{
auto demuxer = std::make_shared<ResultDemuxer>(repeater);
auto demuxer = std::make_shared<ResultDemuxer>(repeater, pluginSessionManager);
EXPECT_NE(demuxer, nullptr);
}
@ -58,7 +60,7 @@ HWTEST_F(ResultDemuxerTest, CtorDtor, TestSize.Level1)
*/
HWTEST_F(ResultDemuxerTest, SetTraceWriter, TestSize.Level1)
{
auto demuxer = std::make_shared<ResultDemuxer>(repeater);
auto demuxer = std::make_shared<ResultDemuxer>(repeater, pluginSessionManager);
ASSERT_NE(demuxer, nullptr);
demuxer->SetTraceWriter(nullptr);
@ -75,7 +77,7 @@ HWTEST_F(ResultDemuxerTest, SetTraceWriter, TestSize.Level1)
*/
HWTEST_F(ResultDemuxerTest, SetServerWriter, TestSize.Level1)
{
auto demuxer = std::make_shared<ResultDemuxer>(repeater);
auto demuxer = std::make_shared<ResultDemuxer>(repeater, pluginSessionManager);
ASSERT_NE(demuxer, nullptr);
}
@ -86,7 +88,7 @@ HWTEST_F(ResultDemuxerTest, SetServerWriter, TestSize.Level1)
*/
HWTEST_F(ResultDemuxerTest, StartTakeResults, TestSize.Level1)
{
auto demuxer = std::make_shared<ResultDemuxer>(repeater);
auto demuxer = std::make_shared<ResultDemuxer>(repeater, pluginSessionManager);
ASSERT_NE(demuxer, nullptr);
auto writer = std::make_shared<TraceFileWriter>(path);
@ -120,7 +122,7 @@ HWTEST_F(ResultDemuxerTest, StartTakeResults, TestSize.Level1)
*/
HWTEST_F(ResultDemuxerTest, StopTakeResults, TestSize.Level1)
{
auto demuxer = std::make_shared<ResultDemuxer>(repeater);
auto demuxer = std::make_shared<ResultDemuxer>(repeater, pluginSessionManager);
ASSERT_NE(demuxer, nullptr);
auto writer = std::make_shared<TraceFileWriter>(path);

View File

@ -41,6 +41,7 @@ public:
std::string GetName();
uint32_t GetSize();
int GetfileDescriptor();
void ClearShareMemoryBlock();
bool Valid() const;

View File

@ -393,3 +393,13 @@ int ShareMemoryBlock::GetfileDescriptor()
{
return fileDescriptor_;
}
void ShareMemoryBlock::ClearShareMemoryBlock()
{
// clear header infos
PthreadLocker locker(header_->info.mutex_);
header_->info.readOffset_ = 0;
header_->info.writeOffset_ = 0;
header_->info.bytesCount_ = 0;
header_->info.chunkCount_ = 0;
}

View File

@ -19,6 +19,7 @@
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <climits>
#ifdef __cplusplus
extern "C" {
@ -64,6 +65,13 @@ typedef int (*PluginReportResultCallback)(uint8_t* bufferData, uint32_t bufferSi
*/
typedef int (*PluginSessionStopCallback)(void);
/**
* @brief interface type of plugin session stop,
* Called when stopping plugin sessions
* @return Return 0 for success and -1 for failure.
*/
typedef int (*PluginReportBasicDataCallback)(void);
/**
* WriterStruct type forward declaration
*/
@ -136,6 +144,11 @@ struct PluginModuleCallbacks {
* flow reporting results of plugin management framework
*/
RegisterWriterStructCallback onRegisterWriterStruct;
/**
* report plugin basic data
*/
PluginReportBasicDataCallback onReportBasicDataCallback = 0;
};
/**
@ -164,6 +177,10 @@ struct PluginModuleStruct {
* module to call the data reporting interface to use the memory buffer byte number
*/
uint32_t resultBufferSizeHint;
bool isStandaloneFileData = false;
char outFileName[PATH_MAX + 1];
};
/**

View File

@ -31,6 +31,9 @@ message RegisterPluginRequest {
string name = 4;
uint32 buffer_size_hint = 5;
bool is_standalone_data = 6;
string out_file_name = 7;
}
message RegisterPluginResponse {
@ -71,6 +74,10 @@ message StopSessionCmd {
repeated uint32 plugin_ids = 1;
}
message RefreshSessionCmd {
repeated uint32 plugin_ids = 1;
}
message GetCommandResponse {
ResponseStatus status = 1;
bool has_more = 2;
@ -80,6 +87,7 @@ message GetCommandResponse {
DestroySessionCmd destroy_session_cmd = 11;
StartSessionCmd start_session_cmd = 12;
StopSessionCmd stop_session_cmd = 13;
RefreshSessionCmd refresh_session_cmd = 14;
}
}

View File

@ -56,6 +56,8 @@ message ProfilerSessionConfig {
uint32 sample_duration = 5; // for OFFLINE mode, sample duration in ms
uint32 keep_alive_time = 6; // if set to non-zero value, session will auto-destroyed after CreateSession in ms
bool discard_cache_data = 7; // if set true, session will stop immediately.(cache data will be lost)
bool split_file = 8; // if set true, will split result_file
uint32 single_file_max_size_mb = 9; // limit the maximum size of a single file
}
message CreateSessionRequest {