using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;
namespace CommonUtil.RabbitMQ
{
/// <summary>
/// RabbitMQ connection settings read from the application configuration.
/// </summary>
public class MqSettingModel
{
    /// <summary>
    /// Connection factory built from the settings held by this instance.
    /// </summary>
    public ConnectionFactory Factory { get; set; }

    /// <summary>
    /// Host name (or IP) of the MQ server.
    /// </summary>
    public string HostName { get; set; }

    /// <summary>
    /// MQ user name.
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// MQ user password.
    /// </summary>
    public string Password { get; set; }

    /// <summary>
    /// MQ port number.
    /// </summary>
    public int Port { get; set; }

    /// <summary>
    /// MQ virtual host; <c>null</c> when the configuration does not define one.
    /// </summary>
    public string VirtualHost { get; set; }

    /// <summary>
    /// Number of times a message is retried.
    /// </summary>
    public int RetryCount { get; set; }

    /// <summary>
    /// Settings built from each child of the "<c>{mqConfig}:Items</c>" configuration section.
    /// </summary>
    public List<MqSettingModel> ChildItems { get; set; }

    /// <summary>
    /// Loads the settings stored under the given configuration section and builds
    /// the matching <see cref="ConnectionFactory"/>. Every child of the
    /// "<c>{mqConfig}:Items</c>" section is loaded recursively into <see cref="ChildItems"/>.
    /// </summary>
    /// <param name="mqConfig">Configuration section path that holds the MQ settings.</param>
    /// <exception cref="InvalidOperationException">
    /// A required value (HostName, Password, Port, UserName or RetryCount) is missing.
    /// </exception>
    /// <exception cref="FormatException">Port or RetryCount is not a valid integer.</exception>
    public MqSettingModel(string mqConfig = "MQConfig")
    {
        this.HostName = GetRequiredSetting($"{mqConfig}:HostName");
        this.Password = GetRequiredSetting($"{mqConfig}:Password");
        this.Port = GetRequiredInt32($"{mqConfig}:Port");
        this.UserName = GetRequiredSetting($"{mqConfig}:UserName");
        // VirtualHost is the only optional value: a missing key simply yields null.
        this.VirtualHost = ConfigUtil.Configuration[$"{mqConfig}:VirtualHost"]?.Trim();
        this.RetryCount = GetRequiredInt32($"{mqConfig}:RetryCount");

        var factory = new ConnectionFactory()
        {
            UserName = this.UserName,
            Password = this.Password,
            Port = this.Port,
            HostName = this.HostName
        };
        // The Taobao-internal MQ has no virtual host; in that case the factory keeps
        // its own default instead of being handed a null value.
        // NOTE(review): IsNull() is a project extension - presumably a plain null check; confirm.
        if (!this.VirtualHost.IsNull())
        {
            factory.VirtualHost = this.VirtualHost;
        }
        this.Factory = factory;

        this.ChildItems = new List<MqSettingModel>();
        foreach (var item in ConfigUtil.Configuration.GetSection($"{mqConfig}:Items").GetChildren())
        {
            // Each child section is expected to carry a full set of MQ settings of its own.
            this.ChildItems.Add(new MqSettingModel(item.Path));
        }
    }

    /// <summary>
    /// Reads a mandatory configuration value and trims surrounding whitespace.
    /// </summary>
    /// <param name="key">Full configuration key.</param>
    /// <returns>The trimmed value.</returns>
    /// <exception cref="InvalidOperationException">The key is not present.</exception>
    private static string GetRequiredSetting(string key)
    {
        var value = ConfigUtil.Configuration[key];
        if (value == null)
        {
            // Name the key so a broken configuration can be diagnosed from the message alone.
            throw new InvalidOperationException($"Missing required MQ configuration value '{key}'.");
        }
        return value.Trim();
    }

    /// <summary>
    /// Reads a mandatory configuration value and parses it as an integer using the invariant culture.
    /// </summary>
    /// <param name="key">Full configuration key.</param>
    /// <returns>The parsed integer.</returns>
    private static int GetRequiredInt32(string key)
    {
        return int.Parse(GetRequiredSetting(key), System.Globalization.CultureInfo.InvariantCulture);
    }
}
/// <summary>
/// MQ message envelope: the payload plus routing and processing-state metadata.
/// Every state change is written to the "MQTrace" log as JSON.
/// </summary>
public class MessageModel
{
    /// <summary>
    /// Message id.
    /// </summary>
    public string MessageId { get; set; }

    /// <summary>
    /// Message event key.
    /// </summary>
    public string MessageEvent { get; set; }

    /// <summary>
    /// Send time.
    /// </summary>
    public DateTime? SendTime { get; set; }

    /// <summary>
    /// Id of the message this one was forwarded from.
    /// </summary>
    public string FromMessageId { get; set; }

    /// <summary>
    /// Message payload.
    /// </summary>
    public string Body { get; set; }

    /// <summary>
    /// Message type (timing: scheduled message, common: normal message).
    /// </summary>
    public string MessageType { get; set; }

    /// <summary>
    /// Queue name.
    /// </summary>
    public string QueueName { get; set; }

    /// <summary>
    /// Routing key.
    /// </summary>
    public string RouteKey { get; set; }

    /// <summary>
    /// Exchange name.
    /// </summary>
    public string ExchangeName { get; set; }

    /// <summary>
    /// Merchant id.
    /// </summary>
    public string UserId { get; set; }

    /// <summary>
    /// Number of retries performed so far.
    /// </summary>
    public int RetryCount { get; set; }

    /// <summary>
    /// Current processing state (name of a <c>MessageStateEnum</c> member).
    /// </summary>
    public string State { get; set; }

    /// <summary>
    /// Creates a new message in the <c>send</c> state and logs it.
    /// </summary>
    /// <param name="userId">Merchant id; also used as the prefix of the message id.</param>
    /// <param name="eventkey">Event key stored in <see cref="MessageEvent"/>.</param>
    /// <param name="messageBody">Payload; hashed to derive the message id when <paramref name="messageId"/> is empty.</param>
    /// <param name="messageType">Message type; its member name is stored in <see cref="MessageType"/>.</param>
    /// <param name="routeKey">Routing key.</param>
    /// <param name="sendTime">Optional send time.</param>
    /// <param name="exchangeName">Exchange name.</param>
    /// <param name="queueName">Queue name.</param>
    /// <param name="messageId">Optional id suffix; when empty, the hash of the body is used instead.</param>
    public MessageModel(string userId, string eventkey, string messageBody, MessageTypeEnum messageType, string routeKey, DateTime? sendTime = null, string exchangeName = "", string queueName = "", string messageId = "")
    {
        // The id is always "<userId>_<suffix>"; the suffix is the caller's id or a hash of the body.
        messageId = messageId.IsNullOrEmpty() ? userId + "_" + GetMD5(messageBody) : userId + "_" + messageId;
        this.Body = messageBody;
        this.ExchangeName = exchangeName;
        this.MessageEvent = eventkey;
        this.MessageId = messageId;
        this.RouteKey = routeKey;
        this.MessageType = messageType.ToString();
        this.QueueName = queueName;
        this.RetryCount = 0;
        this.UserId = userId;
        this.State = MessageStateEnum.send.ToString();
        this.SendTime = sendTime;
        WriteTrace("初始化创建消息:");
    }

    /// <summary>
    /// Computes the MD5 hash of the UTF-16 (little-endian) bytes of a string and
    /// returns it as lower-case hexadecimal text.
    /// </summary>
    /// <param name="myString">Text to hash; must not be <c>null</c>.</param>
    /// <returns>
    /// Hexadecimal text in which each byte is written WITHOUT zero padding, so the
    /// result may be shorter than the usual 32 characters.
    /// </returns>
    public string GetMD5(string myString)
    {
        // The hash algorithm is IDisposable; release it deterministically.
        using (MD5 md5 = MD5.Create())
        {
            byte[] hash = md5.ComputeHash(System.Text.Encoding.Unicode.GetBytes(myString));
            var hex = new StringBuilder(hash.Length * 2);
            foreach (byte b in hash)
            {
                // NOTE(review): "x" drops the leading zero of bytes below 0x10, so this is
                // not a canonical MD5 string. Kept as-is because the value is embedded in
                // message ids that may already be stored or compared elsewhere; switch to
                // "x2" only after confirming nothing relies on the current format.
                hex.Append(b.ToString("x"));
            }
            return hex.ToString();
        }
    }

    /// <summary>
    /// Marks the message as being retried: increments <see cref="RetryCount"/>,
    /// records the queue and logs the new state.
    /// </summary>
    /// <param name="queueName">Queue the message is retried on.</param>
    /// <param name="factory">Not used by this method; kept for caller compatibility.</param>
    public void RetryMessage(string queueName, ConnectionFactory factory)
    {
        this.RetryCount += 1;
        this.QueueName = queueName;
        this.State = MessageStateEnum.retry.ToString();
        WriteTrace("消息重试:");
    }

    /// <summary>
    /// Turns this message into a forwarded copy: the current id moves to
    /// <see cref="FromMessageId"/>, a fresh id is generated, the route is replaced,
    /// the retry counter is reset and the queue name is cleared.
    /// </summary>
    /// <param name="exchangeName">Exchange to forward to.</param>
    /// <param name="routeKey">Routing key to forward with.</param>
    /// <param name="factory">Not used by this method; kept for caller compatibility.</param>
    public void ForwardMessage(string exchangeName, string routeKey, ConnectionFactory factory)
    {
        this.FromMessageId = this.MessageId;
        this.MessageId = this.UserId + "_" + Guid.NewGuid().ToString("N");
        this.ExchangeName = exchangeName;
        this.RouteKey = routeKey;
        this.RetryCount = 0;
        this.State = MessageStateEnum.forward.ToString();
        this.QueueName = null;
        WriteTrace("消息转发:");
    }

    /// <summary>
    /// Marks the message as successfully consumed and logs it.
    /// </summary>
    /// <param name="queueName">Queue the message was consumed from.</param>
    /// <param name="factory">Not used by this method; kept for caller compatibility.</param>
    public void MessageSuccess(string queueName, ConnectionFactory factory)
    {
        this.QueueName = queueName;
        this.State = MessageStateEnum.success.ToString();
        WriteTrace("消息消费成功:");
    }

    /// <summary>
    /// Marks the message as failed and logs it.
    /// </summary>
    /// <param name="queueName">Queue the message was consumed from.</param>
    /// <param name="factory">Not used by this method; kept for caller compatibility.</param>
    public void MessageFail(string queueName, ConnectionFactory factory)
    {
        this.QueueName = queueName;
        this.State = MessageStateEnum.fail.ToString();
        WriteTrace("消息消费失败:");
    }

    /// <summary>
    /// Writes the given prefix followed by this message serialized as JSON to the "MQTrace" log.
    /// </summary>
    /// <param name="prefix">Text placed in front of the JSON snapshot.</param>
    private void WriteTrace(string prefix)
    {
        LogUtil.Info(prefix + JsonConvert.SerializeObject(this), "MQTrace");
    }
}
}