using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace CommonUtil.RabbitMQ
{
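/// <summary>
/// RabbitMQ helper that publishes immediate and delayed messages and consumes a queue over its own connection and channel.
/// </summary>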
public class MQClientFullChannel : IDisposable
{
#region 变量
/// <summary>
/// Channel used for consuming; assigned by <see cref="ConnectInit"/>.
/// </summary>
public IModel channelReceive { get; internal set; }
/// <summary>
/// Connection that owns <see cref="channelReceive"/>; assigned by <see cref="ConnectInit"/>.
/// </summary>
public IConnection connectionReceive { get; internal set; }
/// <summary>
/// Default queue name: used by the overloads that take no explicit queue name
/// and as the queue consumed by <see cref="ConnectInit"/>.
/// </summary>
public string queueName { get; set; }
/// <summary>
/// Log name passed to LogUtil when this instance writes log entries.
/// </summary>
public string logName { get; set; }
/// <summary>
/// Reconnect delay in milliseconds; a value greater than 0 enables reconnecting after that wait.
/// </summary>
public int reConnectTime { get; set; }
#endregion
#region 构造函数
/// <summary>
/// Creates a client with an empty queue name and the default log name.
/// </summary>
public MQClientFullChannel()
    : this("", "MQ日志")
{
}
/// <summary>
/// Creates a client bound to a queue, using the default log name.
/// </summary>
/// <param name="qname">Name of the queue to send to or receive from.</param>
public MQClientFullChannel(string qname)
    : this(qname, "MQ日志")
{
}
/// <summary>
/// Creates a client bound to a queue with an explicit log name.
/// </summary>
/// <param name="qname">Name of the queue to send to or receive from.</param>
/// <param name="lname">Log name used for log entries written by this instance.</param>
public MQClientFullChannel(string qname, string lname)
{
    this.queueName = qname;
    this.logName = lname;
}
/// <summary>
/// Releases the consuming channel and then the connection it was created on.
/// </summary>
/// <remarks>
/// Does nothing for members that were never assigned (for example when the
/// instance was only used for sending).
/// </remarks>
public void Dispose()
{
    // Tear down in reverse order of creation: the channel lives on the connection,
    // so it is released first.
    channelReceive?.Dispose();
    connectionReceive?.Dispose();
}
#endregion
#region 发送消息 静态方法
/// <summary>
/// Publishes a message using a temporary client instance.
/// </summary>
/// <param name="param">Broker connection settings.</param>
/// <param name="mqname">Target queue name.</param>
/// <param name="message">Message text.</param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public static string SendMessageStatic(MQClientParam param, string mqname, string message)
{
    using (var client = new MQClientFullChannel(mqname))
    {
        return client.SendMessage(param, message);
    }
}
/// <summary>
/// Publishes a message using a temporary client instance and broker settings read from configuration.
/// </summary>
/// <param name="mode">Configuration selector passed to <see cref="GetParam(int, out MQClientParam)"/>.</param>
/// <param name="mqname">Target queue name.</param>
/// <param name="message">Message text.</param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public static string SendMessageStatic(int mode, string mqname, string message)
{
    using (var client = new MQClientFullChannel(mqname))
    {
        return client.SendMessage(mode, message);
    }
}
#endregion
#region 发送消息
/// <summary>
/// Publishes a persistent message to a durable queue through the default exchange.
/// </summary>
/// <remarks>
/// A new connection and channel are opened for every call and closed afterwards.
/// Failures are logged in the background and reported through the return value.
/// </remarks>
/// <param name="param">Broker connection settings.</param>
/// <param name="mqname">Target queue name; the queue is declared before publishing.</param>
/// <param name="message">Message text, sent as UTF-8.</param>
/// <returns>"ok" on success, otherwise the exception message.</returns>
public string SendMessage(MQClientParam param, string mqname, string message)
{
    try
    {
        var factory = new ConnectionFactory
        {
            HostName = param.HostName,
            UserName = param.UserName,
            Password = param.Password,
            Port = param.Port,
            VirtualHost = param.VirtualHost
        };
        using (IConnection connection = factory.CreateConnection())
        using (IModel channel = connection.CreateModel())
        {
            // durable, non-exclusive, not auto-deleted, no extra arguments
            channel.QueueDeclare(mqname, true, false, false, null);
            var props = channel.CreateBasicProperties();
            props.DeliveryMode = 2;
            props.Persistent = true;
            byte[] payload = Encoding.UTF8.GetBytes(message);
            // Empty exchange name: routed by queue name.
            channel.BasicPublish("", mqname, props, payload);
        }
        return "ok";
    }
    catch (Exception ee)
    {
        // Write the log entry off the calling thread.
        Task.Run(() =>
        {
            LogUtil.Error(ee.Message.ToString(), logName);
        });
        return ee.Message.ToString();
    }
}
/// <summary>
/// Publishes a message to this instance's <see cref="queueName"/>.
/// </summary>
/// <param name="param">Broker connection settings.</param>
/// <param name="message">Message text.</param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public string SendMessage(MQClientParam param, string message)
{
    string target = queueName;
    return string.IsNullOrWhiteSpace(target)
        ? "传入的参数不正确!"
        : SendMessage(param, target, message);
}
/// <summary>
/// Publishes a message using broker settings read from configuration.
/// </summary>
/// <param name="mode">Configuration selector passed to <see cref="GetParam(int, out MQClientParam)"/>.</param>
/// <param name="mqname">Target queue name.</param>
/// <param name="message">Message text.</param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public string SendMessage(int mode, string mqname, string message)
{
    MQClientParam param;
    string status = GetParam(mode, out param);
    if (status == "ok")
    {
        return SendMessage(param, mqname, message);
    }
    return status;
}
/// <summary>
/// Publishes a message to this instance's <see cref="queueName"/> using broker settings read from configuration.
/// </summary>
/// <param name="mode">Configuration selector passed to <see cref="GetParam(int, out MQClientParam)"/>.</param>
/// <param name="message">Message text.</param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public string SendMessage(int mode, string message)
{
    string target = queueName;
    return string.IsNullOrWhiteSpace(target)
        ? "传入的参数不正确!"
        : SendMessage(mode, target, message);
}
/// <summary>
/// Schedules a delayed message using a temporary client instance.
/// </summary>
/// <param name="mode">Configuration selector passed to <see cref="GetParam(int, out MQClientParam)"/>.</param>
/// <param name="mqname">Queue the message should reach at <paramref name="sendTime"/>.</param>
/// <param name="message">Message text.</param>
/// <param name="sendTime">Time at which the message should be delivered.</param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public static string SendDelayMessageStatic(int mode, string mqname, string message, DateTime sendTime)
{
    using (var client = new MQClientFullChannel())
    {
        return client.SendDelayMessage(mode, mqname, message, sendTime);
    }
}
/// <summary>
/// Publishes a message that should reach <paramref name="mqname"/> at <paramref name="sendtime"/>.
/// </summary>
/// <remarks>
/// If the time has already passed the message is published immediately to
/// <paramref name="mqname"/>. Otherwise it is parked in a uniquely named holding queue
/// whose message TTL equals the remaining delay; when the TTL elapses the message is
/// dead-lettered to the "exchange-direct" exchange with <paramref name="mqname"/> as
/// routing key. <see cref="ConnectInit"/> binds the consumed queue to that exchange.
/// NOTE(review): <paramref name="sendtime"/> is compared with DateTime.Now, so it is
/// presumably expected in local time — confirm with callers.
/// </remarks>
/// <param name="param">Broker connection settings.</param>
/// <param name="mqname">Final destination queue / dead-letter routing key.</param>
/// <param name="message">Message text, sent as UTF-8.</param>
/// <param name="sendtime">Time at which the message should be delivered.</param>
/// <returns>"ok" on success, otherwise the exception text.</returns>
public string SendDelayMessage(MQClientParam param, string mqname, string message, DateTime sendtime)
{
    try
    {
        // Read the clock once so the "already due" test and the TTL use the same instant.
        DateTime now = DateTime.Now;
        if (sendtime <= now)
        {
            // Already due: deliver straight to the requested queue (not the instance default).
            return SendMessage(param, mqname, message);
        }

        long ttl = (long)Math.Round((sendtime - now).TotalMilliseconds, MidpointRounding.AwayFromZero);

        var factory = new ConnectionFactory
        {
            HostName = param.HostName,
            UserName = param.UserName,
            Password = param.Password,
            Port = param.Port,
            VirtualHost = param.VirtualHost
        };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var holdingQueueArgs = new Dictionary<string, object>
            {
                // Holding queue is removed once it has been unused for this long (5 minutes past the TTL).
                { "x-expires", ttl + 300000 },
                // Per-queue message TTL; must be shorter than the queue's own expiry.
                { "x-message-ttl", ttl },
                // Expired messages are re-published to this exchange ...
                { "x-dead-letter-exchange", "exchange-direct" },
                // ... with the destination queue name as routing key.
                { "x-dead-letter-routing-key", mqname }
            };
            string holdingQueue = "duomaitime" + Guid.NewGuid().ToString();
            channel.ExchangeDeclare(exchange: "exchange-direct", type: "direct");
            channel.QueueDeclare(holdingQueue, true, false, false, holdingQueueArgs);
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2;
            properties.Persistent = true;
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", holdingQueue, properties, body);
        }
        return "ok";
    }
    catch (Exception ex)
    {
        // Log in the background, as SendMessage does, then report through the return value.
        Task.Run(() =>
        {
            LogUtil.Error(ex.ToString(), logName);
        });
        return ex.ToString();
    }
}
/// <summary>
/// Schedules a delayed message for this instance's <see cref="queueName"/>.
/// </summary>
/// <param name="param">Broker connection settings.</param>
/// <param name="message">Message text.</param>
/// <param name="sendtime">Time at which the message should be delivered.</param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public string SendDelayMessage(MQClientParam param, string message, DateTime sendtime)
{
    string target = queueName;
    return string.IsNullOrWhiteSpace(target)
        ? "传入的参数不正确!"
        : SendDelayMessage(param, target, message, sendtime);
}
/// <summary>
/// Schedules a delayed message using broker settings read from configuration.
/// </summary>
/// <param name="mode">Configuration selector passed to <see cref="GetParam(int, out MQClientParam)"/>.</param>
/// <param name="mqname">Queue the message should reach at <paramref name="sendtime"/>.</param>
/// <param name="message">Message text.</param>
/// <param name="sendtime">Time at which the message should be delivered.</param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public string SendDelayMessage(int mode, string mqname, string message, DateTime sendtime)
{
    MQClientParam param;
    string status = GetParam(mode, out param);
    if (status == "ok")
    {
        return SendDelayMessage(param, mqname, message, sendtime);
    }
    return status;
}
/// <summary>
/// Schedules a delayed message for this instance's <see cref="queueName"/> using broker
/// settings read from configuration.
/// </summary>
/// <param name="mode">Configuration selector passed to <see cref="GetParam(int, out MQClientParam)"/>.</param>
/// <param name="message">Message text.</param>
/// <param name="sendtime">Time at which the message should be delivered.</param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public string SendDelayMessage(int mode, string message, DateTime sendtime)
{
    string target = queueName;
    if (string.IsNullOrWhiteSpace(target))
    {
        return "传入的参数不正确!";
    }
    // The (mode, mqname, ...) overload resolves the settings and forwards to the sender.
    return SendDelayMessage(mode, target, message, sendtime);
}
#endregion
#region 接收消息
/// <summary>
/// Signature of the handler invoked for each message that arrives at the client.
/// </summary>
/// <param name="result">The received message; the handler reports the outcome on it.</param>
public delegate void ActionEvent(EventMessageResultCluster result);
/// <summary>
/// Backing delegate for <see cref="OnMessage"/>.
/// </summary>
private ActionEvent _onMessage;
/// <summary>
/// Raised when a message arrives on the consumed queue.
/// A new handler is only accepted while no handler is registered.
/// </summary>
public event ActionEvent OnMessage
{
    add
    {
        if (_onMessage != null)
        {
            // A handler is already registered; ignore further subscriptions.
            return;
        }
        _onMessage += value;
    }
    remove
    {
        if (_onMessage == null)
        {
            return;
        }
        _onMessage -= value;
    }
}
/// <summary>
/// Backing delegate for <see cref="OnConnected"/>.
/// </summary>
private Action<MQClientParam> _onConnected;
/// <summary>
/// Raised after a connection has been established.
/// A new handler is only accepted while no handler is registered.
/// </summary>
public event Action<MQClientParam> OnConnected
{
    add
    {
        if (_onConnected == null)
        {
            _onConnected += value;
        }
    }
    remove
    {
        if (_onConnected != null)
        {
            _onConnected -= value;
        }
    }
}
/// <summary>
/// Invokes the <see cref="OnConnected"/> handler, if one is registered.
/// </summary>
/// <param name="param">Settings of the broker that was connected to.</param>
public void Connected(MQClientParam param)
{
    // ?.Invoke reads the delegate once, so a concurrent unsubscribe cannot cause a null call.
    _onConnected?.Invoke(param);
}
/// <summary>
/// Backing delegate for <see cref="OnDisConnected"/>.
/// </summary>
private Action<MQClientParam> _onDisConnected;
/// <summary>
/// Raised when the connection is lost or could not be established.
/// A new handler is only accepted while no handler is registered.
/// </summary>
public event Action<MQClientParam> OnDisConnected
{
    add
    {
        if (_onDisConnected == null)
        {
            _onDisConnected += value;
        }
    }
    remove
    {
        if (_onDisConnected != null)
        {
            _onDisConnected -= value;
        }
    }
}
/// <summary>
/// Invokes the <see cref="OnDisConnected"/> handler, if one is registered.
/// </summary>
/// <param name="param">Settings of the broker whose connection was lost.</param>
public void DisConnected(MQClientParam param)
{
    // ?.Invoke reads the delegate once, so a concurrent unsubscribe cannot cause a null call.
    _onDisConnected?.Invoke(param);
}
/// <summary>
/// Opens the receiving connection and channel, binds <see cref="queueName"/> to the
/// "exchange-direct" exchange and starts consuming it with manual acknowledgement.
/// </summary>
/// <remarks>
/// On failure, or when the connection shuts down, the disconnect callback is invoked,
/// the connection is closed and, if <see cref="reConnectTime"/> is greater than 0,
/// the method waits that many milliseconds and calls itself again.
/// </remarks>
/// <param name="param">Broker connection settings.</param>
public void ConnectInit(MQClientParam param)
{
try
{
string fqname = queueName;
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = param.HostName;
factory.UserName = param.UserName;
factory.Password = param.Password;
factory.Port = param.Port;
factory.VirtualHost = param.VirtualHost;
// NOTE(review): the client's automatic recovery is enabled here while the shutdown
// handler below also closes and reconnects manually — confirm both are intended.
factory.AutomaticRecoveryEnabled = true;
factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
connectionReceive = factory.CreateConnection();
connectionReceive.ConnectionShutdown += (o, e) =>
{
// Log the shutdown in the background.
Task.Run(() =>
{
LogUtil.Info(JSONUtil.ObjectToJson(param) + " --> 链接关闭", logName);
});
// Notify the caller that the connection was closed.
DisConnected(param);
// Release the channel and connection.
Close();
if (reConnectTime > 0)
{
// Wait (blocking this handler), then reconnect.
Task.Delay(reConnectTime).Wait();
ConnectInit(param);
}
};
// Create the channel.
channelReceive = connectionReceive.CreateModel();
channelReceive.ExchangeDeclare(exchange: "exchange-direct", type: "direct");
// The queue is bound but not declared in this method, so it must already exist
// — presumably declared by a sender; verify.
channelReceive.QueueBind(fqname, "exchange-direct", fqname);
// Create the event-driven consumer.
var consumer = new EventingBasicConsumer(channelReceive);
consumer.Received += consumer_Received;
channelReceive.BasicQos(0, 1, false); // fetch only one message at a time
// Second argument false: messages are acknowledged manually in consumer_Received.
channelReceive.BasicConsume(fqname, false, consumer);
// Connected: notify the caller.
if (connectionReceive.IsOpen)
{
this.Connected(param);
}
}
catch (Exception ee)
{
Task.Run(() =>
{
// NOTE(review): ToSingular() is not a System.String method; presumably a project
// extension — confirm it is not a typo for ToString().
LogUtil.Info("ConnectInit(MQClientParam param)异常:" + ee.Message.ToSingular(), logName);
});
// Notify the caller that the connection was closed.
DisConnected(param);
// Release the channel and connection.
// NOTE(review): if the connection was already created, closing it here presumably
// raises ConnectionShutdown, whose handler also reconnects — confirm there is no
// double reconnect.
Close();
if (reConnectTime > 0)
{
// Wait, then reconnect. The retry is recursive: every failed attempt nests
// another ConnectInit call inside this catch block.
Task.Delay(reConnectTime).Wait();
ConnectInit(param);
}
}
}
/// <summary>
/// Resolves broker settings from configuration and starts consuming on a background task.
/// </summary>
/// <param name="mode">
/// 0 listens on the standalone broker; a positive value N listens on the N-th entry
/// (1-based) of the list returned by <see cref="GetParam(out List{MQClientParam})"/>.
/// </param>
/// <returns>
/// "ok" once the background task has been started (the connection itself is established
/// asynchronously), otherwise an error description.
/// </returns>
public string Connect(int mode)
{
    MQClientParam param;
    if (mode == 0)
    {
        // Standalone broker.
        string ret = GetParam(0, out param);
        if (ret != "ok")
        {
            return ret;
        }
    }
    else
    {
        List<MQClientParam> pAll;
        string ret = GetParam(out pAll);
        if (ret != "ok")
        {
            return ret;
        }
        // The list is null when no entry could be parsed; negative modes have no entry either.
        if (pAll == null || mode < 1 || mode > pAll.Count)
        {
            return $"Items下没有第[{mode}]条记录";
        }
        param = pAll[mode - 1];
    }
    if (param == null)
    {
        return "MQ参数获取不正确!";
    }
    try
    {
        Task.Factory.StartNew(() =>
        {
            ConnectInit(param);
        });
    }
    catch (Exception ex)
    {
        Task.Run(() =>
        {
            LogUtil.Error("Connect(int mode):" + ex.Message.ToString(), "MQClientFullChannel");
        });
        return ex.Message.ToString();
    }
    return "ok";
}
/// <summary>
/// Handles a delivery: decodes the body as UTF-8, passes it to the <see cref="OnMessage"/>
/// handler and then acknowledges or rejects it according to the handler's outcome.
/// </summary>
/// <remarks>
/// A message is acknowledged only if the handler marked it successful; when no handler
/// is registered or it did not confirm, the message is rejected.
/// </remarks>
private void consumer_Received(object sender, BasicDeliverEventArgs e)
{
    try
    {
        string message = Encoding.UTF8.GetString(e.Body);
        var emr = new EventMessageResultCluster
        {
            ConsumerTag = e?.ConsumerTag?.ToString(),
            RoutingKey = e?.RoutingKey?.ToString(),
            Topic = "",
            Content = message
        };
        if (_onMessage != null)
        {
            // Hand the message to the external listener.
            _onMessage(emr);
        }
        if (emr.IsOperationOk)
        {
            if (!channelReceive.IsClosed)
            {
                // Handled successfully: acknowledge this single delivery.
                channelReceive.BasicAck(e.DeliveryTag, false);
            }
        }
        else
        {
            // Not handled: reject with requeue = false.
            // NOTE(review): the original comment said the message is put back at the head
            // of the queue, but requeue is false here — confirm which is intended.
            channelReceive.BasicReject(e.DeliveryTag, false);
        }
    }
    catch (Exception ex)
    {
        LogUtil.Error("接收消息出错:" + ex.ToString(), "MQClientFullChannel");
    }
}
#endregion
#region 关闭链接
/// <summary>
/// Closes and disposes the receiving channel and then the receiving connection.
/// </summary>
/// <remarks>
/// Best effort: errors while closing are ignored because this method is also called
/// after the connection has already shut down. Dispose is attempted even when Close fails.
/// </remarks>
public void Close()
{
    ReleaseQuietly(channelReceive, channel => channel.Close());
    ReleaseQuietly(connectionReceive, connection => connection.Close());
}
/// <summary>
/// Runs <paramref name="close"/> on <paramref name="resource"/> and always disposes it afterwards,
/// swallowing any exception from either step. Does nothing for a null resource.
/// </summary>
private static void ReleaseQuietly<T>(T resource, Action<T> close) where T : class, IDisposable
{
    if (resource == null)
    {
        return;
    }
    try
    {
        close(resource);
    }
    catch (Exception)
    {
        // Deliberately ignored: the resource may already be closed.
    }
    finally
    {
        try
        {
            resource.Dispose();
        }
        catch (Exception)
        {
            // Deliberately ignored: nothing more can be done during teardown.
        }
    }
}
#endregion
#region 获取appsetting.json中的MQ参数
/// <summary>
/// Reads one set of broker settings from configuration.
/// </summary>
/// <remarks>
/// Mode 0 reads the children of "MQConfig"; any other value reads the children of
/// "MQConfig:Items:{mode}". Each child is expected to expose "Key" and "Value";
/// keys are matched case-insensitively.
/// NOTE(review): <paramref name="mode"/> is used directly as the Items index here,
/// whereas Connect treats its mode as 1-based (list index mode - 1) — confirm which is intended.
/// </remarks>
/// <param name="mode">0 for the standalone broker, otherwise the index under "MQConfig:Items".</param>
/// <param name="param">
/// The parsed settings. Null when the section is missing or has fewer than four entries;
/// may be partially filled when a value fails to convert.
/// </param>
/// <returns>"ok" on success, otherwise an error description.</returns>
public string GetParam(int mode, out MQClientParam param)
{
    param = null;
    string section;
    if (mode == 0)
    {
        // Standalone broker.
        section = "MQConfig";
    }
    else
    {
        // A specific cluster entry.
        var items = ConfigUtil.GetChildren("MQConfig:Items");
        if (items == null || !items.Any())
        {
            return "没有配置MQ集群参数";
        }
        section = $"MQConfig:Items:{mode}";
    }

    JArray mqItem = (JArray)JSONUtil.JsonToObject(JSONUtil.ObjectToJson(ConfigUtil.GetChildren(section)));
    if (mqItem == null || mqItem.Count < 4)
    {
        return "MQ集群参数配置不正确,参数过少!";
    }

    param = new MQClientParam();
    try
    {
        foreach (var tmp in mqItem)
        {
            // Invariant lower-casing keeps key matching independent of the current culture.
            switch (tmp["Key"].ToString().ToLowerInvariant())
            {
                case "hostname":
                    param.HostName = tmp["Value"].ToString();
                    break;
                case "port":
                    param.Port = Convert.ToInt32(tmp["Value"]);
                    break;
                case "username":
                    param.UserName = tmp["Value"].ToString();
                    break;
                case "password":
                    param.Password = tmp["Value"].ToString();
                    break;
                case "retrycount":
                    param.RetryCount = Convert.ToInt32(tmp["Value"]);
                    break;
                case "virtualhost":
                    param.VirtualHost = tmp["Value"].ToString();
                    break;
            }
        }
    }
    catch (Exception ee)
    {
        return "MQ集群参数设置不正确!" + ee.Message.ToString();
    }
    return "ok";
}
/// <summary>
/// Reads every usable set of broker settings under "MQConfig:Items".
/// </summary>
/// <remarks>
/// Entries with fewer than four values, or whose values fail to convert, are skipped,
/// so positions in the returned list do not necessarily match configuration indexes.
/// </remarks>
/// <param name="paramAll">
/// The parsed settings. Null when no entries are configured or none could be parsed,
/// even if the return value is "ok".
/// </param>
/// <returns>"ok" when at least one entry is configured, otherwise an error description.</returns>
public string GetParam(out List<MQClientParam> paramAll)
{
    paramAll = null;
    var items = ConfigUtil.GetChildren("MQConfig:Items");
    // Count once instead of re-enumerating the configuration on every iteration.
    int itemCount = items == null ? 0 : items.Count();
    if (itemCount < 1)
    {
        return "没有配置MQ集群参数";
    }
    for (int i = 0; i < itemCount; i++)
    {
        JArray mqItem = (JArray)JSONUtil.JsonToObject(JSONUtil.ObjectToJson(ConfigUtil.GetChildren($"MQConfig:Items:{i}")));
        if (mqItem == null || mqItem.Count < 4)
        {
            continue;
        }
        MQClientParam param = new MQClientParam();
        try
        {
            foreach (var tmp in mqItem)
            {
                // Invariant lower-casing keeps key matching independent of the current culture.
                switch (tmp["Key"].ToString().ToLowerInvariant())
                {
                    case "hostname":
                        param.HostName = tmp["Value"].ToString();
                        break;
                    case "port":
                        param.Port = Convert.ToInt32(tmp["Value"]);
                        break;
                    case "username":
                        param.UserName = tmp["Value"].ToString();
                        break;
                    case "password":
                        param.Password = tmp["Value"].ToString();
                        break;
                    case "retrycount":
                        param.RetryCount = Convert.ToInt32(tmp["Value"]);
                        break;
                    case "virtualhost":
                        param.VirtualHost = tmp["Value"].ToString();
                        break;
                }
            }
        }
        catch (Exception ee)
        {
            LogUtil.Error("MQ集群参数设置不正确:" + ee.Message.ToString(), "MQClientFullChannel");
            continue;
        }
        if (paramAll == null)
        {
            paramAll = new List<MQClientParam>();
        }
        paramAll.Add(param);
    }
    return "ok";
}
#endregion
/// <summary>
/// Connection settings for one RabbitMQ broker.
/// </summary>
public class MQClientParam
{
/// <summary>
/// Broker host address.
/// </summary>
public string HostName { get; set; }
/// <summary>
/// Broker port.
/// </summary>
public int Port { get; set; }
/// <summary>
/// Login user name.
/// </summary>
public string UserName { get; set; }
/// <summary>
/// Login password.
/// </summary>
public string Password { get; set; }
/// <summary>
/// Virtual host assigned to the connection factory.
/// </summary>
public string VirtualHost { get; set; }
/// <summary>
/// Value of the "retrycount" configuration key; it is stored but not read anywhere in this class.
/// </summary>
public int RetryCount { get; set; }
}
/// <summary>
/// Read-only set of built-in broker settings.
/// </summary>
/// <remarks>
/// SECURITY NOTE(review): broker addresses and credentials are hard-coded in source.
/// They should be moved to configuration or a secret store and rotated; the values are
/// kept unchanged here so existing callers keep working.
/// </remarks>
public class RabbitMQSetCluster
{
    private readonly string hostname = "106.14.15.190";
    /// <summary>
    /// Broker address.
    /// </summary>
    public string HostName
    {
        get { return this.hostname; }
    }
    private readonly List<string> listhostname = new List<string>(new string[] { "106.14.15.190", "106.14.223.74", "101.132.39.50" });
    /// <summary>
    /// All broker addresses. The internal list itself is returned, so callers can modify it.
    /// </summary>
    public List<string> HostNameList
    {
        get { return this.listhostname; }
    }
    private readonly int port = 8102;
    /// <summary>
    /// Broker port.
    /// </summary>
    public int Port
    {
        get { return this.port; }
    }
    private readonly string username = "duomaicrm";
    /// <summary>
    /// Login user name.
    /// </summary>
    public string UserName
    {
        get { return this.username; }
    }
    private readonly string password = "taocrm123";
    /// <summary>
    /// Login password.
    /// </summary>
    public string Password
    {
        get { return this.password; }
    }
}
/// <summary>
/// A received message together with the outcome reported by its handler.
/// </summary>
public class EventMessageResultCluster
{
    /// <summary>
    /// Topic of the message; the receiver in this file always sets it to an empty string.
    /// </summary>
    public string Topic { get; set; }

    /// <summary>
    /// Message body decoded as text.
    /// </summary>
    public string Content { get; set; }

    /// <summary>
    /// Whether the message was handled successfully. Defaults to false.
    /// </summary>
    public bool IsOperationOk { get; set; }

    /// <summary>
    /// Marks the message as handled successfully.
    /// </summary>
    public void Confirm() => IsOperationOk = true;

    /// <summary>
    /// Marks the message as not handled.
    /// </summary>
    public void Fail() => IsOperationOk = false;

    /// <summary>
    /// Consumer tag of the delivery.
    /// </summary>
    public string ConsumerTag { get; set; }

    /// <summary>
    /// Routing key of the delivery.
    /// </summary>
    public string RoutingKey { get; set; }
}
}
}