MQ流程简介


信息发送端将消息(message)发送到exchange
exchange接受消息之后,负责将其路由到具体的队列中
Bindings负责连接exchange和队列(queue)
消息到达队列(queue),然后等待被消息接收端处理
消息接收端处理消息
程序结构

注意事项:
跨数据中心调用:无法直接联通,需要转调程序转发消息
同数据中心跨SET调用:目前是直接连接的
具体使用的MQ服务所在环境已连接串配置所属环境为准,如:TTP.MQ.RabbitMQ.BSS.ConnectionString.HA,B4
跨SET发布需保证相关元数据(在各个相关SET都要有定义),Event元数据生产环境会自动拉取,Task元数据生产环境不自动拉取,需运维添加相关元数据补丁
TaskSender使用TaskBuilder来创建Task对象
EventBus推荐使用PublishHighFrequence方法
TaskSender及EventBus都是通过TSD服务进行消息发送
Task无接收方会产生消息积压,Event无接收方不会产生消息积压
Task中起异步线程处理后返回,原则上应禁止
事件和消息的区别
1、分布式事件(Event)
发送方与消费方是一对多的关系,发送一次,可以被多个消费方消费。
使用场景:已知有多个消费方会使用或者发送方不想为消费方定制时使用,事件要求通用,所以发送的消息内的属性一般较全。
DDP中的定义如下:

标识:特来电系统中对于此事件的唯一标识
主题:要发送到MQ的具体Exchange名称(后面会通过这个属性在RabbitMQ的管理程序中进行查找)
队列连接配置项:RabbitMQ的发送地址的连接配置项,此配置决定了我们将事件发送到哪一个MQ集群。连接串设置为发送方所在Set的连接配置串
一般代码发送使用高性能模式:

点对点消息
发送方与消费方是一对一的关系,可以认为此发送是专用定制化的。
使用场景:发送方与消费方协商一致,定制消息内容,无法给其他消费者服务。
DDP中的定义如下(目前侧重消费方):

标识:既是特来电系统中对于此消息的唯一标识,又是消费RabbitMQ中的Queue的名称

队列连接配置项设置为消费方所在Set的连接配置串。
发送方代码如下:

编程相关
日常开发应该针对于程序中的长链路进行异步拆分,提高程序的健壮性、可维护性等
关于Command和Event
在长链路请求中应该区分Command和Event
Command的特点是发起一个操作,针对Command的处理结果来决定后续执行流程
Event的特点是完成了一个操作,完成后通知相关的业务进行后续处理,并不关心后续流程的结果
Command:

Event:

关于消息丢失问题
大部分的MQ组件都是会丢消息的,有一些消息组件在调参之后可以达到消息不丢失,但并非绝对
开发要求:
识别应用场景消息丢失的可容忍程度
针对于高敏感的消息应该具备完善的消息补偿机制,对于容忍度低的场景可以考虑发送消息后有消息检查机制或定期重发机制(其实像NC完全可以增加一个消息手动重发的功能)
服务状态自维护
MQ组件消息不丢失也有可能因为超时或程序重启等造成消息的丢失
要达到消息不丢失只能依靠预处理+回查的方式来保证
理论上所有的接口都要具有幂等性
长耗时的异步任务
长耗时的异步任务理论上对于系统性能消耗较大,应增加其可控及可监控性
设计原则:
设计时应考虑边界和批次问题
设计时应有最小资源消耗的思考方案
任务应具备可取消、可继续执行的能力
任务应考虑单线程执行,批量一次性发起可能导致MAC、HSF高线程数问题
设计时应避免开启异步任务后直接返回
如有需要,最好可以添加个长链路执行任务组件,提供统一的执行控制和监控
如有需要,尽可能通过redis锁控制执行线程数,防止并发执行
如果需要尽可能的手动控制线程总量,由进入后开启Task执行修改为工作项+处理线程的方式
如无必要尽量不要这么做,这样会跳过我们的监控体系,无法预估和监控程序的执行
长耗时链路应尽量在业务低峰期执行
长耗时任务应尽量使用读库
注意消息的循环发送
延时队列
目前体系中并不支持延时队列,可通过自定义重试+逻辑判断来使用
正常的消息体格式:
{
"status": 1,
"TaskType": 0,
"BusinessType": "ASCore-NewEWalletChargingOrderEndProFrozenEventProcessor",
"BillID": null,
"IsPersistence": true,
"ID": "0d5fe29e-de54-4286-be9f-0213013ef64f",
"Name": "HaASCore-NewEWalletChargingOrderEndProFrozenEventProcessorQueue",
"Creator": null,
"Destination": "HaASCore-NewEWalletChargingOrderEndProFrozenEventProcessorQueue",
"CreateTime": "2022-07-05T15:43:25.1024763+08:00",
"ModifyTime": "2022-07-05T15:43:25.103473+08:00",
"Data": {
"FailedRetry": "False",
"Message": "",
"SendTime": "2022-07-05T15:43:25.0962259+08:00",
"C_Task_BillID": null,
"RequestID": "0332b304-9109-4788-9a63-303cf0d88db7",
"C_MaxRetryCount": 4,
"C_CurrentRetryCount": 0,
"C_TaskID": "0d5fe29e-de54-4286-be9f-0213013ef64f",
"C_Task_StartTaskTime": "2022-07-05T15:43:25.103473+08:00",
"process_begintime": "2022-07-05T15:43:25.103473+08:00",
"Topic": "OC-NoticeV2-OrderBroadcast",
"RpcID": "0.1.9.1.11.9.2.1.1.44.19",
"C_Task_ReceiveTime": "2022-07-05T15:43:25.103473+08:00",
"NowRpcID": "0.1.9.1.11.9.2.1.1.44.19.1",
"Sender": "T-MAB5-NCB-03",
"MessageCenterInstaceID": "MA.B4.ACTC.04.MAC",
"Store": "ASCore-NewEWalletChargingOrderEndProFrozenEventProcessor"
},
"RetryCount": 1,
"CurrentRetryCount": 0,
"Status": 1
}
自定义重试规则,设置对应的重试时间,根据消息体中的C_CurrentRetryCount字段来判断为第几次重试,从而达到延时多久进行处理的目的
public void Execute(TaskDataMap content)
{
if (content.Contains("C_CurrentRetryCount"))
{
int retryCount = content.GetIntValue("C_CurrentRetryCount");
if(retryCount == 0)
{
Result = false;
return;
}
}
}
常见问题排查步骤
Q:我发送了一个事件或者消息,消费方反馈没有消费到
A:核心思路:事件连接串是否正确,消费方是否成功绑定了我的事件,消费方的Queue是否有消费者,查看相应的MAC上是否有报错信息,重启大法
事件连接串是否正确
查看DDP中的“队列连接配置项”是否是正确的MQ集群地址
登录相应的MQ集群的管理控制平台,在Exchange的Tab页中查看是否有相应的Exchange,如果存在,证明发送方没有问题
查看相应的数据库中是否存在影响的元数据信息(测试环境可以查询公共技术的TeldMaster库中的eventdefinition表)
消费方是否成功绑定了我的事件
在相应的Exchange下面查找消费的绑定,如果存在,证明绑定没有问题,需要看是否有相应的消费者
如果未能找到相应的绑定,需要查看消费方的异步任务配置的“队列连接配置项”是否与事件配置相同,如果相同但是没有绑定,需要看相应的MAC是否已经启动,是否正确加载了任务
查看相应的数据库中是否存在影响的元数据信息(测试环境可以查询公共技术的TeldMaster库中的TaskMetadata表)
消费方的Queue是否有消费者
找到对应的Queue,查看消费者列表,如果有,需要查看相应的MAC上是否有报错信息
如果没有消费者,有两种可能:消费者所在的MAC进程没有启动或者任务的注册信息有问题,MAC启动但是没有加载到相应的任务(查看任务的元数据)
查看相应的MAC上是否有报错信息
如果MAC中调用了HSF等,可以查看相应的HSF是否有执行,执行是否报错
如果未执行到HSF,在测试环境可以打开MAC进程中的输出,观察是否有错误报出,一般会有DLL未加载,Model不匹配等导致接收消息就报错了。
重启大法
如果是事件,重启发送方的进程
如果是消息,使用HSF方式发送的重启TTP的TSD的Host
如果是消息,使用SDK方式发送的重启发送方的进程