Skip to content

pipe一些修改 #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: siyuan.make_ngx_resp
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions lib/pipe/filter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,59 @@ function _M.make_read_timeout_filter(r_idx)
end
end

function _M.make_read_max_size_filter(max_size, r_idx)
local size = 0

return function(rbufs, n_rd, wbufs, n_wrt, pipe_rst)
size = size + #(rbufs[r_idx] or '')
if size > max_size then
return nil, 'EntityTooLarge',
string.format('read size %s large than %s', size, max_size)
end
end
end

function _M.make_kill_low_write_speed_filter(pobj, assert_func, quorum)
local all_stat = pobj:get_stat()

return function(rbufs, n_rd, wbufs, n_wrt, pipe_rst)
local ok_stat, n_ok = {}, 0
for idx, wrt_rst in pairs(pipe_rst.write_result) do
if wrt_rst.err == nil then
local ident = pobj.wrt_cos[idx].ident
local id_stat = all_stat[ident] or {}

if id_stat.write_time ~= nil and id_stat.write_size ~= nil then
ok_stat[ident] = {
write_size = id_stat.write_size,
write_time = id_stat.write_time,
}
n_ok = n_ok + 1
end
end
end

if n_ok <= quorum then
return nil, nil, nil
end

for ident, st in pairs(ok_stat) do
local cur_speed = st.write_size/(math.max(st.write_time * 1000, 1)/1000)

if assert_func(ok_stat, ident, st, cur_speed) then
local err = {
err_code = "WriteSlow",
err_msg = to_str(ident, " coroutine write slow, speed:",
strutil.placeholder(cur_speed/1024, '-', '%.3f'), "kb/s"),
}

pobj.wrt_cos[ident].err = err
ngx.log(ngx.ERR, to_str("slow coroutine:", pobj.wrt_cos[ident], ", error:", err))

break
end
end
end
end

return _M
181 changes: 181 additions & 0 deletions lib/pipe/httplib.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
local httpclient = require("acid.httpclient")
local tableutil = require("acid.tableutil")
local strutil = require("acid.strutil")

local resty_sha1 = require( "resty.sha1" )
local resty_md5 = require( "resty.md5" )
local resty_string = require( "resty.string" )

local _M = { _VERSION = '1.0' }

local to_str = strutil.to_str

local BLOCK_SIZE = 1024 * 1024
local SOCKET_TIMEOUTS = {5 * 1000, 100 * 1000, 100 * 1000}

function _M.connect_http(ips, port, verb, uri, opts)
opts = opts or {}

local try_times = math.max(opts.try_times or 1, 1)

local http, _, err_code, err_msg

for _, ip in ipairs(ips) do
local headers = tableutil.dup(opts.headers or {}, true)
headers.Host = headers.Host or ip

local req = {
ip = ip,
port = port,
uri = uri,
verb = verb,
headers = headers,
}

if opts.signature_cb ~= nil then
req = opts.signature_cb(req)
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

上面这些初始化和赋值是不是可以从for循环中拿出去


http = httpclient:new(ip, port, opts.timeouts or SOCKET_TIMEOUTS, opts.http_opts)

local h_opts = {method=req.verb, headers=req.headers}
for i=1, try_times, 1 do
_, err_code, err_msg = http:send_request(req.uri, h_opts)
if err_code == nil then
return http
end
end
end

if err_code ~= nil then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里可以直接返回错误

err_msg = to_str(err_code, ':', err_msg)
err_code = 'ConnectError'
end

return nil, err_code, err_msg
end

function _M.get_http_response(http, opts)
opts = opts or {}

local _, err_code, err_msg = http:finish_request()
if err_code ~= nil then
return nil, err_code, err_msg
end

if opts.success_status ~= nil and opts.success_status ~= http.status then
return nil, 'InvalidHttpStatus', to_str('response http status:', http.status)
end

local resp = {
status = http.status,
headers = http.ori_headers,
}

if opts.read_body == false then
return resp
end

local body = {}

while true do
local data, err_code, err_msg = http:read_body(BLOCK_SIZE)
if err_code ~= nil then
return resp, err_code, err_msg
end

if data == '' then
break
end

table.insert(body, data)
end
resp.body = table.concat(body)

return resp
end

function _M.loop_http_read(pobj, ident, http, block_size, opts)
opts = opts or {}

local rst = {
size = 0,
md5 = nil,
sha1 = nil,
}

local md5_alg
if opts.calc_md5 == true then
md5_alg = resty_md5:new()
end

local sha1_alg
if opts.calc_sha1 == true then
sha1_alg = resty_sha1:new()
end

while true do
local data, err_code, err_msg =
http:read_body(block_size or BLOCK_SIZE)
if err_code ~= nil then
return nil, err_code, err_msg
end

local _, err_code, err_msg = pobj:write_pipe(ident, data)
if err_code ~= nil then
return nil, err_code, err_msg
end

if opts.calc_md5 == true then
md5_alg:update(data)
end

if opts.calc_sha1 == true then
sha1_alg:update(data)
end

rst.size = rst.size + #data

if data == '' then
if opts.calc_md5 == true then
rst.md5 = resty_string.to_hex(md5_alg:final())
end

if opts.calc_sha1 == true then
rst.sha1 = resty_string.to_hex(sha1_alg:final())
end

break
end
end

return rst
end

function _M.loop_http_write(pobj, ident, http)
local bytes = 0

while true do
local data, err_code, err_msg = pobj:read_pipe(ident)
if err_code ~= nil then
return nil, err_code, err_msg
end

if data == '' then
break
end

local now = ngx.now()
local _, err_code, err_msg = http:send_body(data)

ngx.update_time()
pobj:incr_stat(ident, "write_size", #data)
pobj:incr_stat(ident, "write_time", ngx.now()-now)

bytes = bytes + #data
end

return bytes
end

return _M
54 changes: 49 additions & 5 deletions lib/pipe/pipe.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ _M.writer = pipe_writer
_M.filter = pipe_filter

local to_str = strutil.to_str
local READ_TIMEOUT = 300 --seconds
local WRITE_TIMEOUT = 300 --seconds
local READ_TIMEOUT = 600 * 1000 --ms
local WRITE_TIMEOUT = 600 * 1000 --ms

local function wrap_co_func(co, ...)
local ok, rst, err_code, err_msg = pcall(co.func, ...)
Expand All @@ -30,7 +30,7 @@ local function wrap_co_func(co, ...)
err_code, err_msg = 'CoroutineError', rst
end
co.err = {err_code = err_code, err_msg = err_msg}
ngx.log(ngx.ERR, to_str(co.rd_or_wrt, " coroutine exit with error:", co.err))
ngx.log(ngx.ERR, to_str(co.rd_or_wrt, ' ', co.ident, " coroutine exit with error:", co.err))
end

co.is_dead = true
Expand Down Expand Up @@ -175,19 +175,26 @@ local function async_wait_co_sema(self, cos, sema, quorum, timeout, err_code)

while ngx.now() <= dead_time do
local n_ok = 0
local n_active = 0

for _, co in ipairs(cos) do
if co.is_dead then
if co.err == nil then
n_ok = n_ok + 1
end
else
n_active = n_active + 1
end
end

if n_ok >= quorum then
return
end

if n_active + n_ok < quorum then
break
end

ngx.sleep(0.001)
end

Expand Down Expand Up @@ -246,13 +253,34 @@ function _M.new(_, rds, wrts, filters, rd_timeout, wrt_timeout)
wrt_filters = filters.wrt_filters
or {pipe_filter.make_write_quorum_filter(#wrts)},

rd_timeout = rd_timeout or READ_TIMEOUT,
wrt_timeout = wrt_timeout or WRITE_TIMEOUT,
rd_timeout = (rd_timeout or READ_TIMEOUT)/1000,
wrt_timeout = (wrt_timeout or WRITE_TIMEOUT)/1000,
stat = {},
}

return setmetatable(obj, mt)
end

function _M.set_stat(self, ident, key, val)
self.stat[ident] = self.stat[ident] or {}
self.stat[ident][key] = val

return val
end

function _M.incr_stat(self, ident, key, val)
self.stat[ident] = self.stat[ident] or {}

local prev = self.stat[ident][key] or 0
self.stat[ident][key] = prev + val

return self.stat[ident][key]
end

function _M.get_stat(self)
return self.stat
end

function _M.write_pipe(pobj, ident, buf)
local rd_co = pobj.rd_cos[ident]

Expand Down Expand Up @@ -360,4 +388,20 @@ function _M.pipe(self, is_running, quorum_return)
return get_pipe_result(self)
end

function _M.add_read_filter(self, flt)
if flt == nil then
return
end

table.insert(self.rd_filters, flt)
end

function _M.add_write_filter(self, flt)
if flt == nil then
return
end

table.insert(self.wrt_filters, flt)
end

return _M
Loading