重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。
创新互联主营神池网站建设的网络公司,主营网站建设方案,app软件开发公司,神池h5小程序制作搭建,神池网站营销推广欢迎神池等地区企业咨询RabbitMQ的官方
操作起来很简单,只需要在DOS下面,进入安装目录(安装路径RabbitMQ Serverabbitmq_server-3.2.2sbin
)执行如下命令就可以成功安装。
rabbitmq-plugins enable rabbitmq_management
可以通过访问:http://localhost:15672
进行测试,默认的登陆账号为:guest,密码为:guest。
1. 安装完以后erlang需要手动设置ERLANG_HOME 的系统变量。
set ERLANG_HOME=F:Program Fileserl9.0
#环境变量`path`里加入:%ERLANG_HOME%in
#环境变量`path`里加入: 安装路径RabbitMQ Serverabbitmq_server-3.6.10sbin
2.激活Rabbit MQ’s Management Plugin
使用Rabbit MQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态,你可以在命令行中使用下面的命令激活。
rabbitmq-plugins.bat enable rabbitmq_management
3.创建管理用户
rabbitmqctl.bat add_user sa 123456
4. 设置管理员
rabbitmqctl.bat set_user_tags sa administrator
5.设置权限
rabbitmqctl.bat set_permissions -p / sa ".*" ".*" ".*"
6. 其他命令
#查询用户:
rabbitmqctl.bat list_users
#查询vhosts:
rabbitmqctl.bat list_vhosts
#启动RabbitMQ服务:
net stop RabbitMQ && net start RabbitMQ
以上这些,账号、vhost、权限、作用域等基本就设置完了。
RabbitMQ.Client 是RabbiMQ 官方提供的的客户端
EasyNetQ 是基于RabbitMQ.Client 基础上封装的开源客户端,使用非常方便
以下操作RabbitMQ的代码例子,都是基于EasyNetQ的使用和再封装,在文章底部有demo例子的×××地址
创建 IBus
///
/// 消息服务器连接器
///
public class BusBuilder {
public static IBus CreateMessageBus() {
// 消息服务器连接字符串
// var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"];
string connString = "host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456";
if (connString == null || connString == string.Empty) throw new Exception("messageserver connection string is missing or empty");
return RabbitHutch.CreateBus(connString);
}
}
Fanout Exchange
所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。
Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。
所以,Fanout Exchange 转发消息是最快的。
///
/// 消息消耗(fanout)
///
/// 消息类型
/// 回调
/// 交换器名
/// 队列名
/// 路由名
public static void FanoutConsume(Action handler, string exChangeName = "fanout_mq", string queueName = "fanout_queue_default", string routingKey = "") where T : class {
var bus = BusBuilder.CreateMessageBus();
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout);
var queue = CreateQueue(adbus, queueName);
adbus.Bind(exchange, queue, routingKey);
adbus.Consume(queue, registration => {
registration.Add((message, info) => {
handler(message.Body);
});
});
}
///
/// 消息上报(fanout)
///
/// 消息类型
/// 主题名
/// 消息命名
/// 错误信息
///
public static bool FanoutPush(T t, out string msg, string exChangeName = "fanout_mq", string routingKey = "") where T : class {
msg = string.Empty;
try {
using (var bus = BusBuilder.CreateMessageBus()) {
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout);
adbus.Publish(exchange, routingKey, false, new Message(t));
return true;
}
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
}
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
Direct模式,可以使用RabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
///
/// 消息发送(direct)
///
/// 消息类型
/// 发送到的队列
/// 发送内容
public static void DirectSend(string queue, T message) where T : class {
using (var bus = BusBuilder.CreateMessageBus()) {
bus.Send(queue, message);
}
}
///
/// 消息接收(direct)
///
/// 消息类型
/// 接收的队列
/// 回调操作
/// 错误信息
///
public static bool DirectReceive(string queue, Action callback, out string msg) where T : class {
msg = string.Empty;
try {
var bus = BusBuilder.CreateMessageBus();
bus.Receive(queue, callback);
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
return true;
}
///
/// 消息发送
///
///
///
///
///
///
///
///
public static bool DirectPush(T t, out string msg, string exChangeName = "direct_mq", string routingKey = "direct_rout_default") where T : class {
msg = string.Empty;
try {
using (var bus = BusBuilder.CreateMessageBus()) {
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct);
adbus.Publish(exchange, routingKey, false, new Message(t));
return true;
}
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
}
///
/// 消息接收
///
///
/// 消息类型
/// 回调
/// 交换器名
/// 队列名
/// 路由名
public static bool DirectConsume(Action handler, out string msg, string exChangeName = "direct_mq", string queueName = "direct_queue_default", string routingKey = "direct_rout_default") where T : class {
msg = string.Empty;
try {
var bus = BusBuilder.CreateMessageBus();
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct);
var queue = CreateQueue(adbus, queueName);
adbus.Bind(exchange, queue, routingKey);
adbus.Consume(queue, registration => {
registration.Add((message, info) => {
handler(message.Body);
});
});
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
return true;
}
Topic Exchange
要使用主题发布,只需使用带有主题的重载的Publish方法:
var bus = RabbitHutch.CreateBus(...);
bus.Publish(message, "X.A");
订阅者可以通过指定要匹配的主题来过滤邮件。
所以发布的主题为“X.A.2”的消息将匹配“#”,“X.#”,“ .A.”,而不是“X.B. *”或“A”。
警告,Publish只顾发送消息到队列,但是不管有没有消费端订阅,所以,发布之后,如果没有消费者,该消息将不会被消费甚至丢失。
EasyNetQ提供了消息订阅,当调用Subscribe方法时候,EasyNetQ会创建一个用于接收消息的队列,不过与消息发布不同的是,消息订阅增加了一个参数,subscribe_id.代码如下:
bus.Subscribe("my_id", handler, x => x.WithTopic("X.*"));
警告: 具有相同订阅者但不同主题字符串的两个单独订阅可能不会产生您期望的效果。 subscriberId有效地标识个体AMQP队列。 具有相同subscriptionId的两个订阅者将连接到相同的队列,并且两者都将添加自己的主题绑定。 所以,例如,如果你这样做:
bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*"));
bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));
匹配“x.”或“ .B”的所有消息将被传递到“XXX_my_id”队列。 然后,RabbitMQ将向两个消费者传递消息,其中handlerOfXDotStar和handlerOfStarDotB轮流获取每条消息。
现在,如果你想要匹配多个主题(“X. ”OR“ .B”),你可以使用另一个重载的订阅方法,它采用多个主题,如下所示:
bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));
///
/// 获取主题
///
/// 主题内容类型
/// 订阅者ID
/// 消息接收响应回调
/// 订阅主题集合
public static void TopicSubscribe(string subscriptionId, Action callback, params string[] topics) where T : class {
var bus = BusBuilder.CreateMessageBus();
bus.Subscribe(subscriptionId, callback, (config) => {
foreach (var item in topics) config.WithTopic(item);
});
}
///
/// 发布主题
///
/// 主题内容类型
/// 主题名称
/// 主题内容
/// 错误信息
///
public static bool TopicPublish(string topic, T message, out string msg) where T : class {
msg = string.Empty;
try {
using (var bus = BusBuilder.CreateMessageBus()) {
bus.Publish(message, topic);
return true;
}
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
}
///
/// 发布主题
///
///
/// 消息类型
/// 消息内容
/// 主题名
/// 错误信息
/// 交换器名
///
public static bool TopicSub(T t, string topic, out string msg, string exChangeName = "topic_mq") where T : class {
msg = string.Empty;
try {
if (string.IsNullOrWhiteSpace(topic)) throw new Exception("推送主题不能为空");
using (var bus = BusBuilder.CreateMessageBus()) {
var adbus = bus.Advanced;
//var queue = adbus.QueueDeclare("user.notice.zhangsan");
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic);
adbus.Publish(exchange, topic, false, new Message(t));
return true;
}
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
}
///
/// 获取主题
///
///
/// 消息类型
/// 订阅者ID
/// 回调
/// 交换器名
/// 主题名
public static void TopicConsume(Action callback, string exChangeName = "topic_mq",string subscriptionId = "topic_subid", params string[] topics) where T : class {
var bus = BusBuilder.CreateMessageBus();
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic);
var queue = adbus.QueueDeclare(subscriptionId);
foreach (var item in topics) adbus.Bind(exchange, queue, item);
adbus.Consume(queue, registration => {
registration.Add((message, info) => {
callback(message.Body);
});
});
}
具体发布/订阅消息的Demo和相关测试看源码Demo
当在创建订阅者去消费队列的时候
///
/// 获取主题
///
///
public static void GetSub(T topic, Action callback) where T : class
{
using (var bus = BusBuilder.CreateMessageBus()) {
bus.Subscribe(topic.ToString(), callback, x => x.WithTopic(topic.ToString()));
}
}
using里的对象在执行完成后被回收了,导致刚连接上去就又断开了(刚开始写的时候,习惯性加using,排查了好久才发现,欲哭无泪)
源码项目运行前的准备与确认:
到RabbitMQ管理后台添加TestQueue
VHost,并且分配用户权限,然后到RabbitMQHelper.BusBuilder
类里配置RabbitMQ连接服务的相关信息host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456
,(根据配置的内容和用户修改)
参考资料(鸣谢):
附:Demo源码GitHub地址
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。