对 rabbitmq-c 进行封装, 支持多线程, 高并发访问

 

头文件地址:

https://github.com/MwlLj/cpp_component/tree/master/component/header/amqp
<https://github.com/MwlLj/cpp_component/tree/master/component/header/amqp>

源文件地址:

https://github.com/MwlLj/cpp_component/tree/master/component/src/amqp/source
<https://github.com/MwlLj/cpp_component/tree/master/component/src/amqp/source>

测试地址:


https://github.com/MwlLj/cpp_component/tree/master/component/src/amqp/amqp_test/source

<https://github.com/MwlLj/cpp_component/tree/master/component/src/amqp/amqp_test/source>

完整工程:

https://github.com/MwlLj/cpp_component <https://github.com/MwlLj/cpp_component>
 

支持功能:

1. 断线重连机制

2. 高并发处理

3. 处理类中是一个线程, 可以直接在handler中做耗时操作

 

example:
#include <iostream> #include "amqp/amqp.h" #include "amqp/rabbitmq.h" #include
<vector> #include <string> #include <sstream> #include <thread> class CHandler
: public amqp::IHandler { public: virtual void handle(const amqp::CMessage
&msg) { std::cout << msg.msg() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } }; void
connect(amqp::IAmqp *amqp, int index) { bool result = false; amqp::IAmqp::Dial
info; info.host = "localhost"; info.port = 5672; info.userName = "guest";
info.userPwd = "guest"; info.virtualUrl = "/"; amqp::IConnect *conn =
amqp->dial(info, result); amqp::IConnect::Channel ch; ch.channelCount = 1;
amqp::IChannel *channel = conn->channel(ch, result); amqp::IChannel::Exchange
exchange; exchange.exchangeName = "test-exchange"; exchange.exchangeType =
amqp::exchage_type_direct; channel->exchangeDeclare(exchange);
amqp::IChannel::Publish pub; pub.exchangeName = "test-exchange";
pub.exchangeType = amqp::exchage_type_direct; pub.message = "hello hello
hello"; pub.routingKey = "test-key"; #if 0 while (1) { for (int i = 0; i < 10;
++i) { std::thread([channel, pub, i] {
std::this_thread::sleep_for(std::chrono::milliseconds(10 * (10 - i)));
channel->publish(pub); }).detach(); }
std::this_thread::sleep_for(std::chrono::milliseconds(100)); } #else
std::stringstream ss; ss << "test-queue-" << index; std::string queueName =
ss.str(); ss.str(""); ss << "test-key-" << index; std::string routingKey =
ss.str(); std::cout << routingKey << std::endl; amqp::IChannel::Queue que;
que.queueName = queueName; que.isAutoDelete = true; que.isAutoDelWhenDisconnect
= true; que.isNotAllowNotExist = false; que.isPersist = false;
channel->queueDeclare(que); amqp::IChannel::QueueBind queBind;
queBind.exchangeName = "test-exchange"; queBind.routingKey = routingKey;
queBind.queueName = queueName; channel->queueBind(queBind);
amqp->pushChannel(channel); amqp->registerRouting(routingKey, new CHandler());
#endif // conn->close(); } int main() { const int max = 10; amqp::IAmqp *amqp =
new amqp::CRabbitmq(200, 100); for (int i = 0; i < max; ++i) {
std::thread([amqp, i] { connect(amqp, i); }).join(); } amqp->loop();
std::system("pause"); return 0; }
 

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信