openresty中redis连接池的实现 By ivan.L 发表于 2018-08-03 本文介绍一下openresty中redis连接池的实现 1. lua-resty-redis 我们可以到lua-resty-redis官网下载对应的lua模块 2. 实现参考 local redis_c = require "resty.redis" local ok, new_tab = pcall(require, "table.new") if not ok or type(new_tab) ~= "function" then new_tab = function (narr, nrec) return {} end end local _M = new_tab(0, 155) _M._VERSION = '0.01' local commands = { "append", "auth", "bgrewriteaof", "bgsave", "bitcount", "bitop", "blpop", "brpop", "brpoplpush", "client", "config", "dbsize", "debug", "decr", "decrby", "del", "discard", "dump", "echo", "eval", "exec", "exists", "expire", "expireat", "flushall", "flushdb", "get", "getbit", "getrange", "getset", "hdel", "hexists", "hget", "hgetall", "hincrby", "hincrbyfloat", "hkeys", "hlen", "hmget", "hmset", "hscan", "hset", "hsetnx", "hvals", "incr", "incrby", "incrbyfloat", "info", "keys", "lastsave", "lindex", "linsert", "llen", "lpop", "lpush", "lpushx", "lrange", "lrem", "lset", "ltrim", "mget", "migrate", "monitor", "move", "mset", "msetnx", "multi", "object", "persist", "pexpire", "pexpireat", "ping", "psetex", "psubscribe", "pttl", "publish", --[[ "punsubscribe", ]] "pubsub", "quit", "randomkey", "rename", "renamenx", "restore", "rpop", "rpoplpush", "rpush", "rpushx", "sadd", "save", "scan", "scard", "script", "sdiff", "sdiffstore", "select", "set", "setbit", "setex", "setnx", "setrange", "shutdown", "sinter", "sinterstore", "sismember", "slaveof", "slowlog", "smembers", "smove", "sort", "spop", "srandmember", "srem", "sscan", "strlen", --[[ "subscribe", ]] "sunion", "sunionstore", "sync", "time", "ttl", "type", --[[ "unsubscribe", ]] "unwatch", "watch", "zadd", "zcard", "zcount", "zincrby", "zinterstore", "zrange", "zrangebyscore", "zrank", "zrem", "zremrangebyrank", "zremrangebyscore", "zrevrange", "zrevrangebyscore", "zrevrank", "zscan", "zscore", "zunionstore", "evalsha" } local mt = { __index = _M } local function is_redis_null(res) if type(res) == "table" then for k,v in pairs(res) do if v ~= ngx.null then return false end end return true elseif res == ngx.null then return true elseif res == nil then return true end return false end --[[ this function doesn't really close the redis, but put it into the pool --]] function _M.close_redis(self, redis) if not redis then return end --[[ release the redis pool --]] local pool_max_idle_time = self.pool_max_idle_time local pool_size = self.pool_size local ok, err = redis:set_keepalive(pool_max_idle_time, pool_size) if not ok then ngx.log(ngx.ERR, "set keepalive error: ", err) end end function _M.connect_mod(self, redis) -- set operation timeout redis:set_timeout(self.timeout) ngx.log(ngx.ERR, "ip: ", self.ip, " port: ", self.port) local ok, err = redis:connect(self.ip, self.port) if not ok then ngx.log(ngx.ERR, "connect to redis error: ", err) return self:close_redis(redis) end if self.password then local count, err = redis:get_reused_times() if count == 0 then -- the new connections, need auth ok, err = redis:auth(self.password) if not ok then ngx.log(ngx.ERR, "failed to auth: ", err) return end elseif err then -- failed to get times ngx.log(ngx.ERR, "failed to get reused times: ", err) return end end return ok, err end function _M.init_pipeline(self) self._reqs = {} end function _M.commit_pipeline(self) local reqs = self._reqs if nil == reqs or 0 == #reqs then return {}, "no pipeline" else self._reqs = nil end local redis, err = redis_c:new() if not redis then return nil, err end local ok, err = self:connect_mod(redis) if not ok then return {}, err end -- redis:select(self.dbidx) redis:init_pipeline() for _, vals in ipairs(reqs) do local fun = redis[vals[1]] table.remove(vals , 1) fun(redis, unpack(vals)) end local results, err = redis:commit_pipeline() if not results or err then return {}, err end if is_redis_null(results) then results = {} ngx.log(ngx.WARN, "result is null") end -- table.remove (results , 1) self:close_redis(redis) --[[ should we set it to nil here?? --]] for i, value in ipairs(results) do if is_redis_null(value) then results[i] = nil end end return results, err end function _M.subscribe(self, channel) local redis, err = redis_c:new() if not redis then return nil, err end local ok, err = self:connect_mod(redis) if not ok then return {}, err end local res, err = redis:subscribe(channel) if not res then return nil, err end local function do_read_func(do_read) if do_read == nil or do_read == true then res, err = redis:read_reply() if not res then return nil, err end return res end redis:unsubscribe(channel) self:close_redis(redis) return end return do_read_func end local function do_command(self, cmd, ...) if self._reqs then table.insert(self._reqs, {cmd, ...}) -- use the pipeline return end local redis, err = redis_c:new() if not redis then return nil, err end local ok, err = self:connect_mod(redis) if not ok or err then return nil, err end -- redis:select(self.dbidx) local fun = redis[cmd] local result, err = fun(redis, ...) if not result or err then -- ngx.log(ngx.ERR, "redis operater result: ", result, " err: ", err) return nil, err end if is_redis_null(result) then result = nil end self:close_redis(redis) return result, err end --[[ add the commands to _M --]] for i = 1, #commands do local cmd = commands[i] _M[cmd] = function(self, ...) return do_command(self, cmd, ...) end end function _M.new(self, opts) opts = opts or {} local ip = opts.ip or '127.0.0.1' local port = opts.port or 6379 local password = (opts.password ~= "" and opts.password) or nil local dbidx = opts.dbidx or 0 local poolsize = opts.poolsize or 20 local timeout = opts.timeout or 1000 local pool_max_idle_time = opts.pool_max_idle_time or 60000 return setmetatable( { ip = ip, port = port, password = password, dbidx = dbidx, poolsize = poolsize, timeout = timeout, pool_max_idle_time = pool_max_idle_time }, mt) end return _M 注: 对于hmget等操作,返回的结果是table 参看: lua-resty-redis官网 openresty best practise 7openresty中封装redis操作 访问有授权验证的Redis Openresty+Lua Redis连接池实现