openresty中向kafka推送数据
本文介绍一下openresty中mysql连接池的实现
1. lua-resty-redis
我们可以到lua-resty-kafka官网下载对应的lua模块
2. 实现参考
local producer = require "resty.kafka.producer"
local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
new_tab = function (narr, nrec) return {} end
end
local _M = new_tab(0, 20)
_M._VERSION = '0.01'
local mt = {__index = _M}
function _M.send_kafka_async(self, topic, key, message)
if not _M[self.cluster_name] then
local p = producer:new(self.broker_list, self.producer_config, self.cluster_name)
if not p then
ngx.log(ngx.ERR, "create kafka producer '", self.cluster_name, "' failure")
return false, "create kafka producer failure"
end
_M[self.cluster_name] = p
end
return _M[self.cluster_name]:send(topic, key, message)
end
function _M.send_kafka_sync(self, topic, key, message)
local p = producer:new(self.broker_list, self.producer_config, self.cluster_name)
if not p then
ngx.log(ngx.ERR, "create kafka producer failure")
return false, "create kafka producer failure"
end
return p:send(topic, key, message)
end
function _M.send_kafka(self, topic, key, message)
if self.producer_config and self.producer_config.producer_type == "async" then
return self:send_kafka_async(topic,key, message)
end
return self:send_kafka_sync(topic, key, message)
end
function _M.new(self, broker_list, producer_config, cluster_name)
broker_list = broker_list or {}
producer_config = producer_config or {}
cluster_name = cluster_name or "default_cluster"
return setmetatable(
{
broker_list = broker_list,
producer_config = producer_config,
cluster_name = cluster_name
},
mt)
end
return _M
参看: