概念
我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。
优点:
- 通过异步处理提高系统性能(减少响应所需时间)
- 削峰/限流(将处理压力转入MQ,提高接口并发)
- 降低系统耦合性(利用发布-订阅模式工作)
带来的新问题:可用性降低、复杂性提高、一致性问题
JMS
JMS(JAVA Message Service,java 消息服务)是 java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。
JMS(JAVA Message Service,Java 消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。
它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
消息模型
- 点到点(P2P)模型
- 发布/订阅(Pub/Sub)模型
AMQP
AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。
基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。
消息模型
- direct exchange
- fanout exchange
- topic change
- headers exchange
- system exchange
JMS对比AMQP
- AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。
- 在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。
而 AMQP 天然具有跨平台、跨语言特性。 - JMS 支持 TextMessage、MapMessage 等复杂的消息类型;
而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。 - 由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。
常见的消息队列对比
- 吞吐量:RocketMQ / Kafka > ActiveMQ / RabbitMQ
- 可用性:ActiveMQ / RabbitMQ基于主从;RocketMQ / Kafka基于分布式,一个数据多个副本
- 时效性:RabbitMQ并发性能最好,延迟最低
- 功能支持:kafka 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
- 消息丢失:ActiveMQ 和 RabbitMQ 丢失的可能性非常低, RocketMQ 和 Kafka 理论上不会丢失。
如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题
RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ
常见问题
幂等性和可靠性
主要是指消息的重复消费和确认消费,这里把使用ws/mqtt协议所传输的消息也纳入其中
我们使用redis建立一个已消费消息队列和未消费消息队列来解决这个问题:
当服务端的代理(RabbitMQ、Kafka、nettySever等)发送一条消息后,将该消息存入redis(消息ID为键,内容为value)
等待消费者消费过后向后端固定接口返回消息ID,(如果是客户端建议在客户端中再缓存一遍,如果是服务端就远程调用,当然也建议本地缓存一份,到一定容量时淘汰最久的一个)该接口会将redis中未消费的这条消息删除放入已消费中,key是ID,value为null
后端的启用定时任务定时重新发送未消费队列中的消息
客户端消费时会去判断redis中是否已消费过防止重复消费。
- 对于redis的set是天然幂等性,不用处理
- 对于mysql的插入操作,可由在插入前判断主键,如果存在就update;或者给字段加上唯一索引,使插入报错
消息挤压
- 对于rabbitMQ可以多建立一些消费者(其实就是多开一些线程)去消费,至于数量结合服务器cpu的一个使用情况;接口的限流处理;对于rabbitMQ还可以在后台管理界面查看
ready
数量,ready
如果处于递增状态,就说明消息开始挤压了,需要做接口的限流了。