Merge branch 'master' of github.com:4mig4/lua-amqp

This commit is contained in:
cb57446
2018-08-31 14:39:19 +09:00
5 changed files with 1749 additions and 1762 deletions
+243 -255
View File
@@ -20,7 +20,7 @@ local format = string.format
local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
new_tab = function (_,_) return {} end
new_tab = function (_,_) return {} end
end
local buffer = {}
@@ -28,371 +28,359 @@ local buffer = {}
local mt = { __index = buffer }
function buffer.new(b,pos)
return setmetatable( {
buffer_ = b or "",
pos_ = pos or 1
},
mt)
return setmetatable( { buffer_ = b or "", pos_ = pos or 1 }, mt)
end
function buffer:hex_dump()
local len = #self.buffer_
local bytes = new_tab(len, 0)
for i = 1, len do
bytes[i] = tohex(byte(self.buffer_, i), 2)
end
return concat(bytes, " ")
local len = #self.buffer_
local bytes = new_tab(len, 0)
for i = 1, len do
bytes[i] = tohex(byte(self.buffer_, i), 2)
end
return concat(bytes, " ")
end
function buffer:get_i8()
local b = byte(self.buffer_, self.pos_)
self.pos_ = self.pos_ + 1
return b
local b = byte(self.buffer_, self.pos_)
self.pos_ = self.pos_ + 1
return b
end
function buffer:get_bool()
local v = self:get_i8()
return v and v ~= 0
local v = self:get_i8()
return v and v ~= 0
end
function buffer:get_i16()
local a0, a1 = byte(self.buffer_, self.pos_, self.pos_ + 1)
local r = bor(a1, lshift(a0, 8))
self.pos_ = self.pos_ + 2
return r
local a0, a1 = byte(self.buffer_, self.pos_, self.pos_ + 1)
local r = bor(a1, lshift(a0, 8))
self.pos_ = self.pos_ + 2
return r
end
function buffer:get_i24()
local a0, a1, a2 = byte(self.buffer_, self.pos_, self.pos_ + 2)
self.pos_ = self.pos_ + 3
return bor(a2,
lshift(a1, 8),
lshift(a0, 16))
local a0, a1, a2 = byte(self.buffer_, self.pos_, self.pos_ + 2)
self.pos_ = self.pos_ + 3
return bor(a2,
lshift(a1, 8),
lshift(a0, 16))
end
function buffer:get_i32()
local a0, a1, a2, a3 = byte(self.buffer_, self.pos_, self.pos_ + 3)
self.pos_ = self.pos_ + 4
return bor(a3,
lshift(a2, 8),
lshift(a1, 16),
lshift(a0, 24))
local a0, a1, a2, a3 = byte(self.buffer_, self.pos_, self.pos_ + 3)
self.pos_ = self.pos_ + 4
return bor(a3,
lshift(a2, 8),
lshift(a1, 16),
lshift(a0, 24))
end
function buffer:get_i64()
local a, b, c, d, e, f, g, h = byte(self.buffer_, self.pos_, self.pos_ + 7)
self.pos_ = self.pos_ + 8
local a, b, c, d, e, f, g, h = byte(self.buffer_, self.pos_, self.pos_ + 7)
self.pos_ = self.pos_ + 8
local lo = bor(h, lshift(g, 8), lshift(f, 16), lshift(e, 24))
local hi = bor(d, lshift(c, 8), lshift(b, 16), lshift(a, 24))
return lo + hi * 4294967296
local lo = bor(h, lshift(g, 8), lshift(f, 16), lshift(e, 24))
local hi = bor(d, lshift(c, 8), lshift(b, 16), lshift(a, 24))
return lo + hi * 4294967296
end
function buffer:get_short_string()
local length = self:get_i8()
local tail = self.pos_+length-1
local s = sub(self.buffer_,self.pos_,tail)
self.pos_ = tail + 1
return s
local length = self:get_i8()
local tail = self.pos_+length-1
local s = sub(self.buffer_,self.pos_,tail)
self.pos_ = tail + 1
return s
end
function buffer:get_long_string()
local length = self:get_i32()
local tail = self.pos_+length-1
local s = sub(self.buffer_,self.pos_,tail)
self.pos_ = tail + 1
return s
local length = self:get_i32()
local tail = self.pos_+length-1
local s = sub(self.buffer_,self.pos_,tail)
self.pos_ = tail + 1
return s
end
function buffer:get_decimal()
local scale = self:get_i8()
local value = self:get_i32()
local d = {scale = scale, value = value}
return d
local scale = self:get_i8()
local value = self:get_i32()
local d = {scale = scale, value = value}
return d
end
function buffer:get_f32()
return self:get_i32()
return self:get_i32()
end
function buffer:get_f64()
return self:get_i64()
return self:get_i64()
end
function buffer:get_timestamp()
return self:get_i64()
return self:get_i64()
end
function buffer:get_field_array()
local size = self:get_i32()
logger.dbg("[array] size: " .. size)
local a = {}
local p = self.pos_
while size > 0 do
local f = self:field_value()
a[#a+1] = f
size = size - (self.pos_ - p)
p = self.pos_
end
return a
local size = self:get_i32()
logger.dbg("[array] size: " .. size)
local a = {}
local p = self.pos_
while size > 0 do
local f = self:field_value()
a[#a+1] = f
size = size - (self.pos_ - p)
p = self.pos_
end
return a
end
function buffer:get_field_table()
local size = self:get_i32()
local r = {}
local p = self.pos_
local k,v
while size > 0 do
k = self:get_short_string()
v = self:field_value()
size = size - self.pos_ + p
p = self.pos_
r[k] = v
end
local size = self:get_i32()
local r = {}
local p = self.pos_
local k,v
while size > 0 do
k = self:get_short_string()
v = self:field_value()
size = size - self.pos_ + p
p = self.pos_
r[k] = v
end
return r
return r
end
function buffer:put_i8(i)
self.buffer_ = self.buffer_ .. char(band(i,0x0ff))
self.buffer_ = self.buffer_ .. char(band(i,0x0ff))
end
function buffer:put_bool(b)
local v = 0
if b and b ~= 0 then
v = 1
end
self:put_i8(v)
local v = 0
if b and b ~= 0 then
v = 1
end
self:put_i8(v)
end
function buffer:put_i16(i)
self.buffer_ = self.buffer_ ..
char(rshift(band(i,0xff00),8)) ..
char(band(i,0x0ff))
self.buffer_ = self.buffer_ ..
char(rshift(band(i,0xff00),8)) ..
char(band(i,0x0ff))
end
function buffer:put_i32(i)
self.buffer_ = self.buffer_ ..
char(rshift(band(i,0xff000000),24)) ..
char(rshift(band(i, 0x00ff0000),16)) ..
char(rshift(band(i, 0x0000ff00),8)) ..
char(band(i, 0x000000ff))
self.buffer_ = self.buffer_ ..
char(rshift(band(i,0xff000000),24)) ..
char(rshift(band(i, 0x00ff0000),16)) ..
char(rshift(band(i, 0x0000ff00),8)) ..
char(band(i, 0x000000ff))
end
function buffer:put_i64(i)
-- rshift has not support for 64bit?
-- side effect is that it will rotate for shifts bigger than 32bit
local hi = band(i/4294967296,0x0ffffffff)
local lo = band(i, 0x0ffffffff)
self:put_i32(hi)
self:put_i32(lo)
-- rshift has not support for 64bit?
-- side effect is that it will rotate for shifts bigger than 32bit
local hi = band(i/4294967296,0x0ffffffff)
local lo = band(i, 0x0ffffffff)
self:put_i32(hi)
self:put_i32(lo)
end
function buffer:put_f32(i)
self:put_i32(i)
self:put_i32(i)
end
function buffer:put_f64(i)
self:put_i64(i)
self:put_i64(i)
end
function buffer:put_timestamp(i)
self:put_i64(i)
self:put_i64(i)
end
function buffer:put_decimal(d)
self:put_i8(d.scale)
self:put_i32(d.value)
self:put_i8(d.scale)
self:put_i32(d.value)
end
function buffer:put_short_string(s)
local len = #s
self:put_i8(len)
self.buffer_ = self.buffer_ .. s
local len = #s
self:put_i8(len)
self.buffer_ = self.buffer_ .. s
end
function buffer:put_long_string(s)
local len = #s
self:put_i32(len)
self.buffer_ = self.buffer_ .. s
local len = #s
self:put_i32(len)
self.buffer_ = self.buffer_ .. s
end
function buffer:put_payload(payload)
self.buffer_ = self.buffer_ .. payload
self.buffer_ = self.buffer_ .. payload
end
local function is_array(t)
local i = 0
for _ in pairs(t) do
i = i + 1
if t[i] == nil then return false end
end
return true
local i = 0
for _ in pairs(t) do
i = i + 1
if t[i] == nil then return false end
end
return true
end
function buffer:put_field_array(a)
local b = buffer.new()
for i = 1, #a do
b:put_field_value(a[i])
end
local b = buffer.new()
for i = 1, #a do
b:put_field_value(a[i])
end
self:put_i32(#b.buffer_)
self:put_payload(b.buffer_)
self:put_i32(#b.buffer_)
self:put_payload(b.buffer_)
end
function buffer:put_field_table(tab)
local b = buffer.new()
for k,v in pairs(tab) do
b:put_short_string(k)
b:put_field_value(v)
end
self:put_i32(#b.buffer_)
self:put_payload(b.buffer_)
local b = buffer.new()
for k,v in pairs(tab) do
b:put_short_string(k)
b:put_field_value(v)
end
self:put_i32(#b.buffer_)
self:put_payload(b.buffer_)
end
local fields_ = {
t = {
r = function (self)
local b = self:get_i8()
return b ~= 0
end,
w = function (self,val)
local b = 0
if val ~= 0 then
b = 1
end
self:put_i8(b)
t = {
r = function (self)
local b = self:get_i8()
return b ~= 0
end,
w = function (self,val)
local b = 0
if val ~= 0 then
b = 1
end
},
b = {
r = buffer.get_i8,
w = buffer.put_i8
},
B = {
r = buffer.get_i8,
w = buffer.put_i8
},
U = {
r = buffer.get_i16,
w = buffer.put_i16
},
u = {
r = buffer.get_i16,
w = buffer.put_i16
},
I = {
r = buffer.get_i32,
w = buffer.put_i32
},
i = {
r = buffer.get_i32,
w = buffer.put_i32
},
L = {
r = buffer.get_i64,
w = buffer.put_i64
},
l = {
r = buffer.get_i64,
w = buffer.put_i64
},
f = {
r = buffer.get_f32,
w = buffer.put_f32
},
d = {
r = buffer.get_f64,
w = buffer.put_f64
},
D = {
r = buffer.get_decimal,
w = buffer.put_decimal
},
s = {
r = buffer.get_short_string,
w = buffer.put_short_string
},
S = {
r = buffer.get_long_string,
w = buffer.put_long_string
},
A = {
r = buffer.get_field_array,
w = buffer.put_field_array
},
T = {
r = buffer.get_timestamp,
w = buffer.put_timestamp
},
F = {
r = buffer.get_field_table,
w = buffer.put_field_table
},
x = {
r = buffer.get_long_string,
w = buffer.put_long_string
},
V = {
r = function()
return nil
end,
w = nil
}
self:put_i8(b)
end
},
b = {
r = buffer.get_i8,
w = buffer.put_i8
},
B = {
r = buffer.get_i8,
w = buffer.put_i8
},
U = {
r = buffer.get_i16,
w = buffer.put_i16
},
u = {
r = buffer.get_i16,
w = buffer.put_i16
},
I = {
r = buffer.get_i32,
w = buffer.put_i32
},
i = {
r = buffer.get_i32,
w = buffer.put_i32
},
L = {
r = buffer.get_i64,
w = buffer.put_i64
},
l = {
r = buffer.get_i64,
w = buffer.put_i64
},
f = {
r = buffer.get_f32,
w = buffer.put_f32
},
d = {
r = buffer.get_f64,
w = buffer.put_f64
},
D = {
r = buffer.get_decimal,
w = buffer.put_decimal
},
s = {
r = buffer.get_short_string,
w = buffer.put_short_string
},
S = {
r = buffer.get_long_string,
w = buffer.put_long_string
},
A = {
r = buffer.get_field_array,
w = buffer.put_field_array
},
T = {
r = buffer.get_timestamp,
w = buffer.put_timestamp
},
F = {
r = buffer.get_field_table,
w = buffer.put_field_table
},
x = {
r = buffer.get_long_string,
w = buffer.put_long_string
},
V = {
r = function()
return nil
end,
w = nil
}
}
function buffer:field_value()
local typ = self:get_i8()
local codec = fields_[char(typ)]
if not codec then
local err = format("codec[%d] not found.",typ)
logger.error("[field_value] " .. err)
return nil,err
end
return codec.r(self)
local typ = self:get_i8()
local codec = fields_[char(typ)]
if not codec then
local err = format("codec[%d] not found.",typ)
logger.error("[field_value] " .. err)
return nil,err
end
return codec.r(self)
end
function buffer:put_field_value(value)
-- FIXME: to detect the type of the value
local t = type(value)
local typ = nil
if t == 'number' then
typ = 'I' -- assume to be i32
elseif t == 'boolean' then
typ = 't'
elseif t == 'string' then
typ = 'S'
elseif t == 'table' then
typ = 'F'
if is_array(value) then
typ = 'A'
end
end
-- wire the type
self:put_i8(byte(typ))
local codec = fields_[typ]
if not codec then
local err = format("codec[%d] not found.",typ)
logger.error("[field_value] " .. err)
return nil,err
end
codec.w(self,value)
-- FIXME: to detect the type of the value
local t = type(value)
local typ = nil
if t == 'number' then
typ = 'I' -- assume to be i32
elseif t == 'boolean' then
typ = 't'
elseif t == 'string' then
typ = 'S'
elseif t == 'table' then
typ = 'F'
if is_array(value) then
typ = 'A'
end
end
-- wire the type
self:put_i8(byte(typ))
local codec = fields_[typ]
if not codec then
local err = format("codec[%d] not found.",typ)
logger.error("[field_value] " .. err)
return nil,err
end
codec.w(self,value)
end
function buffer:payload()
return self.buffer_
return self.buffer_
end
return buffer
+151 -151
View File
@@ -8,166 +8,166 @@
local amqp = {
DEFAULT_CHANNEL = 0,
DEFAULT_FRAME_SIZE = 131072,
DEFAULT_MAX_CHANNELS = 65535,
DEFAULT_HEARTBEAT = 30,
DEFAULT_CHANNEL = 0,
DEFAULT_FRAME_SIZE = 131072,
DEFAULT_MAX_CHANNELS = 65535,
DEFAULT_HEARTBEAT = 30,
PROTOCOL_VERSION_MAJOR = 0,
PROTOCOL_VERSION_MINOR = 9,
PROTOCOL_VERSION_REVISION = 1,
PROTOCOL_PORT = 5672,
PROTOCOL_SSL_PORT = 5671,
PROTOCOL_VERSION_MAJOR = 0,
PROTOCOL_VERSION_MINOR = 9,
PROTOCOL_VERSION_REVISION = 1,
PROTOCOL_PORT = 5672,
PROTOCOL_SSL_PORT = 5671,
state = {
CLOSED = 0,
ESTABLISHED = 1,
CLOSE_WAIT = 2,
},
state = {
CLOSED = 0,
ESTABLISHED = 1,
CLOSE_WAIT = 2,
},
frame = {
METHOD_FRAME = 1,
HEADER_FRAME = 2,
BODY_FRAME = 3,
HEARTBEAT_FRAME = 8,
frame = {
METHOD_FRAME = 1,
HEADER_FRAME = 2,
BODY_FRAME = 3,
HEARTBEAT_FRAME = 8,
FRAME_MIN_SIZE = 4096,
FRAME_END = 0xCE
},
FRAME_MIN_SIZE = 4096,
FRAME_END = 0xCE
},
method = {
connection = {
START = 0x0A,
START_OK = 0x0B,
SECURE = 0x14,
SECURE_OK = 0x15,
TUNE = 0x1E,
TUNE_OK = 0x1F,
OPEN = 0x28,
OPEN_OK = 0x29,
CLOSE = 0x32,
CLOSE_OK = 0x33,
BLOCKED = 0x3C,
UNBLOCKED = 0x3D,
},
channel = {
OPEN = 0x0A,
OPEN_OK = 0x0B,
FLOW = 0x14,
FLOW_OK = 0x15,
CLOSE = 0x28,
CLOSE_OK = 0x29,
},
exchange = {
DECLARE = 0x0A,
DECLARE_OK = 0x0B,
DELETE = 0x14,
DELETE_OK = 0x15,
BIND = 0x1E,
BIND_OK = 0x1F,
UNBIND = 0x28,
UNBIND_OK = 0x33,
},
queue = {
DECLARE = 0x0A,
DECLARE_OK = 0x0B,
BIND = 0x14,
BIND_OK = 0x15,
PURGE = 0x1E,
PURGE_OK = 0x1F,
DELETE = 0x28,
DELETE_OK = 0x29,
UNBIND = 0x32,
UNBIND_OK = 0x33,
},
basic = {
QOS = 0x0A,
QOS_OK = 0x0B,
CONSUME = 0x14,
CONSUME_OK = 0x15,
CANCEL = 0x1E,
CANCEL_OK = 0x1F,
PUBLISH = 0x28,
RETURN = 0x32,
DELIVER = 0x3C,
GET = 0x46,
GET_OK = 0x47,
GET_EMPTY = 0x48,
ACK = 0x50,
REJECT = 0x5A,
RECOVER_ASYNC = 0x64,
RECOVER = 0x6E,
RECOVER_OK = 0x6F,
NACK = 0x78,
},
tx = {
SELECT = 0x0A,
SELECT_OK = 0x0B,
COMMIT = 0x14,
COMMIT_OK = 0x15,
ROLLBACK = 0x1E,
ROLLBACK_OK = 0x1F,
},
confirm = {
SELECT = 0x0A,
SELECT_OK = 0x0B
}
},
method = {
connection = {
START = 0x0A,
START_OK = 0x0B,
SECURE = 0x14,
SECURE_OK = 0x15,
TUNE = 0x1E,
TUNE_OK = 0x1F,
OPEN = 0x28,
OPEN_OK = 0x29,
CLOSE = 0x32,
CLOSE_OK = 0x33,
BLOCKED = 0x3C,
UNBLOCKED = 0x3D,
},
channel = {
OPEN = 0x0A,
OPEN_OK = 0x0B,
FLOW = 0x14,
FLOW_OK = 0x15,
CLOSE = 0x28,
CLOSE_OK = 0x29,
},
exchange = {
DECLARE = 0x0A,
DECLARE_OK = 0x0B,
DELETE = 0x14,
DELETE_OK = 0x15,
BIND = 0x1E,
BIND_OK = 0x1F,
UNBIND = 0x28,
UNBIND_OK = 0x33,
},
queue = {
DECLARE = 0x0A,
DECLARE_OK = 0x0B,
BIND = 0x14,
BIND_OK = 0x15,
PURGE = 0x1E,
PURGE_OK = 0x1F,
DELETE = 0x28,
DELETE_OK = 0x29,
UNBIND = 0x32,
UNBIND_OK = 0x33,
},
basic = {
QOS = 0x0A,
QOS_OK = 0x0B,
CONSUME = 0x14,
CONSUME_OK = 0x15,
CANCEL = 0x1E,
CANCEL_OK = 0x1F,
PUBLISH = 0x28,
RETURN = 0x32,
DELIVER = 0x3C,
GET = 0x46,
GET_OK = 0x47,
GET_EMPTY = 0x48,
ACK = 0x50,
REJECT = 0x5A,
RECOVER_ASYNC = 0x64,
RECOVER = 0x6E,
RECOVER_OK = 0x6F,
NACK = 0x78,
},
tx = {
SELECT = 0x0A,
SELECT_OK = 0x0B,
COMMIT = 0x14,
COMMIT_OK = 0x15,
ROLLBACK = 0x1E,
ROLLBACK_OK = 0x1F,
},
confirm = {
SELECT = 0x0A,
SELECT_OK = 0x0B
}
},
class = {
CONNECTION = 0x000A,
CHANNEL = 0x0014,
EXCHANGE = 0x0028,
QUEUE = 0x0032,
BASIC = 0x003C,
TX = 0x005A,
CONFIRM = 0x0055
},
class = {
CONNECTION = 0x000A,
CHANNEL = 0x0014,
EXCHANGE = 0x0028,
QUEUE = 0x0032,
BASIC = 0x003C,
TX = 0x005A,
CONFIRM = 0x0055
},
flag = {
CONTENT_TYPE = 0x8000,
CONTENT_ENCODING = 0x4000,
HEADERS = 0x2000,
DELIVERY_MODE = 0x1000,
PRIORITY = 0x0800,
CORRELATION_ID = 0x0400,
REPLY_TO = 0x0200,
EXPIRATION = 0x0100,
MESSAGE_ID = 0x0080,
TIMESTAMP = 0x0040,
TYPE = 0x0020,
USER_ID = 0x0010,
APP_ID = 0x0008,
RESERVED1 = 0x0004
},
flag = {
CONTENT_TYPE = 0x8000,
CONTENT_ENCODING = 0x4000,
HEADERS = 0x2000,
DELIVERY_MODE = 0x1000,
PRIORITY = 0x0800,
CORRELATION_ID = 0x0400,
REPLY_TO = 0x0200,
EXPIRATION = 0x0100,
MESSAGE_ID = 0x0080,
TIMESTAMP = 0x0040,
TYPE = 0x0020,
USER_ID = 0x0010,
APP_ID = 0x0008,
RESERVED1 = 0x0004
},
err = {
REPLY_SUCCESS = 200,
CONTENT_TOO_LARGE = 311,
NO_ROUTE = 312,
NO_CONSUMERS = 313,
CONNECTION_FORCED = 320,
INVALID_PATH = 402,
ACCESS_REFUSED = 403,
NOT_FOUND = 404,
RESOURCE_LOCKED = 405,
PRECONDITION_FAILED = 406,
FRAME_ERROR = 501,
SYNTAX_ERROR = 502,
COMMAND_INVALID = 503,
CHANNEL_ERROR = 504,
UNEXPECTED_FRAME = 505,
RESOURCE_ERROR = 506,
NOT_ALLOWED = 530,
NOT_IMPLEMENTED = 540,
INTERNAL_ERROR = 541
},
err = {
REPLY_SUCCESS = 200,
CONTENT_TOO_LARGE = 311,
NO_ROUTE = 312,
NO_CONSUMERS = 313,
CONNECTION_FORCED = 320,
INVALID_PATH = 402,
ACCESS_REFUSED = 403,
NOT_FOUND = 404,
RESOURCE_LOCKED = 405,
PRECONDITION_FAILED = 406,
FRAME_ERROR = 501,
SYNTAX_ERROR = 502,
COMMAND_INVALID = 503,
CHANNEL_ERROR = 504,
UNEXPECTED_FRAME = 505,
RESOURCE_ERROR = 506,
NOT_ALLOWED = 530,
NOT_IMPLEMENTED = 540,
INTERNAL_ERROR = 541
},
PRODUCT = "lua-amqp",
VERSION = "1.0.2",
COPYRIGHT = "Copyright (c) 2016,2017,2018 Meng Zhang @Yottaa,Inc , 4mig4",
LOCALE = "en_US",
MECHANISM_PLAIN = "PLAIN"
PRODUCT = "lua-amqp",
VERSION = "1.0.2",
COPYRIGHT = "Copyright (c) 2016,2017,2018 Meng Zhang @Yottaa,Inc , 4mig4",
LOCALE = "en_US",
MECHANISM_PLAIN = "PLAIN"
}
return amqp
+1249 -1249
View File
File diff suppressed because it is too large Load Diff
+64 -65
View File
@@ -34,11 +34,11 @@ local amqp = {}
-- return the key's value from the first table that has it, or VALUE if none do
local function _getopt(k,t,...)
if select('#',...)==0 then
return t
return t
elseif t[k]~=nil then
return t[k]
return t[k]
else
return _getopt(k,...)
return _getopt(k,...)
end
end
@@ -69,19 +69,18 @@ function amqp:new(opts)
return nil, err
end
local ctx = { sock = sock,
opts = opts,
connection_state = c.state.CLOSED,
channel_state = c.state.CLOSED,
major = c.PROTOCOL_VERSION_MAJOR,
minor = c.PROTOCOL_VERSION_MINOR,
revision = c.PROTOCOL_VERSION_REVISION,
frame_max = c.DEFAULT_FRAME_SIZE,
channel_max = c.DEFAULT_MAX_CHANNELS,
mechanism = c.MECHANISM_PLAIN
}
local ctx = {
sock = sock,
opts = opts,
connection_state = c.state.CLOSED,
channel_state = c.state.CLOSED,
major = c.PROTOCOL_VERSION_MAJOR,
minor = c.PROTOCOL_VERSION_MINOR,
revision = c.PROTOCOL_VERSION_REVISION,
frame_max = c.DEFAULT_FRAME_SIZE,
channel_max = c.DEFAULT_MAX_CHANNELS,
mechanism = c.MECHANISM_PLAIN
}
setmetatable(ctx,mt)
return ctx
end
@@ -168,8 +167,8 @@ function amqp:connection_start_ok()
local user = self.opts.user or "guest"
local password = self.opts.password or "guest"
local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
c.class.CONNECTION,
c.method.connection.START_OK)
c.class.CONNECTION,
c.method.connection.START_OK)
f.method = {
properties = {
product = c.PRODUCT,
@@ -190,8 +189,8 @@ end
function amqp:connection_tune_ok()
local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
c.class.CONNECTION,
c.method.connection.TUNE_OK)
c.class.CONNECTION,
c.method.connection.TUNE_OK)
f.method = {
channel_max = self.channel_max or c.DEFAULT_MAX_CHANNELS,
@@ -211,8 +210,8 @@ end
function amqp:connection_open()
local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
c.class.CONNECTION,
c.method.connection.OPEN)
c.class.CONNECTION,
c.method.connection.OPEN)
f.method = {
virtual_host = self.opts.virtual_host or "/"
}
@@ -233,8 +232,8 @@ end
function amqp:connection_close(reason)
local f = frame.new_method_frame(c.DEFAULT_CHANNEL,
c.class.CONNECTION,
c.method.connection.CLOSE)
c.class.CONNECTION,
c.method.connection.CLOSE)
f.method = sanitize_close_reason(self, reason)
return frame.wire_method_frame(self, f)
end
@@ -242,8 +241,8 @@ end
function amqp:connection_close_ok()
local f = frame.new_method_frame(self.channel or 1,
c.class.CONNECTION,
c.method.connection.CLOSE_OK)
c.class.CONNECTION,
c.method.connection.CLOSE_OK)
return frame.wire_method_frame(self, f)
end
@@ -253,8 +252,8 @@ end
function amqp:channel_open()
local f = frame.new_method_frame(self.opts.channel or 1,
c.class.CHANNEL,
c.method.channel.OPEN)
c.class.CHANNEL,
c.method.channel.OPEN)
local msg = f:encode()
local sock = self.sock
local bytes,err = sock:send(msg)
@@ -452,17 +451,17 @@ function amqp:prepare_to_consume()
end
if self.opts.exchange ~= '' then
res, err = amqp.queue_bind(self)
if not res then
logger.error("[prepare_to_consume] queue_bind failed: ", err)
return nil, err
end
res, err = amqp.queue_bind(self)
if not res then
logger.error("[prepare_to_consume] queue_bind failed: ", err)
return nil, err
end
end
res, err = amqp.basic_consume(self)
if not res then
logger.error("[prepare_to_consume] basic_consume failed: ", err)
return nil, err
logger.error("[prepare_to_consume] basic_consume failed: ", err)
return nil, err
end
return true
@@ -486,7 +485,7 @@ local function timedout(ctx, timeouts)
end
function amqp:timedout(timeouts)
return timedout(self, timeouts)
return timedout(self, timeouts)
end
local function exiting()
@@ -499,8 +498,8 @@ end
function amqp:basic_ack(ok, delivery_tag)
local f = frame.new_method_frame(self.channel or 1,
c.class.BASIC,
ok and c.method.basic.ACK or c.method.basic.NACK)
c.class.BASIC,
ok and c.method.basic.ACK or c.method.basic.NACK)
f.method = {
delivery_tag = delivery_tag,
@@ -523,9 +522,9 @@ function amqp:consume_loop(callback)
local status
while true do
--
--
::continue::
--
--
f, err0 = frame.consume_frame(self)
if not f then -- if start
if exiting() then
@@ -600,7 +599,7 @@ function amqp:consume_loop(callback)
elseif f.type == c.frame.HEADER_FRAME then
f_header = f
logger.dbg(format("[header] class_id: %d weight: %d, body_size: %d",
f.class_id, f.weight, f.body_size))
f.class_id, f.weight, f.body_size))
logger.dbg("[frame.properties]",f.properties)
elseif f.type == c.frame.BODY_FRAME then
status = true
@@ -645,14 +644,14 @@ function amqp:consume()
ok, err = self:setup()
if not ok then
self:teardown()
return nil, err
self:teardown()
return nil, err
end
ok, err = self:prepare_to_consume()
if not ok then
self:teardown()
return nil, err
self:teardown()
return nil, err
end
return self:consume_loop(self.opts.callback)
@@ -744,8 +743,8 @@ function amqp:queue_declare(opts)
end
local f = frame.new_method_frame(self.channel or 1,
c.class.QUEUE,
c.method.queue.DECLARE)
c.class.QUEUE,
c.method.queue.DECLARE)
f.method = {
queue = opts.queue or self.opts.queue,
passive = _getopt('passive', opts, self.opts, false),
@@ -765,8 +764,8 @@ function amqp:queue_bind(opts)
end
local f = frame.new_method_frame(self.channel or 1,
c.class.QUEUE,
c.method.queue.BIND)
c.class.QUEUE,
c.method.queue.BIND)
f.method = {
queue = opts.queue or self.opts.queue,
@@ -786,8 +785,8 @@ function amqp:queue_unbind(opts)
end
local f = frame.new_method_frame(self.channel or 1,
c.class.QUEUE,
c.method.queue.UNBIND)
c.class.QUEUE,
c.method.queue.UNBIND)
f.method = {
queue = opts.queue or self.opts.queue,
@@ -807,8 +806,8 @@ function amqp:queue_delete(opts)
end
local f = frame.new_method_frame(self.channel or 1,
c.class.QUEUE,
c.method.queue.DELETE)
c.class.QUEUE,
c.method.queue.DELETE)
f.method = {
queue = opts.queue or self.opts.queue,
@@ -827,8 +826,8 @@ function amqp:exchange_declare(opts)
opts = opts or {}
local f = frame.new_method_frame(self.channel or 1,
c.class.EXCHANGE,
c.method.exchange.DECLARE)
c.class.EXCHANGE,
c.method.exchange.DECLARE)
f.method = {
exchange = opts.exchange or self.opts.exchange,
@@ -857,8 +856,8 @@ function amqp:exchange_bind(opts)
end
local f = frame.new_method_frame(self.channel or 1,
c.class.EXCHANGE,
c.method.exchange.BIND)
c.class.EXCHANGE,
c.method.exchange.BIND)
f.method = {
destination = opts.destination,
@@ -884,8 +883,8 @@ function amqp:exchange_unbind(opts)
end
local f = frame.new_method_frame(self.channel or 1,
c.class.EXCHANGE,
c.method.exchange.UNBIND)
c.class.EXCHANGE,
c.method.exchange.UNBIND)
f.method = {
destination = opts.destination,
@@ -901,8 +900,8 @@ function amqp:exchange_delete(opts)
opts = opts or {}
local f = frame.new_method_frame(self.channel or 1,
c.class.EXCHANGE,
c.method.exchange.DELETE)
c.class.EXCHANGE,
c.method.exchange.DELETE)
f.method = {
exchange = opts.exchange or self.opts.exchange,
@@ -924,8 +923,8 @@ function amqp:basic_consume(opts)
end
local f = frame.new_method_frame(self.channel or 1,
c.class.BASIC,
c.method.basic.CONSUME)
c.class.BASIC,
c.method.basic.CONSUME)
f.method = {
queue = opts.queue or self.opts.queue,
@@ -942,8 +941,8 @@ function amqp:basic_publish(opts)
opts = opts or {}
local f = frame.new_method_frame(self.channel or 1,
c.class.BASIC,
c.method.basic.PUBLISH)
c.class.BASIC,
c.method.basic.PUBLISH)
f.method = {
exchange = opts.exchange or self.opts.exchange,
routing_key = _getopt('routing_key',opts, self.opts, ""),
+42 -42
View File
@@ -11,9 +11,9 @@ local logger = {}
-- logging scaffold
local log
if _G.ngx and _G.ngx.log then
log = _G.ngx.log
log = _G.ngx.log
else
log = print
log = print
end
-- logging level for print
@@ -23,72 +23,72 @@ local DEBUG = 8
-- ngx.log requires a number to indicate the logging level
if _G.ngx then
ERR = _G.ngx.ERR
INFO = _G.ngx.INFO
DEBUG = _G.ngx.DEBUG
ERR = _G.ngx.ERR
INFO = _G.ngx.INFO
DEBUG = _G.ngx.DEBUG
end
local level_ = INFO
local function to_string(v_)
if v_ == nil then
return ""
end
if v_ == nil then
return ""
end
if type(v_) ~= "table" then
return tostring(v_)
end
if type(v_) ~= "table" then
return tostring(v_)
end
local s = "["
for k,v in pairs(v_) do
if k ~= nil then
s = s .. to_string(k) .. ":"
end
if v ~= nil then
s = s .. to_string(v)
end
s = s .. " "
end
s = s .. "]"
return s
local s = "["
for k,v in pairs(v_) do
if k ~= nil then
s = s .. to_string(k) .. ":"
end
if v ~= nil then
s = s .. to_string(v)
end
s = s .. " "
end
s = s .. "]"
return s
end
local function va_table_to_string(tbl)
local res = ""
for _,v in pairs(tbl) do
res = res .. to_string(v) .. "\t"
end
return res
local res = ""
for _,v in pairs(tbl) do
res = res .. to_string(v) .. "\t"
end
return res
end
function logger.set_level(level)
level_ = level
level_ = level
end
function logger.error(...)
if level_ < ERR then
return
end
log(ERR, va_table_to_string({...}))
if level_ < ERR then
return
end
log(ERR, va_table_to_string({...}))
end
function logger.info(...)
if level_ < INFO then
return
end
log(INFO, va_table_to_string({...}))
if level_ < INFO then
return
end
log(INFO, va_table_to_string({...}))
end
function logger.dbg(...)
if level_ ~= DEBUG then
return
end
if level_ ~= DEBUG then
return
end
log(DEBUG, va_table_to_string({...}))
log(DEBUG, va_table_to_string({...}))
end
function logger.is_debug_enabled()
return level_ == DEBUG
return level_ == DEBUG
end
return logger