OpenResty连接Redis

通常来说,对于OpenResty的应用多数都会用到Redis,用以减少回源次数,从而加速网站。话不多说,先上一个demo

redis_iresty.lua

-- file name: resty/redis_iresty.lua
local redis_c = require "resty.redis"


local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
    new_tab = function (narr, nrec) return {} end
end


local _M = new_tab(0, 155)
_M._VERSION = '0.01'


local commands = {
    "append",   "bgsave",       "blpop",        "brpoplpush",   "auth",         "bitcount",
    "brpop",    "client",       "bgrewriteaof", "bitop",        "config",       "dbsize",
    "debug",    "decr",         "del",          "discard",      "echo",         "eval",
    "exec",     "expire",       "expireat",     "flushdb",      "get",          "getrange",
    "getset",   "hexists",      "hget",         "hincrby",      "hincrbyfloat", "hlen",
    "hmget",    "hset",         "hsetnx",       "incrby",       "keys",         "lastsave",
    "llen",     "lpushx",       "lset",         "migrate",      "monitor",      "msetnx",
    "persist",  "ping",         "pttl",         "publish",      "quit",         "randomkey",
    "restore",  "rpop",         "rpushx",       "scan",         "sdiff",        "select",
    "setex",    "shutdown",     "sismember",    "smembers",     "spop",         "sscan",
    "strlen",   "sunionstore",  "ttl",          "type",         "watch",        "zcount",
    "zrange",   "zrem",         "zrevrange",    "zscan",        "zscore",       "hmset",
    "hvals",    "incrbyfloat",  "lindex",       "lpop",         "lrange",       "ltrim",
    "move",     "multi",        "pexpire",      "psetex",       --[[ "punsubscribe", ]]
    "rename",   "info",         "linsert",      "lpush",        "lrem",         "mget",
    "mset",     "object",       "pexpireat",    "psubscribe",   "pubsub",       "renamenx",
    "rpush",    "save",         "script",       "setbit",       "setrange",     "sinterstore",
    "slowlog",  "sort",         "srem",         "sunion",       "time",         "rpoplpush",
    "sadd",     "scard",        "sdiffstore",   "set",          "setnx",        "sinter",
    "slaveof",  "smove",        "srandmember",  --[[ "subscribe",  ]]           "sync",
    --[[ "unsubscribe", ]]      "zunionstore",  "evalsha",      "decrby",       "dump",
    "exists",   "flushall",     "getbit",       "hdel",         "hgetall",      "hkeys",
    "hscan",    "incr",         "zadd",         "zincrby",      "zrangebyscore","zrank",
    "zremrangebyrank",          "zremrangebyscore",             "zrevrangebyscore", 
    "zrevrank", "unwatch",      "zcard",        "zinterstore"
 }


local mt = { __index = _M }


local function is_redis_null( res )
    if type(res) == "table" then
        for k,v in pairs(res) do
            if v ~= ngx.null then
                return false
            end 
        end
        return true
    elseif res == ngx.null then
        return true
    elseif res == nil then
        return true
    end

    return false
end

-- change connect address as you need
function _M.connect_mod( self, redis )
    redis:set_timeout(self.timeout)
    return redis:connect("127.0.0.1", 6379)
end

function _M.set_keepalive_mod( redis )
    -- put it into the connection pool of size 100, with 60 seconds max idle time
    return redis:set_keepalive(60000, 1000)
end

function _M.init_pipeline( self )
    self._reqs = {}
end

function _M.commit_pipeline( self )
    local reqs = self._reqs

    if nil == reqs or 0 == #reqs then
        return {}, "no pipeline"
    else
        self._reqs = nil
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok then
        return {}, err
    end

    redis:init_pipeline()
    for _, vals in ipairs(reqs) do
        local fun = redis[vals[1]]
        table.remove(vals , 1)
        fun(redis, unpack(vals))
    end

    local results, err = redis:commit_pipeline()
    if not results or err then
        return {}, err
    end

    if is_redis_null(results) then
        results = {}
        ngx.log(ngx.WARN, "is null")
    end
    -- table.remove (results , 1)

    self.set_keepalive_mod(redis)

    for i,value in ipairs(results) do
        if is_redis_null(value) then
            results[i] = nil
        end
    end
    return results, err
end

function _M.subscribe( self, channel )
    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local res, err = redis:subscribe(channel)
    if not res then
        return nil, err
    end

    local function do_read_func ( do_read )
        if do_read == nil or do_read == true then
            res, err = redis:read_reply()
            if not res then
                return nil, err
            end
            return res
        end
        redis:unsubscribe(channel)
        self.set_keepalive_mod(redis)
        return
    end

    return do_read_func
end


local function do_command(self, cmd, ... )
    if self._reqs then
        table.insert(self._reqs, {cmd, ...})
        return 
    end

    local redis, err = redis_c:new()
    if not redis then
        return nil, err
    end

    local ok, err = self:connect_mod(redis)
    if not ok or err then
        return nil, err
    end

    local fun = redis[cmd]
    local result, err = fun(redis, ...)
    if not result or err then
        return nil, err
    end

    if is_redis_null(result) then
        result = nil
    end

    self.set_keepalive_mod(redis)

    return result, err
end


for i = 1, #commands do
    local cmd = commands[i]
    _M[cmd] =
            function (self, ...)
                return do_command(self, cmd, ...)
            end
end

 
function _M.new(self, opts)
    opts = opts or {}
    local timeout = (opts.timeout and opts.timeout * 1000) or 1000  --1s
    local db_index= opts.db_index or 0
    return setmetatable({
            timeout = timeout,
            db_index = db_index,
            _reqs = nil }, mt)
end


return _M

上述代码片段摘自OpenResty 最佳实践,上面的模块是对resty.redis的包装。其中抽象了连接创建、认证及释放的过程,并且对外统一为池化操作。下面为一个调用的demo:

local redis = require "resty.redis_iresty"
local red = redis:new()

local ok, err = red:set("dog", "an animal")
if not ok then
    ngx.say("failed to set dog: ", err)
		return 
end

ngx.say("set result: ", ok)

这里调用的代码很简单,就不再多说了,下面说一下上面resty_iredis.lua的内容。

普通命令

第一个调用的是new(opts),入参是一个table,可以设置两个值,timeout和db索引值。(其他相关连接参数在工具内部,实际使用时可以抽出来配置文件)如timeout不设置,则默认超时为1s。

new的时候返回的是iresty对象本身,因此new上面对基本上所有的redis命令进行了重写。并最终由do_command方法去调用resty.redis的方法,实现了与原生resty.redis相同的API。

还有一点需要提及的就是,上述API的封装是用到连接池的(虽然API中不能明显看出)。在new的时候并没有到连接池中获取连接,而是在具体的命令调用时(do_command方法中connect_mod)才去到cosocket连接池中去获取TCP连接的。cosocket中的连接池是以IP和端口号作为连接池的KEY,redis:connect(ip, port)才可以获取对应的TCP连接。

批量操作

对于批量操作,上述工具里面主要包含两个接口:init_pipeline()和commit_pipeline()。当批量命令开始时,执行init_pipeline(),此时会初始化一个命令缓冲区_reqs,用以存储待执行的批量命令。最后调用_M:set_keepalive_mod中的redis:set_keepalive(60000, 1000)方法,用以释放连接到池中。若此时没有以此IP和端口号为KEY的连接池,则会创建一个大小为1000的连接池,最大存活时间为60s。

订阅服务

对于订阅服务,调用代码请参见如下:

local red = redis:new({timeout=1000})
local func = red:subscribe( "channel" )
if not func then
  return nil
end

while true do
    local res, err = func()
    if err then
        func(false)
    end
	... ...
end

return cbfunc