using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
namespace CommonUtil.RabbitMQ
{
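/// <summary>
/// RabbitMQ client: static helpers for sending, forwarding and retrying messages, plus instance methods that start consumers.
/// </summary>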
public class MQClient
{
/// <summary>
/// Default MQ settings. The send and receive methods of this class fall back to this instance
/// when the caller passes no settings of its own.
/// </summary>
public static readonly MqSettingModel MqSetting = new MqSettingModel();
/// <summary>
/// MQ connection. Assigned by <c>ReceiveMessage</c> when a consumer is started.
/// NOTE(review): the field is static, so each later <c>ReceiveMessage</c> call replaces the previous value.
/// </summary>
public static IConnection Connection;
/// <summary>
/// MQ channel. Assigned by <c>ReceiveMessage</c> when a consumer is started.
/// NOTE(review): the field is static, so each later <c>ReceiveMessage</c> call replaces the previous value.
/// </summary>
public static IModel Channel;
/// <summary>
/// Sends a message through the default MQ settings, declaring its exchange with the given type.
/// </summary>
/// <param name="message">Message to send.</param>
/// <param name="exchangeType">Exchange type to use for the message's exchange.</param>
/// <returns>The value returned by the full <c>SendMessage</c> overload.</returns>
public static bool SendMessage(MessageModel message, MQExchangeTypeEnum exchangeType)
{
    // Passing null settings makes the full overload fall back to MqSetting.
    return SendMessage(null, message, exchangeType);
}
/// <summary>
/// Sends a message through the default MQ settings using a <c>direct</c> exchange.
/// </summary>
/// <param name="message">Message to send.</param>
/// <returns>The value returned by the full <c>SendMessage</c> overload.</returns>
public static bool SendMessage(MessageModel message)
{
    // Passing null settings makes the full overload fall back to MqSetting.
    return SendMessage(null, message, MQExchangeTypeEnum.direct);
}
/// <summary>
/// Sends a message through caller-supplied MQ settings using a <c>direct</c> exchange.
/// </summary>
/// <param name="mqSetting">MQ settings that provide the connection factory.</param>
/// <param name="message">Message to send.</param>
/// <returns>The value returned by the full <c>SendMessage</c> overload.</returns>
public static bool SendMessage(MqSettingModel mqSetting, MessageModel message)
{
    return SendMessage(mqSetting, message, MQExchangeTypeEnum.direct);
}
/// <summary>
/// Sends a message, optionally delaying its delivery until <c>message.SendTime</c>.
/// </summary>
/// <remarks>
/// <para>
/// A new connection and channel are opened for the call and disposed before it returns.
/// If the message names an exchange, the exchange is declared (durable). If it names a queue, the
/// queue is declared (durable) and, when an exchange is also named, bound to it with the message's
/// route key.
/// </para>
/// <para>
/// If <c>SendTime</c> lies more than two seconds in the future, the message is published to a
/// temporary <c>timing_*</c> queue instead. That queue carries a message TTL equal to the remaining
/// delay and dead-letters to the message's exchange and route key, so the message reaches its real
/// destination when the TTL elapses. The temporary queue expires one second after the TTL.
/// </para>
/// </remarks>
/// <param name="mqSetting">MQ settings that provide the connection factory; <c>null</c> selects <see cref="MqSetting"/>.</param>
/// <param name="message">Message to send. It is serialized to JSON and published as UTF-8 bytes; it is not modified.</param>
/// <param name="exchangeType">Exchange type used when the message's exchange is declared.</param>
/// <returns><c>true</c> once the message has been handed to the channel; failures surface as exceptions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
public static bool SendMessage(MqSettingModel mqSetting, MessageModel message, MQExchangeTypeEnum exchangeType)
{
    if (message == null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    mqSetting = mqSetting ?? MqSetting;
    var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

    // A blank exchange name means "use the default exchange". Normalise it (and a null route key)
    // once so the declare checks, the dead-letter arguments and the publish call all agree.
    var hasExchange = !string.IsNullOrWhiteSpace(message.ExchangeName);
    var exchangeName = hasExchange ? message.ExchangeName : string.Empty;
    var routeKey = message.RouteKey ?? string.Empty;

    using (var connection = mqSetting.Factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        if (hasExchange)
        {
            channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), true, false, null);
        }

        if (!string.IsNullOrWhiteSpace(message.QueueName))
        {
            channel.QueueDeclare(queue: message.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            if (hasExchange)
            {
                channel.QueueBind(message.QueueName, exchangeName, routeKey, null);
            }
        }

        // Routing key actually used for the publish: the real one for immediate delivery,
        // the temporary queue's name for delayed delivery.
        var publishKey = routeKey;

        // Read the clock once so the "is it delayed?" test and the TTL use the same instant.
        var now = DateTime.Now;
        if (message.SendTime != null && message.SendTime > now.AddSeconds(2))
        {
            var delay = Convert.ToDateTime(message.SendTime) - now;
            var delayMs = (long)delay.TotalMilliseconds;

            var arguments = new Dictionary<string, object>
            {
                { "x-dead-letter-exchange", exchangeName },    // where expired messages are routed
                { "x-dead-letter-routing-key", routeKey },     // routing key used when they are re-routed
                { "x-message-ttl", delayMs },                  // message lifetime = requested delay
                { "x-expires", delayMs + 1000 },               // the queue itself outlives the message by 1s
            };

            var timingQueue = "timing_" + routeKey + "_" + TopUtil.GetId().ToString();
            channel.QueueDeclare(timingQueue, true, false, false, arguments);
            if (hasExchange)
            {
                // Bind under the queue's own name; the caller's message is left untouched.
                channel.QueueBind(timingQueue, exchangeName, timingQueue, null);
            }

            publishKey = timingQueue;
        }

        IBasicProperties properties = channel.CreateBasicProperties();
        properties.Persistent = true; // persist the message
        channel.BasicPublish(exchange: exchangeName, routingKey: publishKey, basicProperties: properties, body: body);
    }

    return true;
}
/// <summary>
/// Starts consuming a queue with the default MQ settings, binding it to the given exchange.
/// </summary>
/// <param name="runMethod">Handler invoked for each message; returns <c>true</c> when processing succeeded.</param>
/// <param name="queueName">Name of the queue to consume.</param>
/// <param name="routeKey">Routing key used when the queue is bound to the exchange.</param>
/// <param name="exchangeType">Type of the exchange to declare.</param>
/// <param name="exchangeName">Name of the exchange; when blank, no exchange is declared or bound.</param>
public void ReceiveMessage(Func<MessageModel, bool> runMethod, string queueName, string routeKey, MQExchangeTypeEnum exchangeType, string exchangeName)
{
    // Passing null settings makes the full overload fall back to MqSetting.
    ReceiveMessage(null, runMethod, queueName, routeKey, exchangeType, exchangeName);
}
/// <summary>
/// Starts consuming a queue with the default MQ settings and no exchange binding.
/// </summary>
/// <param name="runMethod">Handler invoked for each message; returns <c>true</c> when processing succeeded.</param>
/// <param name="queueName">Name of the queue to consume.</param>
/// <param name="routeKey">Routing key forwarded to the full overload; it is only used there when an exchange is bound.</param>
public void ReceiveMessage(Func<MessageModel, bool> runMethod, string queueName, string routeKey)
{
    // Null settings select MqSetting; the empty exchange name skips exchange declaration and binding.
    ReceiveMessage(null, runMethod, queueName, routeKey, MQExchangeTypeEnum.direct, "");
}
/// <summary>
/// Starts consuming a queue with caller-supplied MQ settings and no exchange binding.
/// </summary>
/// <param name="mqSetting">MQ settings that provide the connection factory.</param>
/// <param name="runMethod">Handler invoked for each message; returns <c>true</c> when processing succeeded.</param>
/// <param name="queueName">Name of the queue to consume.</param>
/// <param name="routeKey">Routing key forwarded to the full overload; it is only used there when an exchange is bound.</param>
public void ReceiveMessage(MqSettingModel mqSetting, Func<MessageModel, bool> runMethod, string queueName, string routeKey)
{
    // The empty exchange name skips exchange declaration and binding in the full overload.
    ReceiveMessage(mqSetting, runMethod, queueName, routeKey, MQExchangeTypeEnum.direct, "");
}
/// <summary>
/// Starts a consumer on <paramref name="queueName"/> that runs <paramref name="runMethod"/> for every
/// delivery and then acknowledges, retries or rejects the message according to the outcome.
/// </summary>
/// <remarks>
/// <para>
/// The queue is declared durable. When <paramref name="exchangeName"/> is not blank, the exchange is
/// declared (durable) and the queue is bound to it with <paramref name="routeKey"/>. Prefetch is set to
/// one, so the next message is delivered only after the current one has been answered.
/// </para>
/// <para>
/// Outcome handling: a <c>true</c> result acknowledges the delivery. A <c>false</c> result republishes
/// the message through the delayed retry queue and acknowledges the original, unless the message's
/// retry count has reached the limit configured in the effective settings, in which case the delivery
/// is rejected without requeueing. A body that is empty or is not valid JSON is rejected without
/// requeueing, because redelivering it could never succeed.
/// </para>
/// <para>
/// The connection and channel stay open after this method returns and are also stored in the static
/// <see cref="Connection"/> and <see cref="Channel"/> fields.
/// NOTE(review): an exception thrown by <paramref name="runMethod"/> is not caught here, so that
/// delivery is left unanswered; with a prefetch of one this presumably stalls the consumer — confirm
/// whether handlers are expected never to throw.
/// </para>
/// </remarks>
/// <param name="mqSetting">MQ settings that provide the connection factory and retry limit; <c>null</c> selects <see cref="MqSetting"/>.</param>
/// <param name="runMethod">Handler invoked for each message; returns <c>true</c> when processing succeeded.</param>
/// <param name="queueName">Name of the queue to consume.</param>
/// <param name="routeKey">Routing key used when the queue is bound to the exchange.</param>
/// <param name="exchangeType">Type of the exchange to declare.</param>
/// <param name="exchangeName">Name of the exchange; when blank, no exchange is declared or bound.</param>
/// <exception cref="ArgumentNullException"><paramref name="runMethod"/> is <c>null</c>.</exception>
public void ReceiveMessage(MqSettingModel mqSetting, Func<MessageModel, bool> runMethod, string queueName, string routeKey, MQExchangeTypeEnum exchangeType, string exchangeName)
{
    if (runMethod == null)
    {
        throw new ArgumentNullException(nameof(runMethod));
    }

    var settings = mqSetting ?? MqSetting;

    // The handler must answer on the channel that delivered the message: delivery tags are scoped to
    // a channel, and the static fields are replaced by every later ReceiveMessage call. So the handler
    // captures these locals; the static fields are still assigned for code that reads them.
    var connection = settings.Factory.CreateConnection();
    Connection = connection;
    var channel = connection.CreateModel();
    Channel = channel;

    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
    if (!string.IsNullOrWhiteSpace(exchangeName))
    {
        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), true, false, null);
        channel.QueueBind(queueName, exchangeName, routeKey, null);
    }

    // One unacknowledged message at a time: the next one arrives only after this one is answered.
    channel.BasicQos(0, 1, false);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        MessageModel message;
        try
        {
            message = JsonConvert.DeserializeObject<MessageModel>(Encoding.UTF8.GetString(ea.Body));
        }
        catch (JsonException)
        {
            // Malformed payload: treated like an empty one below.
            message = null;
        }

        if (message == null)
        {
            // Nothing to process or retry; answer the delivery so the consumer keeps receiving.
            channel.BasicReject(ea.DeliveryTag, requeue: false);
            return;
        }

        if (runMethod(message))
        {
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            message.MessageSuccess(queueName, settings.Factory);
            return;
        }

        // The retry limit comes from the settings this consumer was started with,
        // not unconditionally from the default MqSetting.
        if (message.RetryCount >= settings.RetryCount - 1)
        {
            channel.BasicReject(ea.DeliveryTag, requeue: false);
            message.MessageFail(queueName, settings.Factory);
            return;
        }

        // Republish through the delayed retry queue first; acknowledge the original only afterwards,
        // so a failed republish leaves the original delivery unacknowledged.
        message.RetryMessage(queueName, settings.Factory);
        RetryMessage(settings, message);
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    };

    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
/// <summary>
/// Delay, in seconds, applied before each redelivery; entry <c>i</c> is used for retry count <c>i + 1</c>.
/// </summary>
private static readonly int[] RetryDelaySeconds =
{
    5,
    60,
    60 * 5,
    60 * 30,
    60 * 60,
    60 * 60 * 5,
    60 * 60 * 24,
    60 * 60 * 72,
    60 * 60 * 24 * 15,
    60 * 60 * 24 * 30,
};

/// <summary>
/// Republishes a failed message to the <c>timing_&lt;QueueName&gt;</c> queue with a per-message expiration,
/// so that it is dead-lettered back to its original exchange and route key after the retry delay.
/// </summary>
/// <remarks>
/// The delay is chosen from <see cref="RetryDelaySeconds"/> by <c>message.RetryCount</c>. Counts below 1
/// use the first entry and counts above the table length use the last one, so an out-of-range count
/// no longer throws.
/// NOTE(review): RabbitMQ expires per-message-TTL messages only when they reach the head of the queue,
/// so a short delay queued behind a longer one in the same <c>timing_</c> queue waits for it — confirm
/// this is acceptable.
/// </remarks>
/// <param name="mqSetting">MQ settings that provide the connection factory.</param>
/// <param name="message">Message to republish; serialized to JSON and published as UTF-8 bytes.</param>
/// <returns><c>true</c> once the message has been handed to the channel; failures surface as exceptions.</returns>
private static bool RetryMessage(MqSettingModel mqSetting, MessageModel message)
{
    // A blank exchange name means the default exchange; never pass null to the broker.
    var hasExchange = !string.IsNullOrWhiteSpace(message.ExchangeName);
    var exchangeName = hasExchange ? message.ExchangeName : string.Empty;
    var routeKey = message.RouteKey ?? string.Empty;

    // Clamp the retry count into the table's 1-based range.
    var attempt = Math.Min(Math.Max(message.RetryCount, 1), RetryDelaySeconds.Length);
    var delayMs = RetryDelaySeconds[attempt - 1] * 1000L;

    var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

    using (var connection = mqSetting.Factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        var arguments = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", exchangeName },    // where expired messages are routed
            { "x-dead-letter-routing-key", routeKey },     // routing key used when they are re-routed
        };

        var queueName = "timing_" + message.QueueName;
        channel.QueueDeclare(queueName, true, false, false, arguments);
        if (hasExchange)
        {
            channel.QueueBind(queueName, exchangeName, queueName, null);
        }

        IBasicProperties properties = channel.CreateBasicProperties();
        properties.Persistent = true; // persist the message
        properties.Expiration = delayMs.ToString(); // per-message TTL in milliseconds
        channel.BasicPublish(exchange: exchangeName, routingKey: queueName, basicProperties: properties, body: body);
    }

    return true;
}
/// <summary>
/// Forwards a message to several exchange/route-key targets using the default MQ settings.
/// </summary>
/// <param name="message">Message to forward.</param>
/// <param name="routeKeyList">Targets to forward to: each key is an exchange name and each value is the routing key to use with it.</param>
/// <returns>The value returned by the full batch <c>ForwardMessage</c> overload.</returns>
public static bool ForwardMessage(MessageModel message, IDictionary<string, string> routeKeyList)
{
    return ForwardMessage(MqSetting, message, routeKeyList);
}
/// <summary>
/// Forwards a message to several exchange/route-key targets over one connection and channel.
/// </summary>
/// <remarks>
/// For each target, <c>message.ForwardMessage</c> is called with the target's exchange and route key
/// before the message is serialized, so each published body reflects whatever that call changes.
/// A target with a non-blank exchange name has that exchange declared as a durable <c>direct</c>
/// exchange; a blank exchange name publishes through the default exchange.
/// </remarks>
/// <param name="mqSetting">MQ settings that provide the connection factory; <c>null</c> selects <see cref="MqSetting"/>.</param>
/// <param name="message">Message to forward.</param>
/// <param name="routeKeyList">Targets to forward to: each key is an exchange name and each value is the routing key to use with it.</param>
/// <returns><c>true</c> once every target has been published to; failures surface as exceptions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> or <paramref name="routeKeyList"/> is <c>null</c>.</exception>
public static bool ForwardMessage(MqSettingModel mqSetting, MessageModel message, IDictionary<string, string> routeKeyList)
{
    if (message == null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    if (routeKeyList == null)
    {
        throw new ArgumentNullException(nameof(routeKeyList));
    }

    // Same fallback as SendMessage: no settings means the default ones.
    mqSetting = mqSetting ?? MqSetting;

    using (var connection = mqSetting.Factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        foreach (var target in routeKeyList)
        {
            message.ForwardMessage(target.Key, target.Value, mqSetting.Factory);
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

            var hasExchange = !string.IsNullOrWhiteSpace(target.Key);
            if (hasExchange)
            {
                channel.ExchangeDeclare(target.Key, MQExchangeTypeEnum.direct.ToString(), true, false, null);
            }

            IBasicProperties properties = channel.CreateBasicProperties();
            properties.Persistent = true; // persist the message

            // Never hand null or blank names to the broker: blank exchange = default exchange.
            channel.BasicPublish(
                exchange: hasExchange ? target.Key : string.Empty,
                routingKey: target.Value ?? string.Empty,
                basicProperties: properties,
                body: body);
        }
    }

    return true;
}
/// <summary>
/// Forwards a message to a single exchange/route-key target using the default MQ settings.
/// </summary>
/// <param name="message">Message to forward.</param>
/// <param name="exchangeName">Name of the exchange to publish to.</param>
/// <param name="routeKey">Routing key to publish with.</param>
/// <returns>The value returned by the full single-target <c>ForwardMessage</c> overload.</returns>
public static bool ForwardMessage(MessageModel message, string exchangeName, string routeKey)
{
    return ForwardMessage(MqSetting, message, exchangeName, routeKey);
}
/// <summary>
/// Forwards a message to a single exchange/route-key target.
/// </summary>
/// <remarks>
/// A new connection and channel are opened for the call and disposed before it returns.
/// <c>message.ForwardMessage</c> is called with the target before the message is serialized, so the
/// published body reflects whatever that call changes. A non-blank exchange name is declared as a
/// durable <c>direct</c> exchange; a blank one publishes through the default exchange.
/// </remarks>
/// <param name="mqSetting">MQ settings that provide the connection factory; <c>null</c> selects <see cref="MqSetting"/>.</param>
/// <param name="message">Message to forward.</param>
/// <param name="exchangeName">Name of the exchange to publish to.</param>
/// <param name="routeKey">Routing key to publish with.</param>
/// <returns><c>true</c> once the message has been handed to the channel; failures surface as exceptions.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
public static bool ForwardMessage(MqSettingModel mqSetting, MessageModel message, string exchangeName, string routeKey)
{
    if (message == null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    // Same fallback as SendMessage: no settings means the default ones.
    mqSetting = mqSetting ?? MqSetting;

    using (var connection = mqSetting.Factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        message.ForwardMessage(exchangeName, routeKey, mqSetting.Factory);
        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

        var hasExchange = !string.IsNullOrWhiteSpace(exchangeName);
        if (hasExchange)
        {
            channel.ExchangeDeclare(exchangeName, MQExchangeTypeEnum.direct.ToString(), true, false, null);
        }

        IBasicProperties properties = channel.CreateBasicProperties();
        properties.Persistent = true; // persist the message

        // Never hand null or blank names to the broker: blank exchange = default exchange.
        channel.BasicPublish(
            exchange: hasExchange ? exchangeName : string.Empty,
            routingKey: routeKey ?? string.Empty,
            basicProperties: properties,
            body: body);
    }

    return true;
}
}
}