openresty中redis连接池的实现
本文介绍一下openresty中redis连接池的实现
1. lua-resty-redis
我们可以到lua-resty-redis官网下载对应的lua模块
2. 实现参考
local redis_c = require "resty.redis"
local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
new_tab = function (narr, nrec) return {} end
end
local _M = new_tab(0, 155)
_M._VERSION = '0.01'
local commands = {
"append", "auth", "bgrewriteaof",
"bgsave", "bitcount", "bitop",
"blpop", "brpop",
"brpoplpush", "client", "config",
"dbsize",
"debug", "decr", "decrby",
"del", "discard", "dump",
"echo",
"eval", "exec", "exists",
"expire", "expireat", "flushall",
"flushdb", "get", "getbit",
"getrange", "getset", "hdel",
"hexists", "hget", "hgetall",
"hincrby", "hincrbyfloat", "hkeys",
"hlen",
"hmget", "hmset", "hscan",
"hset",
"hsetnx", "hvals", "incr",
"incrby", "incrbyfloat", "info",
"keys",
"lastsave", "lindex", "linsert",
"llen", "lpop", "lpush",
"lpushx", "lrange", "lrem",
"lset", "ltrim", "mget",
"migrate",
"monitor", "move", "mset",
"msetnx", "multi", "object",
"persist", "pexpire", "pexpireat",
"ping", "psetex", "psubscribe",
"pttl",
"publish", --[[ "punsubscribe", ]] "pubsub",
"quit",
"randomkey", "rename", "renamenx",
"restore",
"rpop", "rpoplpush", "rpush",
"rpushx", "sadd", "save",
"scan", "scard", "script",
"sdiff", "sdiffstore",
"select", "set", "setbit",
"setex", "setnx", "setrange",
"shutdown", "sinter", "sinterstore",
"sismember", "slaveof", "slowlog",
"smembers", "smove", "sort",
"spop", "srandmember", "srem",
"sscan",
"strlen", --[[ "subscribe", ]] "sunion",
"sunionstore", "sync", "time",
"ttl",
"type", --[[ "unsubscribe", ]] "unwatch",
"watch", "zadd", "zcard",
"zcount", "zincrby", "zinterstore",
"zrange", "zrangebyscore", "zrank",
"zrem", "zremrangebyrank", "zremrangebyscore",
"zrevrange", "zrevrangebyscore", "zrevrank",
"zscan",
"zscore", "zunionstore", "evalsha"
}
local mt = { __index = _M }
local function is_redis_null(res)
if type(res) == "table" then
for k,v in pairs(res) do
if v ~= ngx.null then
return false
end
end
return true
elseif res == ngx.null then
return true
elseif res == nil then
return true
end
return false
end
--[[
this function doesn't really close the redis, but put it into the pool
--]]
function _M.close_redis(self, redis)
if not redis then
return
end
--[[
release the redis pool
--]]
local pool_max_idle_time = self.pool_max_idle_time
local pool_size = self.pool_size
local ok, err = redis:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "set keepalive error: ", err)
end
end
function _M.connect_mod(self, redis)
-- set operation timeout
redis:set_timeout(self.timeout)
ngx.log(ngx.ERR, "ip: ", self.ip, " port: ", self.port)
local ok, err = redis:connect(self.ip, self.port)
if not ok then
ngx.log(ngx.ERR, "connect to redis error: ", err)
return self:close_redis(redis)
end
if self.password then
local count, err = redis:get_reused_times()
if count == 0 then -- the new connections, need auth
ok, err = redis:auth(self.password)
if not ok then
ngx.log(ngx.ERR, "failed to auth: ", err)
return
end
elseif err then -- failed to get times
ngx.log(ngx.ERR, "failed to get reused times: ", err)
return
end
end
return ok, err
end
function _M.init_pipeline(self)
self._reqs = {}
end
function _M.commit_pipeline(self)
local reqs = self._reqs
if nil == reqs or 0 == #reqs then
return {}, "no pipeline"
else
self._reqs = nil
end
local redis, err = redis_c:new()
if not redis then
return nil, err
end
local ok, err = self:connect_mod(redis)
if not ok then
return {}, err
end
-- redis:select(self.dbidx)
redis:init_pipeline()
for _, vals in ipairs(reqs) do
local fun = redis[vals[1]]
table.remove(vals , 1)
fun(redis, unpack(vals))
end
local results, err = redis:commit_pipeline()
if not results or err then
return {}, err
end
if is_redis_null(results) then
results = {}
ngx.log(ngx.WARN, "result is null")
end
-- table.remove (results , 1)
self:close_redis(redis)
--[[
should we set it to nil here??
--]]
for i, value in ipairs(results) do
if is_redis_null(value) then
results[i] = nil
end
end
return results, err
end
function _M.subscribe(self, channel)
local redis, err = redis_c:new()
if not redis then
return nil, err
end
local ok, err = self:connect_mod(redis)
if not ok then
return {}, err
end
local res, err = redis:subscribe(channel)
if not res then
return nil, err
end
local function do_read_func(do_read)
if do_read == nil or do_read == true then
res, err = redis:read_reply()
if not res then
return nil, err
end
return res
end
redis:unsubscribe(channel)
self:close_redis(redis)
return
end
return do_read_func
end
local function do_command(self, cmd, ...)
if self._reqs then
table.insert(self._reqs, {cmd, ...}) -- use the pipeline
return
end
local redis, err = redis_c:new()
if not redis then
return nil, err
end
local ok, err = self:connect_mod(redis)
if not ok or err then
return nil, err
end
-- redis:select(self.dbidx)
local fun = redis[cmd]
local result, err = fun(redis, ...)
if not result or err then
-- ngx.log(ngx.ERR, "redis operater result: ", result, " err: ", err)
return nil, err
end
if is_redis_null(result) then
result = nil
end
self:close_redis(redis)
return result, err
end
--[[
add the commands to _M
--]]
for i = 1, #commands do
local cmd = commands[i]
_M[cmd] = function(self, ...)
return do_command(self, cmd, ...)
end
end
function _M.new(self, opts)
opts = opts or {}
local ip = opts.ip or '127.0.0.1'
local port = opts.port or 6379
local password = (opts.password ~= "" and opts.password) or nil
local dbidx = opts.dbidx or 0
local poolsize = opts.poolsize or 20
local timeout = opts.timeout or 1000
local pool_max_idle_time = opts.pool_max_idle_time or 60000
return setmetatable(
{
ip = ip,
port = port,
password = password,
dbidx = dbidx,
poolsize = poolsize,
timeout = timeout,
pool_max_idle_time = pool_max_idle_time
},
mt)
end
return _M
注: 对于hmget等操作,返回的结果是table
参看: