openresty中向kafka推送数据
本文介绍一下openresty中mysql连接池的实现
1. lua-resty-redis
我们可以到lua-resty-kafka官网下载对应的lua模块
2. 实现参考
local producer = require "resty.kafka.producer"
  
local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
  new_tab = function (narr, nrec) return {} end
end
local _M = new_tab(0, 20)
_M._VERSION = '0.01'
local mt = {__index = _M}
function _M.send_kafka_async(self, topic, key, message)
  if not _M[self.cluster_name] then 
    
    local p = producer:new(self.broker_list, self.producer_config, self.cluster_name)
    if not p then
      ngx.log(ngx.ERR, "create kafka producer '", self.cluster_name, "' failure")
      
      return false, "create kafka producer failure"
    end   
    
    _M[self.cluster_name] = p
  end   
  
  
  return _M[self.cluster_name]:send(topic, key, message)
  
end 
function _M.send_kafka_sync(self, topic, key, message)
  local p = producer:new(self.broker_list, self.producer_config, self.cluster_name)
  if not p then 
    ngx.log(ngx.ERR, "create kafka producer failure")
    
    return false, "create kafka producer failure"
    
  end 
  
  
  return p:send(topic, key, message)
  
end 
function _M.send_kafka(self, topic, key, message)
  if self.producer_config and self.producer_config.producer_type == "async" then
    return self:send_kafka_async(topic,key, message)
  end
  
 return  self:send_kafka_sync(topic, key, message)
end   
function _M.new(self, broker_list, producer_config, cluster_name)
  broker_list = broker_list or {}
  producer_config = producer_config or {}
  cluster_name = cluster_name or "default_cluster"
  
  return setmetatable(
    {
      broker_list = broker_list,
      producer_config = producer_config,
      cluster_name = cluster_name
    },
    mt)
  
end   
return _M
参看:

