using Newtonsoft.Json.Linq; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; namespace CommonUtil.RabbitMQ { /// /// /// public class MQClientFullChannel : IDisposable { #region 变量 /// /// 接收 /// public IModel channelReceive { get; internal set; } /// /// 链接 /// public IConnection connectionReceive { get; internal set; } /// /// 发送对象 /// public string queueName { get; set; } /// /// 日志文件名 /// public string logName { get; set; } /// /// >0 表示允许重连,等待多少毫秒 /// public int reConnectTime { get; set; } #endregion #region 构造函数 /// /// 空构造函数 /// public MQClientFullChannel() { queueName = ""; logName = "MQ日志"; } /// /// /// /// 需要接收或发送的qname public MQClientFullChannel(string qname) { queueName = qname; logName = "MQ日志"; } /// /// /// /// 需要接收或发送的qname /// 日志文件 public MQClientFullChannel(string qname, string lname) { queueName = qname; logName = lname; } /// /// /// public void Dispose() { if (connectionReceive != null) { connectionReceive.Dispose(); } if (channelReceive != null) { channelReceive.Dispose(); } } #endregion #region 发送消息 静态方法 /// /// 发送消息 /// /// /// /// /// public static string SendMessageStatic(MQClientParam param, string mqname, string message) { using (MQClientFullChannel mQClientFullChannel = new MQClientFullChannel(mqname)) { string ret = mQClientFullChannel.SendMessage(param, message); return ret; } } /// /// 发送消息 /// /// /// /// /// public static string SendMessageStatic(int mode, string mqname, string message) { using (MQClientFullChannel mQClientFullChannel = new MQClientFullChannel(mqname)) { string ret = mQClientFullChannel.SendMessage(mode, message); return ret; } } #endregion #region 发送消息 /// /// 添加即时消息 /// /// /// /// /// public string SendMessage(MQClientParam param, string mqname, string message) { try { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = param.HostName; factory.UserName = param.UserName; factory.Password = param.Password; factory.Port = param.Port; factory.VirtualHost = param.VirtualHost; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(mqname, true, false, false, null); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; properties.Persistent = true; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", mqname, properties, body); } } return "ok"; } catch (Exception ee) { // 写入日志 Task.Run(() => { LogUtil.Error(ee.Message.ToString(), logName); }); return ee.Message.ToString(); } } /// /// 添加即时消息 /// /// /// /// public string SendMessage(MQClientParam param, string message) { string mqname = queueName; if (string.IsNullOrWhiteSpace(mqname)) { return "传入的参数不正确!"; } return SendMessage(param, mqname, message); } /// /// 根据AppSetting.Json中的内容发送信息 /// /// 0-独立 1-集群 /// /// /// public string SendMessage(int mode, string mqname, string message) { MQClientParam param; string ret = GetParam(mode, out param); if (ret != "ok") return ret; return SendMessage(param, mqname, message); } /// /// 根据AppSetting.Json中的内容发送信息 /// /// 0-独立 1-集群 /// /// public string SendMessage(int mode, string message) { string mqname = queueName; if (string.IsNullOrWhiteSpace(mqname)) { return "传入的参数不正确!"; } return SendMessage(mode, mqname, message); } /// /// 发送定时消息 /// /// /// /// /// /// public static string SendDelayMessageStatic(int mode, string mqname, string message, DateTime sendTime) { using (MQClientFullChannel mQClientFullChannel = new MQClientFullChannel()) { string ret = mQClientFullChannel.SendDelayMessage(mode, mqname, message, sendTime); return ret; } } /// /// 添加延时消息 /// /// /// /// /// /// public string SendDelayMessage(MQClientParam param, string mqname, string message, DateTime sendtime) { try { if (sendtime <= DateTime.Now) { return SendMessage(param, queueName, message); } else { TimeSpan ts = sendtime - DateTime.Now; long ttl = long.Parse(ts.TotalMilliseconds.ToString("f0")); ConnectionFactory factory = new ConnectionFactory(); factory.HostName = param.HostName; factory.UserName = param.UserName; factory.Password = param.Password; factory.Port = param.Port; factory.VirtualHost = param.VirtualHost; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //string fqname = Enum.GetName(queueName.GetType(), queueName); Dictionary dic = new Dictionary(); dic.Add("x-expires", ttl + 300000); //队列中所有消息都有相同的过期时间 dic.Add("x-message-ttl", ttl);//队列上消息过期时间,应小于队列过期时间 dic.Add("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由 dic.Add("x-dead-letter-routing-key", mqname);//过期消息转向路由相匹配routingkey string qname = "duomaitime" + Guid.NewGuid().ToString(); channel.ExchangeDeclare(exchange: "exchange-direct", type: "direct"); channel.QueueDeclare(qname, true, false, false, dic); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; properties.Persistent = true; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", qname, properties, body); } } } } catch (Exception ex) { return ex.ToString(); } return "ok"; } /// /// 添加延时消息 /// /// /// /// /// public string SendDelayMessage(MQClientParam param, string message, DateTime sendtime) { string mqname = queueName; if (string.IsNullOrWhiteSpace(mqname)) { return "传入的参数不正确!"; } return SendDelayMessage(param, mqname, message, sendtime); } /// /// 添加延时消息 /// /// 0-独立 1-集群 /// /// /// /// public string SendDelayMessage(int mode, string mqname, string message, DateTime sendtime) { MQClientParam param; string ret = GetParam(mode, out param); if (ret != "ok") return ret; return SendDelayMessage(param, mqname, message, sendtime); } /// /// 添加延时消息 /// /// /// /// /// public string SendDelayMessage(int mode, string message, DateTime sendtime) { string mqname = queueName; if (string.IsNullOrWhiteSpace(mqname)) { return "传入的参数不正确!"; } MQClientParam param; string ret = GetParam(mode, out param); if (ret != "ok") return ret; return SendDelayMessage(param, mqname, message, sendtime); } #endregion #region 接收消息 /// /// 消息到达客户端发起的事件 /// /// BasicDeliverEventArgs消息对象 public delegate void ActionEvent(EventMessageResultCluster result); /// /// 事件激活委托实例 /// private ActionEvent _onMessage; /// /// 当侦听的队列中有消息到达时触发的执行事件 /// public event ActionEvent OnMessage { add { if (_onMessage == null) { _onMessage += value; } } remove { if (_onMessage != null) { _onMessage -= value; } } } /// /// 连接成功 /// private Action _onConnected; /// /// 链接成功回调 /// public event Action OnConnected { add { if (_onConnected == null) { _onConnected += value; } } remove { if (_onConnected != null) { _onConnected -= value; } } } /// /// 链接成功回调 /// /// public void Connected(MQClientParam param) { if (_onConnected != null) { _onConnected(param); } } /// /// 连接断开 /// private Action _onDisConnected; /// /// 连接断开回调 /// public event Action OnDisConnected { add { if (_onDisConnected == null) { _onDisConnected += value; } } remove { if (_onDisConnected != null) { _onDisConnected -= value; } } } /// /// 连接断开回调 /// /// public void DisConnected(MQClientParam param) { if (_onDisConnected != null) { _onDisConnected(param); } } /// /// 侦听初始化 /// /// /// public void ConnectInit(MQClientParam param) { try { string fqname = queueName; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = param.HostName; factory.UserName = param.UserName; factory.Password = param.Password; factory.Port = param.Port; factory.VirtualHost = param.VirtualHost; factory.AutomaticRecoveryEnabled = true; factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10); connectionReceive = factory.CreateConnection(); connectionReceive.ConnectionShutdown += (o, e) => { Task.Run(() => { LogUtil.Info(JSONUtil.ObjectToJson(param) + " --> 链接关闭", logName); }); // 调用链接关闭的回调 DisConnected(param); // 关闭 Close(); if (reConnectTime > 0) { //等待并重新链接 Task.Delay(reConnectTime).Wait(); ConnectInit(param); } }; //获取通道 channelReceive = connectionReceive.CreateModel(); channelReceive.ExchangeDeclare(exchange: "exchange-direct", type: "direct"); channelReceive.QueueBind(fqname, "exchange-direct", fqname); //创建事件驱动的消费者类型 var consumer = new EventingBasicConsumer(channelReceive); consumer.Received += consumer_Received; channelReceive.BasicQos(0, 1, false); //一次只获取一个消息进行消费 channelReceive.BasicConsume(fqname, false, consumer); // 链接成功,调用一下通知调用者 if (connectionReceive.IsOpen) { this.Connected(param); } } catch (Exception ee) { Task.Run(() => { LogUtil.Info("ConnectInit(MQClientParam param)异常:" + ee.Message.ToSingular(), logName); }); // 调用链接关闭的回调 DisConnected(param); // 关闭 Close(); if (reConnectTime > 0) { //等待并重新链接 Task.Delay(reConnectTime).Wait(); ConnectInit(param); } } } /// /// /// /// 0-表示监听独立的,>0 表示监听集群中的某一个 public string Connect(int mode) { // 根据mode获取参数 MQClientParam param; string ret = ""; if (mode == 0) { //独立 ret = GetParam(0, out param); if (ret != "ok") { return ret; } } else { List pAll = new List(); ret = GetParam(out pAll); if (ret != "ok") { return ret; } if (mode > pAll.Count()) { return $"Items下没有第[{mode}]条记录"; } param = pAll[mode - 1]; } if (param == null) { return "MQ参数获取不正确!"; } try { Task.Factory.StartNew(() => { ConnectInit(param); }); } catch (Exception ex) { Task.Run(() => { LogUtil.Error("Connect(int mode):" + ex.Message.ToString(), "MQClientFullChannel"); }); return ex.Message.ToString(); } return "ok"; } /// /// 接受到消息 /// private void consumer_Received(object sender, BasicDeliverEventArgs e) { try { EventMessageResultCluster emr = new EventMessageResultCluster(); string message = Encoding.UTF8.GetString(e.Body); //emr.ReplyToAddress = e.BasicProperties==null?"":e.BasicProperties.ReplyToAddress==null?"":e.BasicProperties.ReplyToAddress.ToString(); emr.ConsumerTag = e?.ConsumerTag?.ToString(); emr.RoutingKey = e?.RoutingKey?.ToString(); emr.Topic = ""; emr.Content = message; if (_onMessage != null) { //触发外部侦听事件 _onMessage(emr); } if (emr.IsOperationOk == false) { //WriteLine("MQ日志", "未能消费此消息,重新放入队列头:" + message); //未能消费此消息,重新放入队列头 channelReceive.BasicReject(e.DeliveryTag, false); } else if (channelReceive.IsClosed == false) { //WriteLine("MQ日志", "确认消息:" + message); channelReceive.BasicAck(e.DeliveryTag, false); //MqTraceLog.SaveTraceLog(queueName.ToString(), message, 1); } } catch (Exception ex) { LogUtil.Error("接收消息出错:" + ex.ToString(), "MQClientFullChannel"); } } #endregion #region 关闭链接 /// /// 关闭连接 /// public void Close() { try { if (channelReceive != null) { channelReceive.Close(); channelReceive.Dispose(); } } catch { } try { if (connectionReceive != null) { connectionReceive.Close(); connectionReceive.Dispose(); } } catch { } } #endregion #region 获取appsetting.json中的MQ参数 /// /// 获取MQ参数 /// /// 0-独立 1-集群 /// /// public string GetParam(int mode, out MQClientParam param) { param = null; // 获取appsetting.json文件中的MQConfig信息 JArray mqItem = null; if (mode == 0) { // 独立 mqItem = (JArray)JSONUtil.JsonToObject(JSONUtil.ObjectToJson((ConfigUtil.GetChildren($"MQConfig")))); if (mqItem == null || mqItem.Count() < 4) { return "MQ集群参数配置不正确,参数过少!"; } } else { //指定 var items = ConfigUtil.GetChildren("MQConfig:Items"); if (items.Count() < 1) { return "没有配置MQ集群参数"; } // 随即获取要使用哪个MQConfig //int rm = new Random().Next(0, items.Count()); int rm = mode; // 获取参数 mqItem = (JArray)JSONUtil.JsonToObject(JSONUtil.ObjectToJson((ConfigUtil.GetChildren($"MQConfig:Items:{rm}")))); if (mqItem == null || mqItem.Count() < 4) { return "MQ集群参数配置不正确,参数过少!"; } } param = new MQClientParam(); try { foreach (var tmp in mqItem) { switch (tmp["Key"].ToString().ToLower()) { case "hostname": param.HostName = tmp["Value"].ToString(); break; case "port": param.Port = Convert.ToInt32(tmp["Value"]); break; case "username": param.UserName = tmp["Value"].ToString(); break; case "password": param.Password = tmp["Value"].ToString(); break; case "retrycount": param.RetryCount = Convert.ToInt32(tmp["Value"]); break; case "virtualhost": param.VirtualHost = tmp["Value"].ToString(); break; } } } catch (Exception ee) { return "MQ集群参数设置不正确!" + ee.Message.ToString(); } if (param == null) { return "MQ参数获取失败!"; } return "ok"; } /// /// 获取所有集群参数 /// /// /// public string GetParam(out List paramAll) { paramAll = null; //集群 var items = ConfigUtil.GetChildren("MQConfig:Items"); if (items.Count() < 1) { return "没有配置MQ集群参数"; } for (int i = 0; i < items.Count(); i++) { JArray mqItem = (JArray)JSONUtil.JsonToObject(JSONUtil.ObjectToJson((ConfigUtil.GetChildren($"MQConfig:Items:{i}")))); if (mqItem == null || mqItem.Count() < 4) { continue; } // 循环获取 MQClientParam param = new MQClientParam(); try { foreach (var tmp in mqItem) { switch (tmp["Key"].ToString().ToLower()) { case "hostname": param.HostName = tmp["Value"].ToString(); break; case "port": param.Port = Convert.ToInt32(tmp["Value"]); break; case "username": param.UserName = tmp["Value"].ToString(); break; case "password": param.Password = tmp["Value"].ToString(); break; case "retrycount": param.RetryCount = Convert.ToInt32(tmp["Value"]); break; case "virtualhost": param.VirtualHost = tmp["Value"].ToString(); break; } } } catch (Exception ee) { LogUtil.Error("MQ集群参数设置不正确:" + ee.Message.ToString(), "MQClientFullChannel"); continue; } paramAll = paramAll == null ? new List() : paramAll; paramAll.Add(param); } return "ok"; } #endregion /// /// 具体发送的参数 /// public class MQClientParam { /// /// MQ服务器地址 /// public string HostName { get; set; } /// /// MQ服务器端口 /// public int Port { get; set; } /// /// MQ登录名 /// public string UserName { get; set; } /// /// MQ密码 /// public string Password { get; set; } /// /// /// public string VirtualHost { get; set; } /// /// /// public int RetryCount { get; set; } } /// /// /// public class RabbitMQSetCluster { private string hostname = "106.14.15.190"; /// /// 地址 /// public string HostName { get { return this.hostname; } } private List listhostname = new List(new string[] { "106.14.15.190", "106.14.223.74", "101.132.39.50" }); /// /// /// public List HostNameList { get { return this.listhostname; } } private int port = 8102; /// /// 端口 /// public int Port { get { return this.port; } } private string username = "duomaicrm"; /// /// 用户名 /// public string UserName { get { return this.username; } } private string password = "taocrm123"; /// /// 密码 /// public string Password { get { return this.password; } } } /// /// /// public class EventMessageResultCluster { /// /// /// public string Topic { get; set; } /// /// /// public string Content { get; set; } /// /// 消息处理是否成功 /// public bool IsOperationOk { get; set; } /// /// /// public void Confirm() { this.IsOperationOk = true; } /// /// /// public void Fail() { this.IsOperationOk = false; } /// /// /// public string ConsumerTag { get; set; } /// /// /// /// public string RoutingKey { get; set; } } } }