feat: bump wineventlog module to the latest version and added topic mechanism to transmit raw events data

This commit is contained in:
Dmitry Ng
2023-05-24 19:09:42 +03:00
parent d19f506558
commit 1e827cf5a0
13 changed files with 53 additions and 17 deletions
+2 -2
View File
@@ -66,8 +66,8 @@
"fields": [
"reason"
],
"last_module_update": "2022-12-22 00:00:00",
"last_update": "2022-12-22 00:00:00"
"last_module_update": "2023-05-24 00:00:00",
"last_update": "2023-05-24 00:00:00"
},
{
"group_id": "",
+43 -7
View File
@@ -12,6 +12,11 @@ local current_event_config = __config.get_current_event_config()
local module_info = __config.get_module_info()
local module_config = cjson.decode(__config.get_current_config())
local topic_name = "raw_events"
local topic_token = __imc.make_topic and __imc.make_topic(topic_name, __gid) or ""
local last_subscription_check = 0
local result_subscriptions_list = {}
-- event and action engines initialization
local action_engine = CActionEngine(
{},
@@ -64,6 +69,29 @@ local function push_event(event_name, event_data)
end
end
-- return bool
local function is_using_subscription()
local now = os.time(os.date("!*t"))
local get_subscriptions = __imc.get_subscriptions
if get_subscriptions ~= nil and now - last_subscription_check > 5 then
result_subscriptions_list = {}
for _, subs in ipairs(get_subscriptions(topic_token)) do
table.insert(result_subscriptions_list, subs)
end
local modules = {}
for module_name, token in pairs(receivers) do
if glue.indexof(token, result_subscriptions_list) ~= nil then
table.insert(modules, module_name)
end
end
for _, module_name in ipairs(modules) do
receivers[module_name] = nil
end
last_subscription_check = now
end
return #result_subscriptions_list ~= 0
end
-- return nil
local function update_receivers()
receivers = {}
@@ -72,8 +100,12 @@ local function update_receivers()
for irx, module_name in ipairs(mlist) do
local minfo = " | group_id: " .. __gid .. " | module: " .. module_name
local token = __imc.make_token(module_name, __gid)
__log.debugf("receiver[%s] | token: '%s' %s", irx, token, minfo)
receivers[module_name] = token
if glue.indexof(token, result_subscriptions_list) ~= nil then
__log.debugf("skip receiver[%s] | token: '%s' %s", irx, token, minfo)
else
__log.debugf("add receiver[%s] | token: '%s' %s", irx, token, minfo)
receivers[module_name] = token
end
end
end
@@ -107,6 +139,10 @@ local function send_to_receivers(events)
__log.errorf("failed to serialize events message: %s", tostring(events))
return
end
local use_subscriptions = is_using_subscription()
if use_subscriptions then
__api.send_data_to(topic_token, msg)
end
for _, imc_token in pairs(receivers) do
__api.send_data_to(imc_token, msg)
end
@@ -114,7 +150,7 @@ end
-- return nil
local function resend()
for _=1,#messages_queue do
for _ = 1, #messages_queue do
if send(messages_queue[1]) then
table.remove(messages_queue, 1)
else
@@ -140,13 +176,13 @@ local function send_log(msgs)
if #messages_queue >= queue_size + 100 then
__log.error("drop message from queue because size limit exceeded")
local cnt = 0
for _=1,100 do
for _ = 1, 100 do
cnt = cnt + messages_queue[1].count
table.remove(messages_queue, 1)
end
__metric.add_int_counter("wel_agent_events_drop", cnt)
collectgarbage("collect")
__metric.add_int_gauge_counter("wel_agent_mem_usage", collectgarbage("count")*1024)
__metric.add_int_gauge_counter("wel_agent_mem_usage", collectgarbage("count") * 1024)
end
table.insert(messages_queue, message)
end
@@ -328,7 +364,7 @@ push_event("wel_module_started", {reason = "regular start"})
update_receivers()
while not handler() do
__metric.add_int_gauge_counter("wel_agent_mem_usage", collectgarbage("count")*1024)
__metric.add_int_gauge_counter("wel_agent_mem_usage", collectgarbage("count") * 1024)
__api.await(300)
end
@@ -345,7 +381,7 @@ q_out:free()
-- all events mark as drop
do
local cnt = 0
for i=1,#messages_queue do
for i = 1, #messages_queue do
cnt = cnt + messages_queue[i].count
end
if cnt ~= 0 then
+6 -6
View File
@@ -37,13 +37,13 @@ ffi.cdef [[
typedef void(*Module__Pause_Ptr)(Module_I* module, const Module__ErrorHandler* eh);
typedef void(*Module__Resume_Ptr)(Module_I* module, const Module__ErrorHandler* eh);
]]
if ffi.arch == 'x86' then
if ffi.arch == "x86" then
ffi.cdef [[
// Параметр info оригинальной сигнатуры раскладывается на стек по элементно
typedef bool(*Module__OnResult_Ptr)(Module_I* module, const char* jobId, int type, int format, int encoding, const void* data, size_t size, const Module__ErrorHandler* eh);
]]
end
if ffi.arch == 'x64' then
if ffi.arch == "x64" then
ffi.cdef [[
// Параметр info оригинальной сигнатуры передаётся через указатель, чтобы не кастовать каждый элемент в отдельности, заранее определяем его как указатель на int
typedef bool(*Module__OnResult_Ptr)(Module_I* module, const char* jobId, int info, const void* data, size_t size, const Module__ErrorHandler* eh);
@@ -84,13 +84,13 @@ ffi.cdef [[
typedef void(*ModuleTransport__SendKeepAlive_Ptr)(ModuleTransport_I* transport, const Module__ErrorHandler* eh);
typedef void(*ModuleTransport__SendProgress_Ptr)(ModuleTransport_I* transport, uint32_t progress, const Module__ErrorHandler* eh);
]]
if ffi.arch == 'x86' then
if ffi.arch == "x86" then
ffi.cdef [[
// Параметр info оригинальной сигнатуры раскладывается на стек по элементно
typedef void(*ModuleTransport__SendResult_Ptr)(ModuleTransport_I* transport, int type, int format, int encoding, const void* data, size_t size, const Module__ErrorHandler* eh);
]]
end
if ffi.arch == 'x64' then
if ffi.arch == "x64" then
ffi.cdef [[
// Параметр info оригинальной сигнатуры передаётся через указатель, чтобы не кастовать каждый элемент в отдельности, заранее определяем его как указатель на int
typedef void(*ModuleTransport__SendResult_Ptr)(ModuleTransport_I* transport, int *info, const void* data, size_t size, const Module__ErrorHandler* eh);
@@ -239,14 +239,14 @@ function CModule:register(profile, callbacks, sp_filename)
end
end
if ffi.arch == 'x86' then
if ffi.arch == "x86" then
self.functions["SendResult"] = function(transport, _, _, _, data, size, _)
if callbacks and transport == self.transport and callbacks["result"] and data then
callbacks["result"](ffi.string(data, size))
end
end
end
if ffi.arch == 'x64' then
if ffi.arch == "x64" then
self.functions["SendResult"] = function(transport, _, data, size, _)
if callbacks and transport == self.transport and callbacks["result"] and data then
callbacks["result"](ffi.string(data, size))
+2 -2
View File
@@ -80,7 +80,7 @@ local worker_safe = function(ctx, q_in, q_out, e_stop, e_quit)
print("new incoming message to worker", msg.type)
end
until not status
end
end,
}
mdl:register(ctx.profile, callbacks, ctx.svp_filename)
@@ -120,7 +120,7 @@ function CWinEventLog:init(q_in, q_out, e_stop, e_quit, profile)
svp_filename = lfs.currentdir() .. "\\store\\wel_sp",
__files = __files,
__debug = __args["debug_engine"][1] == "true",
__module_id = tostring(__config.ctx.name)
__module_id = tostring(__config.ctx.name),
}, q_in, q_out, e_stop, e_quit)
end