using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; namespace CommonUtil.RabbitMQ { /// /// RabbitMQ主体 /// public class MQClient { /// /// mq默认配置 /// public static readonly MqSettingModel MqSetting = new MqSettingModel(); /// /// mq连接connection /// public static IConnection Connection; /// /// mq连接channel /// public static IModel Channel; /// /// 发送消息(普通消息,指定交换机类型) /// /// 消息体 /// 交换机类型 /// public static bool SendMessage(MessageModel message, MQExchangeTypeEnum exchangeType) => SendMessage(null, message, exchangeType); /// /// 发送消息(普通消息) /// /// 消息体 /// public static bool SendMessage(MessageModel message) => SendMessage(null, message, MQExchangeTypeEnum.direct); /// /// 发送消息(指定自定义tcp连接发送消息) /// /// mq连接对象 /// 消息体 /// public static bool SendMessage(MqSettingModel mqSetting, MessageModel message) => SendMessage(mqSetting, message, MQExchangeTypeEnum.direct); /// /// 发送消息(指定自定义tcp连接和交换机类型) /// /// mq连接对象 /// 消息体 /// 交换机类型 /// public static bool SendMessage(MqSettingModel mqSetting, MessageModel message, MQExchangeTypeEnum exchangeType) { var result = false; var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); mqSetting = mqSetting==null?MqSetting: mqSetting; using (var connection = mqSetting.Factory.CreateConnection()) { using (var channel = connection.CreateModel()) { if (!string.IsNullOrWhiteSpace(message.ExchangeName)) { channel.ExchangeDeclare(message.ExchangeName, exchangeType.ToString(), true, false, null); } if (!string.IsNullOrWhiteSpace(message.QueueName)) { channel.QueueDeclare(queue: message.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null); if (!string.IsNullOrWhiteSpace(message.ExchangeName)) { channel.QueueBind(message.QueueName, message.ExchangeName, message.RouteKey, null); } } if (message.SendTime != null && message.SendTime > DateTime.Now.AddSeconds(2)) { TimeSpan ts = Convert.ToDateTime(message.SendTime) - DateTime.Now; var argument = new Dictionary(); argument.Add("x-dead-letter-exchange", message.ExchangeName);//过期消息转向路由 argument.Add("x-dead-letter-routing-key", message.RouteKey);//过期消息转向路由相匹配routingkey argument.Add("x-message-ttl", (long)ts.TotalSeconds * 1000);//消息的过期时间 argument.Add("x-expires", ((long)ts.TotalSeconds * 1000)+1000);//队列的过期时间 var queueName = "timing_"+ message.RouteKey+"_"+ TopUtil.GetId().ToString(); channel.QueueDeclare(queueName, true, false, false, argument); if (!string.IsNullOrWhiteSpace(message.ExchangeName)) { message.RouteKey = queueName; channel.QueueBind(queueName, message.ExchangeName, message.RouteKey, null); } IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true;//消息持久化 channel.BasicPublish(exchange: message.ExchangeName, routingKey: queueName, basicProperties: properties, body: body); } else { IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true;//消息持久化 channel.BasicPublish(exchange: message.ExchangeName, routingKey: message.RouteKey, basicProperties: properties, body: body); } result = true; } } return result; } /// /// 接收并处理消息(指定交换机和交换机类型) /// /// 委托方法 /// 队列名称 /// 路由 /// 交换机类型 /// 交换机名称 /// public void ReceiveMessage(Func runMethod, string queueName,string routeKey,MQExchangeTypeEnum exchangeType, string exchangeName) => ReceiveMessage(null, runMethod, queueName, routeKey, exchangeType, exchangeName); /// /// 接收并处理消息 /// /// 委托方法 /// 队列名称 /// 路由 /// public void ReceiveMessage(Func runMethod, string queueName, string routeKey) => ReceiveMessage(null, runMethod, queueName, routeKey, MQExchangeTypeEnum.direct, ""); /// /// 接收并处理消息(指定自定义tcp连接) /// /// mq连接对象 /// 委托方法 /// 队列名称 /// 路由 /// public void ReceiveMessage(MqSettingModel mqSetting, Func runMethod, string queueName, string routeKey) => ReceiveMessage(mqSetting, runMethod, queueName, routeKey, MQExchangeTypeEnum.direct, ""); /// /// 接收并处理消息(指定自定义tcp连接,并指定交换机和交换机类型) /// /// mq连接对象 /// 委托方法 /// 队列名称 /// 路由 /// 交换机类型 /// 交换机名称 /// public void ReceiveMessage(MqSettingModel mqSetting, Func runMethod, string queueName, string routeKey, MQExchangeTypeEnum exchangeType, string exchangeName) { mqSetting = mqSetting != null ? mqSetting : MqSetting; Connection = mqSetting!=null? mqSetting.Factory.CreateConnection():MqSetting.Factory.CreateConnection(); Channel = Connection.CreateModel(); Channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); if (!string.IsNullOrWhiteSpace(exchangeName)) { Channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), true, false, null); Channel.QueueBind(queueName, exchangeName, routeKey, null); } Channel.BasicQos(0, 1, false);//消费者每次只接收一个消息,并且给Mq回复后才可以接受下一个消息 var consumer = new EventingBasicConsumer(Channel); consumer.Received += (model, ea) => { var body = Encoding.UTF8.GetString(ea.Body); var message = JsonConvert.DeserializeObject(body); var isSuccess = runMethod(message); if (isSuccess) { Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); message.MessageSuccess(queueName, mqSetting.Factory); } else { if (message.RetryCount >= MqSetting.RetryCount - 1) { Channel.BasicReject(ea.DeliveryTag, requeue: false); message.MessageFail(queueName, mqSetting.Factory); } else { //消息重新推送 message.RetryMessage(queueName, mqSetting.Factory); RetryMessage(mqSetting, message); Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } } }; Channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); } /// /// 消息重试机制 /// /// mq连接对象 /// 消息体 /// private static bool RetryMessage(MqSettingModel mqSetting, MessageModel message) { var result = false; var timeDic = new Dictionary(); timeDic.Add(1, 5); timeDic.Add(2, 60); timeDic.Add(3, 60*5); timeDic.Add(4, 60*30); timeDic.Add(5, 60 * 60); timeDic.Add(6, 60 * 60*5); timeDic.Add(7, 60 * 60 * 24); timeDic.Add(8, 60 * 60 * 72); timeDic.Add(9, 60 * 60 * 24*15); timeDic.Add(10, 60 * 60 * 24 * 30); using (var connection = mqSetting.Factory.CreateConnection()) { using (var channel = connection.CreateModel()) { var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); var argument = new Dictionary(); argument.Add("x-dead-letter-exchange", message.ExchangeName);//过期消息转向路由 argument.Add("x-dead-letter-routing-key", message.RouteKey);//过期消息转向路由相匹配routingkey var queueName = "timing_" + message.QueueName; channel.QueueDeclare(queueName, true, false, false, argument); if (!string.IsNullOrWhiteSpace(message.ExchangeName)) { channel.QueueBind(queueName, message.ExchangeName, queueName, null); } IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true;//消息持久化 properties.Expiration = (timeDic[message.RetryCount]*1000L).ToString(); channel.BasicPublish(exchange: message.ExchangeName, routingKey: queueName, basicProperties: properties, body: body); result = true; } } return result; } /// /// 消息转发机制(批量) /// /// 消息体 /// 路由交换机集合 /// public static bool ForwardMessage(MessageModel message, IDictionary routeKeyList) => ForwardMessage(MqSetting, message, routeKeyList); /// /// 消息转发机制(批量,指定mq连接对象) /// /// mq连接对象 /// 消息体 /// 路由交换机集合 /// public static bool ForwardMessage(MqSettingModel mqSetting, MessageModel message,IDictionary routeKeyList) { var result = false; using (var connection = mqSetting.Factory.CreateConnection()) { using (var channel = connection.CreateModel()) { foreach (var item in routeKeyList) { message.ForwardMessage(item.Key, item.Value, mqSetting.Factory); var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); if (!string.IsNullOrWhiteSpace(item.Key)) { channel.ExchangeDeclare(item.Key, MQExchangeTypeEnum.direct.ToString(), true, false, null); } IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true;//消息持久化 channel.BasicPublish(exchange: item.Key, routingKey: item.Value, basicProperties: properties, body: body); } result = true; } } return result; } /// /// 消息转发机制(指定mq连接对象) /// /// 消息体 /// 交换机名称 /// 路由 /// public static bool ForwardMessage(MessageModel message, string exchangeName, string routeKey) => ForwardMessage(MqSetting, message, exchangeName, routeKey); /// /// 消息转发机制 /// /// mq连接对象 /// 消息体 /// 交换机名称 /// 路由 /// public static bool ForwardMessage(MqSettingModel mqSetting, MessageModel message,string exchangeName,string routeKey) { var result = false; using (var connection = mqSetting.Factory.CreateConnection()) { using (var channel = connection.CreateModel()) { message.ForwardMessage(exchangeName, routeKey, mqSetting.Factory); var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); if (!string.IsNullOrWhiteSpace(exchangeName)) { channel.ExchangeDeclare(exchangeName, MQExchangeTypeEnum.direct.ToString(), true, false, null); } IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true;//消息持久化 channel.BasicPublish(exchange: exchangeName, routingKey: routeKey, basicProperties: properties, body: body); result = true; } } return result; } } }