using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;

namespace CommonUtil.RabbitMQ
{
    /// <summary>
    /// RabbitMQ connection settings loaded from the application configuration.
    /// </summary>
    public class MqSettingModel
    {
        /// <summary>
        /// Connection factory built from the settings below.
        /// </summary>
        public ConnectionFactory Factory { get; set; }

        /// <summary>
        /// Host name or IP address of the MQ server.
        /// </summary>
        public string HostName { get; set; }

        /// <summary>
        /// MQ user name.
        /// </summary>
        public string UserName { get; set; }

        /// <summary>
        /// MQ user password.
        /// </summary>
        public string Password { get; set; }

        /// <summary>
        /// MQ port number.
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// MQ virtual host; null when the configuration does not define one.
        /// </summary>
        public string VirtualHost { get; set; }

        /// <summary>
        /// Number of retries configured for a message.
        /// </summary>
        public int RetryCount { get; set; }

        /// <summary>
        /// Nested MQ settings, one entry per child of the "Items" configuration section.
        /// </summary>
        public List<MqSettingModel> ChildItems { get; set; }

        /// <summary>
        /// Reads the settings stored under <paramref name="mqConfig"/>, builds the
        /// connection factory and recursively loads every child of "<paramref name="mqConfig"/>:Items".
        /// </summary>
        /// <param name="mqConfig">Configuration path of the section to read; defaults to "MQConfig".</param>
        public MqSettingModel(string mqConfig = "MQConfig")
        {
            // HostName, Password, Port, UserName and RetryCount are required: Trim() is called
            // on the looked-up value without a null guard, exactly as before.
            this.HostName = ConfigUtil.Configuration[$"{mqConfig}:HostName"].Trim();
            this.Password = ConfigUtil.Configuration[$"{mqConfig}:Password"].Trim();
            this.Port = Int32.Parse(ConfigUtil.Configuration[$"{mqConfig}:Port"].Trim());
            this.UserName = ConfigUtil.Configuration[$"{mqConfig}:UserName"].Trim();

            // VirtualHost is optional: look it up once and stay null when it is absent.
            string configuredVirtualHost = ConfigUtil.Configuration[$"{mqConfig}:VirtualHost"];
            this.VirtualHost = configuredVirtualHost?.Trim();

            this.RetryCount = int.Parse(ConfigUtil.Configuration[$"{mqConfig}:RetryCount"].Trim());

            var factory = new ConnectionFactory()
            {
                UserName = this.UserName,
                Password = this.Password,
                Port = this.Port,
                HostName = this.HostName
            };

            // The Taobao-internal MQ has no virtual host; in that case the factory keeps its own default.
            if (!VirtualHost.IsNull())
            {
                factory.VirtualHost = this.VirtualHost;
            }

            this.Factory = factory;

            this.ChildItems = new List<MqSettingModel>();
            foreach (var item in ConfigUtil.Configuration.GetSection($"{mqConfig}:Items").GetChildren())
            {
                // Each child section is parsed with the same rules, using its full configuration path.
                this.ChildItems.Add(new MqSettingModel(item.Path));
            }
        }
    }

    /// <summary>
    /// MQ message envelope.
    /// </summary>
    public class MessageModel
    {
        /// <summary>
        /// Message id.
        /// </summary>
        public string MessageId { get; set; }

        /// <summary>
        /// Message event.
        /// </summary>
        public string MessageEvent { get; set; }

        /// <summary>
        /// Send time.
        /// </summary>
        public DateTime? SendTime { get; set; }

        /// <summary>
        /// Id of the message this one was forwarded from.
        /// </summary>
        public string FromMessageId { get; set; }

        /// <summary>
        /// Message body.
        /// </summary>
        public string Body { get; set; }

        /// <summary>
        /// Message type (timing: scheduled message, common: ordinary message).
        /// </summary>
        public string MessageType { get; set; }

        /// <summary>
        /// Queue name.
        /// </summary>
        public string QueueName { get; set; }

        /// <summary>
        /// Routing key.
        /// </summary>
        public string RouteKey { get; set; }

        /// <summary>
        /// Exchange name.
        /// </summary>
        public string ExchangeName { get; set; }

        /// <summary>
        /// Merchant (user) id.
        /// </summary>
        public string UserId { get; set; }

        /// <summary>
        /// Number of retries performed so far.
        /// </summary>
        public int RetryCount { get; set; }

        /// <summary>
        /// Message state; holds the name of a <c>MessageStateEnum</c> member.
        /// </summary>
        public string State { get; set; }

        /// <summary>
        /// Initializes a message in the "send" state with a retry count of zero and logs it.
        /// </summary>
        /// <param name="userId">Merchant (user) id; also used as the prefix of the message id.</param>
        /// <param name="eventkey">Message event.</param>
        /// <param name="messageBody">Message body.</param>
        /// <param name="messageType">Message type; stored as its enum member name.</param>
        /// <param name="routeKey">Routing key.</param>
        /// <param name="sendTime">Optional send time.</param>
        /// <param name="exchangeName">Exchange name.</param>
        /// <param name="queueName">Queue name.</param>
        /// <param name="messageId">
        /// Optional id suffix. When null or empty, the hash returned by
        /// <see cref="GetMD5"/> for <paramref name="messageBody"/> is used instead.
        /// </param>
        public MessageModel(string userId, string eventkey, string messageBody, MessageTypeEnum messageType, string routeKey, DateTime? sendTime = null, string exchangeName = "", string queueName = "", string messageId = "")
        {
            string idSuffix = messageId.IsNullOrEmpty() ? GetMD5(messageBody) : messageId;

            this.Body = messageBody;
            this.ExchangeName = exchangeName;
            this.MessageEvent = eventkey;
            this.MessageId = userId + "_" + idSuffix;
            this.RouteKey = routeKey;
            this.MessageType = messageType.ToString();
            this.QueueName = queueName;
            this.RetryCount = 0;
            this.UserId = userId;
            this.State = MessageStateEnum.send.ToString();
            this.SendTime = sendTime;

            // Write the trace log.
            LogUtil.Info("初始化创建消息:" + JsonConvert.SerializeObject(this), "MQTrace");
        }

        /// <summary>
        /// Computes the MD5 hash of the UTF-16LE bytes of <paramref name="myString"/>
        /// and returns it as lower-case hexadecimal text.
        /// </summary>
        /// <param name="myString">Text to hash.</param>
        /// <returns>The hash as lower-case hex; bytes below 0x10 are written as a single digit.</returns>
        public string GetMD5(string myString)
        {
            byte[] sourceBytes = Encoding.Unicode.GetBytes(myString);

            // The hash algorithm is IDisposable, so release it deterministically.
            using (MD5 md5 = MD5.Create())
            {
                byte[] hash = md5.ComputeHash(sourceBytes);
                var hex = new StringBuilder(hash.Length * 2);

                foreach (byte value in hash)
                {
                    // NOTE(review): "x" does not zero-pad, so the result can be shorter than
                    // 32 characters. Left unchanged so generated message ids keep their
                    // current form; switching to "x2" would change every generated id.
                    hex.Append(value.ToString("x"));
                }

                return hex.ToString();
            }
        }

        /// <summary>
        /// Marks the message as being retried: increments the retry count, records the
        /// queue and logs the change.
        /// </summary>
        /// <param name="queueName">Queue the message is retried on.</param>
        /// <param name="factory">Not used by this method; kept so existing callers compile unchanged.</param>
        public void RetryMessage(string queueName, ConnectionFactory factory)
        {
            this.RetryCount += 1;
            this.QueueName = queueName;
            this.State = MessageStateEnum.retry.ToString();

            // Write the trace log.
            LogUtil.Info("消息重试:" + JsonConvert.SerializeObject(this), "MQTrace");
        }

        /// <summary>
        /// Turns this message into a forwarded one: the current id becomes
        /// <see cref="FromMessageId"/>, a new id is generated, the retry count is reset
        /// and the queue name is cleared.
        /// </summary>
        /// <param name="exchangeName">Target exchange.</param>
        /// <param name="routeKey">Target routing key.</param>
        /// <param name="factory">Not used by this method; kept so existing callers compile unchanged.</param>
        public void ForwardMessage(string exchangeName, string routeKey, ConnectionFactory factory)
        {
            // Remember the original id before it is replaced.
            this.FromMessageId = this.MessageId;
            this.MessageId = this.UserId + "_" + Guid.NewGuid().ToString("N");
            this.ExchangeName = exchangeName;
            this.RouteKey = routeKey;
            this.RetryCount = 0;
            this.State = MessageStateEnum.forward.ToString();
            this.QueueName = null;

            // Write the trace log.
            LogUtil.Info("消息转发:" + JsonConvert.SerializeObject(this), "MQTrace");
        }

        /// <summary>
        /// Marks the message as successfully consumed and logs it.
        /// </summary>
        /// <param name="queueName">Queue the message was consumed from.</param>
        /// <param name="factory">Not used by this method; kept so existing callers compile unchanged.</param>
        public void MessageSuccess(string queueName, ConnectionFactory factory)
        {
            this.QueueName = queueName;
            this.State = MessageStateEnum.success.ToString();

            // Write the trace log.
            LogUtil.Info("消息消费成功:" + JsonConvert.SerializeObject(this), "MQTrace");
        }

        /// <summary>
        /// Marks the message as failed and logs it.
        /// </summary>
        /// <param name="queueName">Queue the message was consumed from.</param>
        /// <param name="factory">Not used by this method; kept so existing callers compile unchanged.</param>
        public void MessageFail(string queueName, ConnectionFactory factory)
        {
            this.QueueName = queueName;
            this.State = MessageStateEnum.fail.ToString();

            // Write the trace log.
            LogUtil.Info("消息消费失败:" + JsonConvert.SerializeObject(this), "MQTrace");
        }
    }
}