死信队列实现延迟 (死信队列实现第三方支付接口回调)
在软件开发中,死信队列是一种重要的消息处理机制,用于处理一些不能被正常消费的消息。通过死信队列,我们可以实现消息的延迟处理,这在某些场景下尤为重要,比如在第三方支付接口回调中。
让我们理解一下什么是死信队列。在消息队列中,当一条消息无法被正常消费时,可能会被标记为“死信”,也就是说这条消息无法被正确处理。这可能是因为消费者处理消息的时候抛出了异常,或者是由于消息格式错误等各种原因。为了处理这些无法正常消费的消息,我们引入了死信队列这个概念。
死信队列其实就是一个专门用来存放死信消息的队列,当一条消息被标记为死信时,会被发送到死信队列中,进而可以被系统进行处理。通常情况下,我们会设置一个延迟时间,在消息被标记为死信后一段时间内,系统会尝试重新处理这条消息,如果失败,则将其发送到死信队列中。
在第三方支付接口回调的场景下,我们经常会遇到一些需要延迟处理的情况。比如用户下单后,支付接口会异步回调通知我们支付结果,但有时由于网络等原因,这个通知可能会延迟到达,或者是通知内容不完整,无法直接处理。这时,我们就可以借助死信队列来实现对这些支付回调消息的延迟处理。
具体实现死信队列实现第三方支付接口回调的过程如下:
1. 我们在消息队列中设置一个专门用来接收第三方支付接口回调通知的队列,比如叫做"payment_callback_queue"。
2. 当第三方支付接口回调通知到达时,我们将其放入这个队列中,并设置一个合理的超时时间,比如30秒。
3. 如果在超时时间内消息被正常消费完毕,那么处理流程结束。但如果超时时间到达后消息仍未被处理,那么这条消息就被标记为死信。
4. 接着,我们设置一个死信交换机和死信队列,将这些死信消息发送到这个队列中。
5. 在死信队列中,我们可以设置一个延迟处理的策略,比如间隔10分钟重新处理一次,直到成功为止。
通过以上步骤,我们就实现了对第三方支付接口回调的延迟处理。这样一来,即使因为各种原因导致支付回调通知延迟到达或者无法被正确处理,我们也能够通过死信队列来保证最终的处理结果。
死信队列是一种非常有用的消息处理机制,在处理一些特殊情况下的消息时能够发挥很大的作用。通过合理地设置死信队列,我们可以实现对第三方支付接口回调等场景下的消息延迟处理,保障系统的稳定性和可靠性。
RabbitMQ实现延迟消费(延迟队列)
首先在pom中加入依赖
然后配置yml文件
创建业务队列与死信队列
该消费者是消费死信队列中的消息
启动服务之后,可以看到创建的交换机和队列
消息过期之后从 prod_queue_pay 队列转发到 dl-queue 队列。很好的实现了消息延迟消费。但我们会发现一个问题,通过给队列属性设置过期时间,如果我现在有不同的场景,比如我5s、10s、15s之后延迟消费,那需要创建三个队列。每次有一个不同的时间段的需求过来,我都需要创建一个队列,这肯定不行。
快速入口:
发送消息的时候通过在header添加x-delay参数来控制消息的延时时间
启动服务,登录RabbitMQ管理界面,可以看到交换机和队列都已经创建成功。
该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。这些判断和操作导致效率不如普通的Exchange,所以如果不需要的话,就不要用插件类型的延迟队列。
网易传媒技术团队:消息中间件实现延迟队列的应用与实践
早期需要延迟处理的业务场景,更多的是通过定时任务扫表,然后执行满足条件的记录,具有频率高、命中低、资源消耗大的缺点。随着消息中间件的普及,延迟消息可以很好的处理这种场景,本文主要介绍延迟消息的使用场景以及基于常见的消息中间件如何实现延迟队列,最后给出了一个在网易公开课使用延迟队列的实践。
1、有效期:限时活动、拼团。。。
2、超时处理:取消超时未支付订单、超时自动确认收货。。。
4、重试:网络异常重试、打车派单、依赖条件未满足重试。。。
5、定时任务:智能设备定时启动。。。
1、RabbitMQ
1)简介:基于AMQP协议,使用Erlang编写,实现了一个Broker框架
a、Broker:接收和分发消息的代理服务器
b、Virtual Host:虚拟主机之间相互隔离,可理解为一个虚拟主机对应一个消息服务
c、Exchange:交换机,消息发送到指定虚拟机的交换机上
d、Binding:交换机与队列绑定,并通过路由策略和routingKey将消息投递到一个或多个队列中
e、Queue:存放消息的队列,FIFO,可持久化
f、Channel:信道,消费者通过信道消费消息,一个TCP连接上可同时创建成百上千个信道,作为消息隔离
2)延迟队列实现:RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现
a、TTL:RabbitMQ支持对队列和消息各自设置存活时间,取二者中较小的值,即队列无消费者连接或消息在队列中一直未被消费的过期时间
b、DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息
3)缺点:
a、配置麻烦,额外增加一个死信交换机和一个死信队列的配置
b、脆弱性,配置错误或者生产者消费者连接的队列错误都有可能造成延迟失效
2、RocketMQ
a、Broker:存放Topic并根据读取Producer的提交日志,将逻辑上的一个Topic分多个Queue存储,每个Queue上存储消息在提交日志上的位置
b、Name Server:无状态的节点,维护Topic与Broker的对应关系以及Broker的主从关系
2)延迟队列实现:RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中),然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中
3)缺点:延迟时间粒度受限制(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)
2)延迟队列实现:Kafka支持延时生产、延时拉取、延时删除等,其基于时间轮和JDK的DelayQueue实现
a、时间轮(TimingWheel):是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表
b、定时任务列表(TimerTaskList):是一个环形的双向链表,链表中的每一项表示的都是定时任务项
c、定时任务项(TimerTaskEntry):封装了真正的定时任务TimerTask
d、层级时间轮:当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中,类似于钟表就是一个三级时间轮
e、JDK DelayQueue:存储TimerTaskList,并根据其expiration来推进时间轮的时间,每推进一次除执行相应任务列表外,层级时间轮也会进行相应调整
3)缺点:
a、延迟精度取决于时间格设置
b、延迟任务除由超时触发还可能被外部事件触发而执行
4、ActiveMQ
1)简介:基于JMS协议,Java编写的Apache顶级开源项目,支持点对点和发布订阅两种模式。
a、点对点(point-to-point):消息发送到指定的队列,每条消息只有一个消费者能够消费,基于拉模型
b、发布订阅(publish/subscribe):消息发送到主题Topic上,每条消息会被订阅该Topic的所有消费者各自消费,基于推模型
2)延迟队列实现:需要延迟的消息会先存储在JobStore中,通过异步线程任务JobScheduler将到达投递时间的消息投递到相应队列上
a、Broker Filter:Broker中定义了一系列BrokerFilter的子类构成拦截器链,按顺序对消息进行相应处理
b、ScheduleBroker:当消息中指定了延迟相关属性,并且jobId为空时,会生成调度任务存储到JobStore中,此时消息不会进入到队列
c、JobStore:基于BTree存储,key为任务执行的时间戳,value为该时间戳下需要执行的任务列表
d、JobScheduler:取JobStore中最小的key执行(调度时间最早的),执行时间<=当前时间,将该任务列表依次投递到所属的队列,对于需要重复投递和投递失败的会再次存入JobStore中。
注: 此处JobScheduler的执行时间间隔可动态变化,默认0.5s,有新任务时会立即执行(Object->notifyAll())并设置时间间隔为0.1s,没有新任务后,下次执行时间为最近任务的调度执行时间。
3)缺点:投递到队列失败,将消息重新存入JobStore,消息调度执行时间=系统当前时间+延迟时间,会导致消息被真实投递的时间可能为设置的延迟时间的整数倍
1)简介:基于Key-Value的NoSQL数据库,由于其极高的性能常被当作缓存来使用,其数据结构支持:字符串、哈希、列表、集合、有序集合
2)延迟队列实现:Redis的延迟队列基于有序集合,score为执行时间戳,value为任务实体或任务实体引用
3)缺点:
a、实现复杂,本身不支持
b、完全基于内存,延迟时间长浪费内存资源
6、消息队列对比
1、公开课延迟队列技术选型
1)业务场景:关闭超时未支付订单、限时优惠活动、拼团
2)性能要求:订单、活动、拼团 数据量可控,上述MQ均能满足要求
3)可靠性:使用ActiveMQ、RabbitMQ、RocketMQ作为延迟队列更普遍
4)可用性:ActiveMQ、RocketMQ自身支持延迟队列功能,且目前公开课业务中使用的中间件为ActiveMQ和Kafka
5)延迟时间灵活:活动的开始和结束时间比较灵活,而RocketMQ时间粒度较粗,Kafka会依赖时间格有精度缺失
结论: 最终选择ActiveMQ来作为延迟队列
2、业务场景:关闭未支付订单
1)关闭微信未支付订单
2)关闭IOS未支付订单
3、ActiveMQ使用方式
1)中支持调度任务
2)发送消息时,设置message的延迟属性
其中:
a、延迟处理
AMQ_SCHEDULED_DELAY:设置多长时间后,投递给消费者(毫秒)
b、重复投递
AMQ_SCHEDULED_PERIOD:重复投递时间间隔(毫秒)
AMQ_SCHEDULED_REPEAT:重复投递次数
c、指定调度计划
AMQ_SCHEDULED_CRON:corn正则表达式
4、公开课使用中进行的优化
1)可靠性:针对实际投递时间可能翻倍的问题,结合ActiveMQ的重复投递,在消费者逻辑中做幂等处理来保证延迟时间的准确性
2)可追溯性:延迟消息及消费情况做数据库冗余存储
3)易用性:业务上定义好延迟枚举类型,直接使用JmsDelayTemplate发送,无需关心数据备份和参数等细节
1、无论是基于死信队列还是基于数据先存储后投递,本质上都是将延迟待发送的消息数据与正常订阅的队列分开存储,从而降低耦合度
2、无论是检查队头消息TTL还是调度存储的延迟数据,本质上都是通过定时任务来完成的,但是定时任务的触发策略以及延迟数据的存储方式决定了不同中间件之间的性能优劣
张浩,2018年加入网易传媒,高级Java开发工程师,目前在网易公开课主要做支付财务体系、版本迭代相关的工作。
Spring AMQP杂记之Spring实现简述
上一篇主要介绍了AMQP的一些知识,接下来开始正式步入Spring AMQP。 Message:在AMQP中并没有定义消息的模型,Spring为了方便我们理解与使用,新增了Message接口,在构建消息的时候Spring提供了builde API,的形式使用起来很方便。 Exchange:这个接口和AMQP中定义的exchange基本相同,就不说了 Queue:同上。 Binding:一般叫他绑定关系,AMQP也有对其的抽象模型,只不过我认为他只不过相当于是附加在队列与交换机上的属性,所以在上篇关于AMQP的介绍中并没有详细说明。 呃,其实spring对其的定义就是代表了队列与交换机的绑定关系。 。 。 spring提供了ConnectionFactory接口,当我们使用的时候会使用它的实现类CachingConnectionFactory,看名字也知道就是基于缓存的连接池,默认的池大小为25。 Spring也提供了对于多个connectionFactory的支持接口例如SimpleRoutingConnectionFactory等。 我们使用SpringBoot进行测试,最小化的配置如下 这里先给出一个简单的例子然后再具体讲解。 如图,我们提前声明了一个名为hello的队列,浏览器访问/send时,可以看到控制台打印了相应的时间信息,即被@RabbitListener注解的方法被调用了。 如果我们打开RabbitMq的webUI,会发现名为hello的队列中消息数量由0变为1再变为0。 注意,这里我们并没有声明Exchange,MQ会为我们将队列绑定到默认的Exchange。 接下来就详细的说一下这个例子。 对于操作RabbitMQ,Spring提供了 RabbitTemplate(对于batch操作,相应的是BatchingRabbitTemplate,在1.6版本以后,spring提供了异步的Template--AsyncRabbitTemplate)。 我们使用它来发送与接收消息。 当发送完消息的时候如何知道本次操作的成功或者失败呢?默认情况下不能被路由的消息将会被丢弃,这会导致消息丢失,不能保证消息可靠性(消息可靠性请参照上一篇AMQP介绍中的推荐)。 发布确认机制是保证消息可靠性的第一步,发布确认保证我们知道消息是否成功到达队列中,返回ack则代表成功,nack则代表失败。 要使用这个特性,我们需要将RabbitTemplate的mandatory属性和ConnectionFactory的publisherConfirms属性都设为true。 这时我们可以在RabbitTemplate上设置setReturnCallback监听来接收MQ服务器返回的状态信息了。 对于消息的确认,我们只需要设置的回调方法即可。 当我们每次发送请求时,都会打印相应的ack,其中correlationData是生产者在发送数据时可以携带的相关信息。 这里有个问题需要注意一下,RabbitTemplate只允许设置一个callback方法,这时你可以将RabbitTemplate的bean设为单例然后设置回调。 这样的缺点是所有使用这个template的地方都会使用这个回调,那么当我们想要为不同的操作定制callBack该怎么做?如果直接在别的地方继续设置会报Only one ConfirmCallback is supported by each RabbitTemplate异常,这时候我们就需要将RabbitTemplate的作用域设为@Scope,这样每个bean都是一个新的。 难道这样就可以了么?我们的service类一般都是单例的,这意味着当service类生成后,注入的RabbitTemplate就已经不变了,这个就是Single域的bean中注入Scope域bean的问题。 一种解决方法是实现ApplicationAware接口注入ApplicationContext,每次使用RabbitTemplate时调用其getBean方法。 一个更好的解决方案是使用spring提供的lookup方法。 spring会帮我们代理lookup注解的方法,每次调用都会返回一个全新的bean。 但其实平常使用一般都会将发送方单独抽取出来实现回调接口,不会涉及上面的问题,一般都如下配置,注意将template配置成scope即可。 RabbitTemplate可以添加消息转换器,作用就类似于mvc中配置的@ResponseBody消息转换器。 具体如何发送与接收消息感觉不用咋说了。 。 。 就send,receive(x,x,x)这个用IDE看一下方法doc就知道咋用了。 receive为拉模式,很少使用,关于接收方法我们更常使用的是异步接收,即推模式,一般使用@RabbitListener 实现 当hello队列中有消息时,方法会自动调用。 像我们平常做web开发,前端想要接受来自后台的消息无非俩个方法,前台请求和后台推送,前台轮询一般就是ajax定时器,推送一般使用WebSocket实现,MQ同样有两种模式:轮询请求队列看是否有消息即拉模式,队列中有消息即对消费者进行通知即推模式。 对于拉模式,Spring提供了receive,receiveAndConvert,和receiveAndReply方法。 接收并回复的方法很有用,比如订单系统,下单消息被MQ处理完后再返回消息给其他队列,告诉她这个订单已经完成,可以进行付费操作了。 接收并回复调用实现自己的接收回调。 对于推模式,项目中基本上使用@RabbitListener注解完成,该注解结合@SendTo注解完成receiveAndReply功能,若没有sendto,这个方法是不允许有返回值的。 对于异常情况,配置@RabbitListener的errorHandler和returnExceptions即可。 关于@RabbitListener注解的具体使用其实也挺复杂的,推荐直接看文档。 使用监听器的过程中消息是默认经过消息转换器的,可以手动为其设置消息转换器。 关于RabbitMQ LIstener的配置可以使用Config方式或者SpringBoot的配置文件方式。 上面只是官方文档的一部分,其实除了Listener大部分Config方式的配置都可以用配置文件方式替代。 声明队列与交换机:分为xml方式和Java Config方式(懒得写了,这个基本官网就是复制粘贴) 配置Broker:Spring对其的抽象为RabbitAdmin,也是官网。 。 延时队列实现:设置交换机延时属性为true,通过convertAndSend中的MessagePostProcessor实现发送延时消息,这个方法需要安装延时交换机这样的一个插件(也可以通过死信队列实现) 好了。 今天就先写这么多,因为实在是写的太乱了,以后有时间整理一下。 。 。
若对本页面资源感兴趣,请点击下方或右方图片,注册登录后
搜索本页相关的【资源名】【软件名】【功能词】或有关的关键词,即可找到您想要的资源
如有其他疑问,请咨询右下角【在线客服】,谢谢支持!
相关文章
- 随时随地进行计算,在线万能计算器让您的数学问题迎刃而解 (随时随地进行教育)
- 学生、专业人士和数学爱好者的理想选择:在线万能计算器 (专业的学生)
- 功能强大的在线万能计算器,满足您的各种计算需求 (功能强大在线海报图片设计器 图片编辑器素材设计源码)
- 快速准确的计算:在线万能计算器提高您的效率 (快速准确的计算公式)
- 告别复杂计算!在线万能计算器让数学变得简单 (复杂的计算比简单的计算更容易出错)
- 在线万能计算器:免费又便捷的数学帮手 (在线万能计算器)
- 万能计算器:随时随地进行计算,省时省力 (万能计算器)
- 在线万用计算器:轻松解决您的数学难题 (在线万用计算器app)
- 同城游,让城市成为你的第二故乡,开启你的城市探索之旅 (同城游同城游)
- 同城游,用最优惠的价格,体验城市最棒的事物 (同城游 app)
发表评论
评论列表
- 这篇文章还没有收到评论,赶紧来抢沙发吧~