1987WEB视界-分享互联网热点话题和事件

您现在的位置是:首页 > WEB开发 > 正文

WEB开发

支持插件的消息中间件【msg broker with plugin】 知然 博客园

1987web2024-03-26WEB开发40
支持插件的消息中间件【msgbrokerwithplugin】-知然-博客园支持插件的消息中间件【msgbrokerwithplugin】支持插件的消息

支持插件的消息中间件【msg broker with plugin】 - 知然 - 博客园支持插件的消息中间件【msg broker with plugin】支持插件的消息中间件msg broker

支持插件的消息中间件【msg broker with plugin】

支持插件的消息中间件

msg broker with plugin

Msg Broker概念:

msg broker是实现application之间互通讯的组件。通常为实现application之间的解耦,消息都是通过msg broker完成转发。application只需知道其他applicatipn的逻辑名称,而不需要知道对方的具体位置。Broker中维护一个查找表,记录着哪个application注册在此逻辑名称之下,所以消息总是会被正确的投递到目的地。

msg broker不限于1-1的转发,也支持1-N的模式。其主要功能有:

实现多个application的互通讯,而隐藏彼此的位置实现消息个格式的转换,如json to bin安全控制,msg broker可以再转发消息前进行一定程度的安全验证增大系统的可伸缩性,由于application通讯的目标变成了逻辑结点,而该逻辑结点可以对应多个物理结点,理论上可以动态的增加物理结点,来扩展该逻辑结点的吞吐量。msg broker可以用来集成服务,并且可以暴楼服务的部分接口

msg broker 具有的缺点是:

增加了复杂性,多了一层转发可维护性降低,需要理清msg broker和各个application和服务的关系。降低性能,主要是实时性能下降了,消息需要多转发一边,单次请求的延时大大增加了。

当前流行的Broker的特点和缺点:

Msg Broker的结构:

流行的Broker中间件介绍:

RabbitMQ

项目地址:http://www.rabbitmq.com/

RabbitMQ是由Erlang开发的以高效、健壮以及高度伸缩性的消息服务器。其所包含的概念有Producer、Consumer、Exchange、Queue。RabbitMQ基于QMQP协议,支持的语言也非常丰富,文档也非常清晰。使用RabbitMQ可以实现订阅发布模型、RPC模型、路由模型等,参见RabbitMQ的例子:http://www.rabbitmq.com/getstarted.html

但是它有如下局限性:

RabbitMQ 没有针对连接做控制,它是为高效而生,它对外来的请求是信任的,不存在安全验证,如任何一个client都可以创建消息队列,所以RabbitMQ一定是放在内网的。使用RabbitMQ ,我们是通过Client远程操作RabbitMQ,不能定制RabbitMQ的功能。ZeroMQ

项目地址:http://www.zeromq.org/

ZeroMQ是一个Socket封装库,号称是最快的消息内核。ZeroMQ可以支持TCP、UDP、IPC等多种通讯协议。ZeroMQ可以实现的通讯模型就更多了,几乎涵盖了消息通讯的所有模式,参见官网介绍http://www.zeromq.org/intro:read-the-manual

其局限性为:

ZeroMQ虽然封装了消息传输的复杂性,但是它也隐藏了连接的建立、断开等过程。ZeroMQ传输消息更像是udp数据报,使用者不能知道对方何时连接建立、何时连接断开。

我们需要一个不一样的Broker

应用场景介绍

在网络游戏中,cliet和服务器是通过tcp长连接的。相对于HTTP+WebServer的不同在于:

client连接到服务器,需要进行身份验证,通常是client第一个消息包含身份验证数据如用户名密码等,而验证通过后该连接为可信任连接。client 任意时间都可以向服务器发送请求,而不需要服务器立即返回,同样,服务器是在任意时间(当然会有实时性等约束)都可以像client推送消息。client断开连接时,服务器必须捕获该事件,以便完成一些数据清理操作。client对应的一般是个集群,但是client无从得知细节,因为它只连接最外层的一个,给他取个名字“MsgBroker”。Msg Broker 不许有一定的安全控制,如心跳、网络包频率限制等,防范某些可能的攻击。Msg Broker需要高度可定制。不同的游戏主要是逻辑不同,而MsgBroker大多大同小异。当然MsgBroker总是会根据需求稍作修改。Msg Broker 主要瓶颈是IO操作,因为它涉及大量的网络连接、断开、心跳、广播消息等。而它具有的领域逻辑则非常非常少。所以Msg Broker的逻辑可以使用动态脚本实现,其实时性、效率都能满足要求。

需要的broker具有的功能:

能够捕获client连接事件能够捕获client断开事件具有网络心跳功能方便的消息发送接口broker可以以client的角色连接到其他Server,因为从其他逻辑角度看,Broker可能是其他服务的使用者。Broker 提供消息收发框架,逻辑层通过插件实现。实现插件的方式有动态链接库,可以将逻辑层封装到so链接库中python脚本,逻辑层可以有python脚本实现,Broker封装了载入python、调用python,封装消息发送接口到PyhtonLua脚本,逻辑层也可以又Lua脚本实现,Broker封装了载入lua、调用lua、封装消息接口给lua。

Msg Broker 结构图

Msg Broker 的安装使用:

安装依赖库:

由于msg broker支持Python和lua作为插件,那么必须确保linux下安装了相应的头文件。示例中的插件均只实现了echo功能。

确保Linux系统安装了Python,推荐python2.6确保安装了Python-devel,如果是centos,直接yum即可。确保安装了Lua-5.1.4, 其他版本没有测试过下载Msg Broker最新源码,目前处于0.1版本

svn cohttps://ffown.googlecode.com/svn/trunk/

编译源码:cd trunk/example/plugin_msg_broker/make编译动态连接库插件cd plugin/plugin_echo_dll/sh gen_dll.sh

运行示例插件:

运行动态链接库./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_dll/libechoso另开终端,telent 127.0.01 10241, 收入5 回车,再输入5个字符,通讯协议是body长度加回车加body,如图:

运行Python 脚本示例程序./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_py/echo.py同样使用telnet 测试echo功能运行Lua脚本示例程序./msg_broker_server 127.0.0.1 10241 plugin/plugin_echo_lua/lua.py同样使用telnet 测试echo功能

插件层设计分析:

插件接口:

#ifndef_PLUGIN_H_#define_PLUGIN_H_#include"channel.h"#include"message.h"classplugin_i{public:virtual~plugin_i(){}virtualintstart()=0;virtualintstop()=0;virtualinthandle_broken(channel_ptr_tchannel_)=0;virtualinthandle_msg(constmessage_t&msg_,channel_ptr_tchannel_)=0;};typedefplugin_i*plugin_ptr_t;typedefint(*handle_channel_msg_func_t)(constmessage_t&msg_,channel_ptr_t);typedefint(*handle_channel_broken_func_t)(channel_ptr_t);#defineHANDLE_CHANNEL_MSG"handle_channel_msg"#defineHANDLE_CHANNEL_BROKEN"handle_channel_broken"#endif

各个接口作用如下:

start 实现插件载入,环境初始化stop实现优雅的退出handle msg 为消息到来通知handle_broken 为对方连接关闭

Channel 设计

channel 用来表示一个连接,可以理解成socket的抽象,也可直接理解成远程client。

#ifndef_CHANNEL_H_#define_CHANNEL_H_#include"socket_i.h"classchannel_t{public:channel_t(socket_ptr_tsock_);~channel_t();voidset_data(void*p);void*get_data()const;template<typenameT>T*get_data()const{return(T*)this->get_data();}voidasync_send(conststring&buff_);voidclose();private:socket_ptr_tm_socket;void*m_data;};typedefchannel_t*channel_ptr_t;#endif

各个接口作用如下:

构造,channel必须绑定一个socketset_data get_data用来操作channel私有数据,如我们可以在channel私有数据中存放该channel对应的uid,这样每个channel之需验证一次,以后自然知道到来的消息属于哪个channel。async_send 异步发送消息close 关闭连接

动态链接库插件:

流程如下:

载入动态库获取动态库的接口,记录函数指针地址若有msg到来,调用动态链接库的handle_msg若连接关闭,调用动态链接库的handl_broken
intplugin_dll_t::start(){m_dll_handler=::dlopen(m_dll_name.c_str(),RTLD_NOW|RTLD_GLOBAL);if(NULL==m_dll_handler){logerror((PLUGIN_IMPL,"plugin_dll_t::start dlopen failed:<%s>",dlerror()));return-1;}m_msg_cb=(handle_channel_msg_func_t)::dlsym(m_dll_handler,HANDLE_CHANNEL_MSG);m_broken_cb=(handle_channel_broken_func_t)::dlsym(m_dll_handler,HANDLE_CHANNEL_BROKEN);if(NULL==m_msg_cb){logerror((PLUGIN_IMPL,"plugin_dll_t::start dlopen failed:<%s> not exist",HANDLE_CHANNEL_MSG));return-1;}if(NULL==m_broken_cb){logerror((PLUGIN_IMPL,"plugin_dll_t::start dlopen failed:<%s> not exist",HANDLE_CHANNEL_BROKEN));return-1;}return0;}intplugin_dll_t::stop(){::dlclose(m_dll_handler);return0;}intplugin_dll_t::handle_broken(channel_ptr_tchannel_){returnm_broken_cb(channel_);}intplugin_dll_t::handle_msg(constmessage_t&msg_,channel_ptr_tchannel_){returnm_msg_cb(msg_,channel_);}

Python 插件

其工作流程如下:

初始化Python解释权,将封装的发送消息接口注册到Python虚拟机中设置PythonPath载入python文件若msg到来,调用python全局函数handle_msg若channel断开,调用Python 全局handle_broken 函数
#include"plugin_impl/plugin_python.h"#include"plugin_impl/pyext.h"#include"log_module.h"plugin_python_t::plugin_python_t(conststring&name_):m_py_mod(NULL){stringpythonpath="./";intpos=name_.find_last_of(/);if(-1==pos){m_py_name=name_;}else{m_py_name=name_.substr(pos+1);pythonpath=name_.substr(0,pos+1);}pos=m_py_name.find_first_of(.);m_py_name=m_py_name.substr(0,pos);Py_InitializeEx(0);Py_SetPythonHome((char*)pythonpath.c_str());initpyext(this);PyRun_SimpleString("import channel;import sys;sys.path.append(./plugin/plugin_echo_py/)");}plugin_python_t::~plugin_python_t(){Py_Finalize();}intplugin_python_t::start(){if(load_py_mod()){return-1;}return0;}intplugin_python_t::stop(){return0;}intplugin_python_t::load_py_mod(){PyObject*pName,*pModule;pName=PyString_FromString(m_py_name.c_str());pModule=PyImport_Import(pName);if(!pModule){Py_DECREF(pName);logerror((PLUGIN_IMPL,"cant find %s.py\n",m_py_name.c_str()));if(PyErr_Occurred()){PyErr_Print();PyErr_Clear();return-1;}return-1;}m_py_mod=PyModule_GetDict(pModule);Py_DECREF(pName);Py_DECREF(pModule);return0;}intplugin_python_t::handle_broken(channel_ptr_tchannel_){m_channel_mgr.erase(long(channel_));deletechannel_;returncall_py_handle_broken(long(channel_));}intplugin_python_t::handle_msg(constmessage_t&msg_,channel_ptr_tchannel_){m_channel_mgr.insert(make_pair((long)channel_,channel_));returncall_py_handle_msg((long)channel_,msg_.get_body().c_str());}intplugin_python_t::call_py_handle_msg(longval,constchar*msg){PyObject*pDict=m_py_mod;constchar*func_name="handle_msg";PyObject*pFunc,*arglist,*pRetVal;pFunc=PyDict_GetItemString(pDict,func_name);if(!pFunc||!PyCallable_Check(pFunc)){logerror((PLUGIN_IMPL,"cant find function [%s]\n",func_name));return-1;}arglist=Py_BuildValue("ls",val,msg);pRetVal=PyObject_CallObject(pFunc,arglist);Py_DECREF(arglist);if(pRetVal){Py_DECREF(pRetVal);}if(PyErr_Occurred()){PyErr_Print();PyErr_Clear();return-1;}return0;}intplugin_python_t::call_py_handle_broken(longval){PyObject*pDict=m_py_mod;constchar*func_name="handle_broken";PyObject*pFunc,*arglist,*pRetVal;pFunc=PyDict_GetItemString(pDict,func_name);if(!pFunc||!PyCallable_Check(pFunc)){logerror((PLUGIN_IMPL,"cant find function [%s]\n",func_name));return-1;}arglist=Py_BuildValue("l",val);pRetVal=PyObject_CallObject(pFunc,arglist);Py_DECREF(arglist);if(pRetVal){Py_DECREF(pRetVal);}if(PyErr_Occurred()){PyErr_Print();PyErr_Clear();return-1;}return0;}channel_ptr_tplugin_python_t::get_channel(longp){map<long,channel_ptr_t>::iterator it=m_channel_mgr.find(p);if(it!=m_channel_mgr.end()){returnit->second;}returnNULL;}

Lua 插件:

工作流程如下:

初始化lua虚拟机注册发送消息接口给lua载入Lua脚本有msg到来,调用lua的hanle_msg接口有channel断开,调用lua的handle_broken接口
staticplugin_lua_t*g_plugin_lua_obj=NULL;staticintchannel_send_msg(lua_State*ls_){longptr=(long)luaL_checknumber(ls_,1);size_tlen=0;constchar*msg=luaL_checklstring(ls_,2,&len);channel_ptr_tc=g_plugin_lua_obj->get_channel(ptr);if(c){c->async_send(msg);}return0;}plugin_lua_t::plugin_lua_t(conststring&name_):m_ls(NULL){g_plugin_lua_obj=this;stringluapath="./";intpos=name_.find_last_of(/);if(-1==pos){m_lua_name=name_;}else{m_lua_name=name_.substr(pos+1);luapath=name_.substr(0,pos+1);}pos=m_lua_name.find_first_of(.);m_lua_name=m_lua_name.substr(0,pos);m_ls=lua_open();lua_checkstack(m_ls,20);lua_pushcfunction(m_ls,channel_send_msg);lua_setglobal(m_ls,"_tmp_func_");luaL_dostring(m_ls,"channel = {} channel.send = _tmp_func_ _tmp_func_ = nil");stringlua_str="package.path = package.path .. \""+luapath+"?.lua\"";luaL_openlibs(m_ls);if(luaL_dostring(m_ls,lua_str.c_str())){lua_pop(m_ls,1);}m_lua_name=name_;}plugin_lua_t::~plugin_lua_t(){}intplugin_lua_t::start(){if(load_lua_mod()){logerror((PLUGIN_IMPL,"cant find %s.lua\n",m_lua_name.c_str()));return-1;}return0;}intplugin_lua_t::stop(){return0;}intplugin_lua_t::handle_broken(channel_ptr_tchannel_){m_channel_mgr.erase(long(channel_));deletechannel_;returncall_lua_handle_broken(long(channel_));}intplugin_lua_t::handle_msg(constmessage_t&msg_,channel_ptr_tchannel_){m_channel_mgr.insert(make_pair((long)channel_,channel_));returncall_lua_handle_msg((long)channel_,msg_.get_body());}intplugin_lua_t::load_lua_mod(){if(luaL_dofile(m_ls,m_lua_name.c_str())){lua_pop(m_ls,1);return-1;}return0;}intplugin_lua_t::call_lua_handle_msg(longval,conststring&msg){lua_checkstack(m_ls,20);lua_getglobal(m_ls,"handle_msg");lua_pushnumber(m_ls,val);lua_pushlstring(m_ls,msg.c_str(),msg.size());if(lua_pcall(m_ls,2,0,0)!=0){lua_pop(m_ls,1);return-1;}return0;}intplugin_lua_t::call_lua_handle_broken(longval){lua_checkstack(m_ls,20);lua_getglobal(m_ls,"handle_broken");lua_pushnumber(m_ls,val);if(lua_pcall(m_ls,1,0,0)!=0){lua_pop(m_ls,1);return-1;}return0;}channel_ptr_tplugin_lua_t::get_channel(longp){map<long,channel_ptr_t>::iterator it=m_channel_mgr.find(p);if(it!=m_channel_mgr.end()){returnit->second;}returnNULL;}

msg_broker 待完善的地方:

心跳层还未加入插件层报错不够友好Python 中封装的channel使用long型,调用send接口时需要从long转化到channel,需要优化一下,直接封装一个channel对象到PythonLua中channel的封装暂时也是使用long来表示,具有和上面一样的性能损耗问题