专注于快乐的事情

MQ简单学习

本文于925天之前发表,文中内容可能已经过时。

MQ

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。。

#JMS规范#

JMS(Java Message Service) 即Java消息服务。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。
它支持两种消息通信模型:点到点(point-to-point)(P2P)模型和发布/订阅(Pub/Sub)模型。

当采用点对点模型时,消息将发送到一个队列,该队列的消息只能被一个消费者消费。

而采用发布订阅模型时,消息可以被多个消费者消费。
在发布订阅模型中,生产者和消费者完全独立,不需要感知对方的存在。

消息如何从producer端达到consumer端由message-routing来决定。在JMS中,消息路由非常简单,由producer和consumer链接到同一个queue(p2p)或者topic(pub/sub)来实现消息的路由。JMSconsumer同时支持message selector(消息选择器),通过消息选择器,consumer可以只消费那些通过了selector筛选的消息。在JMS中,消息路由机制的图示如下:

type

几个重要概念

Destination:消息发送的目的地,也就是前面说的Queue和Topic。创建好一个消息之后,只需要把这个消息发送到目的地,消息的发送者就可以继续做自己的事情,而不用等待消息被处理完成。至于这个消息什么时候,会被哪个消费者消费,完全取决于消息的接受者。

Message:从字面上就可以看出是被发送的消息。它有下面几种类型:
StreamMessage:Java 数据流消息,用标准流操作来顺序的填充和读取。
MapMessage:一个Map类型的消息;名称为 string 类型,而值为 Java 的基本类型。
TextMessage:普通字符串消息,包含一个String。
ObjectMessage:对象消息,包含一个可序列化的Java 对象
BytesMessage:二进制数组消息,包含一个byte[]。
XMLMessage: 一个XML类型的消息。
最常用的是TextMessage和ObjectMessage。

   Session:与JMS提供者所建立的会话,通过Session我们才可以创建一个Message。 
   Connection:与JMS提供者建立的一个连接。可以从这个连接创建一个会话,即Session。 
ConnectionFactory:那如何创建一个Connection呢?这就需要下面讲到的ConnectionFactory了。通过这个工厂类就可以得到一个与JMS提供者的连接,即Conection。 
   Producer:消息的生产者,要发送一个消息,必须通过这个生产者来发送。 
   MessageConsumer:与生产者相对应,这是消息的消费者或接收者,通过它来接收一个消息。

收发消息的对象创建过程

MS规范中,收发消息的对象创建过程如下:

1. 初始化ConnetionFactory

2. ConnetionFactory创建Connection

3. Connection创建Session

4. Session创建Destination(包括Queue 和 Topic两种)

5.发: Session创建消息生产者MessageProducer(收:Session创建消息消费者MessageConsumer)

6.Seesion创建Message,(发:)MessageProducer发送到Destination,(收:)MessageConsumer从Destination接受消息。

ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

主要特点

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试
  11. ActiveMQ速度非常快;一般要比jbossMQ快10倍。

###优点

是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java虚拟机。消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。
缺点:ActiveMQ默认的配置性能偏低,需要优化配置,但是配置文件复杂,ActiveMQ本身不提供管理工具;示例代码少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,用户很难由浅入深进行了解,二、文档整体的专业性太强。在研究阶段可以通过查maillist、看Javadoc、分析源代码来了解。

###安装和启动

输入activemq.bat start,启动AMQ。

AMQ的默认使用的TCP连接端口是61616,可以通过命令 netstat -an|find "61616"来测试是否启动。
输入ctrl + c 或者使用activemq stop命令,停止AMQ。

admin工程是管理控制台

管理控制台地址:http://localhost:8161/admin 用户名密码都是admin

demo地址:http://localhost:8161/demo 默认不启动,需要在jetty中配置

###管理端说明

http://lhbthanks.iteye.com/blog/1940767

Number Of Consumers 消费者 这个是消费者端的消费者数量

Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量

ActiveMQ应用场景

1、 不同语言应用集成
ActiveMQ 中间件用Java语言编写,因此自然提供Java客户端 API。但是ActiveMQ 也为C/C++、.NET、Perl、PHP、Python、Ruby 和一些其它语言提供客户端。在你考虑如何集成不同平台不同语言编写应用的时候,ActiveMQ 拥有巨大优势。在这样的例子中,多种客户端API通过ActiveMQ 发送和接受消息成为可能,无论使用的是什么语言。此外,ActiveMQ 还提供交叉语言功能,该功能整合这种功能,无需使用远程过程调用(RPC)确实是个优势,因为消息协助应用解耦。

2、 作为RPC的替代
使用RPC同步调用的应用十分普遍。假设大多数客户端服务器应用使用RPC,包括ATM、大多数WEB应用、信用卡系统、销售点系统等等。尽管很多系统很成功,但是转换使用异步消息可以带来很多好处,而且也不会放弃响应保证。使用同步请求的系统在规模上有较大的限制,因为请求会被阻塞,从而导致整个系统变慢。如果使用异步消息替代,可以很容易增加额外的消息接收者,使得消息能被并发消耗,从而加快请求处理。当然,你的系统应用间应该是解耦的。

3、 应用之间解耦
正如之前讨论的,紧耦合架构可以导致很多问题,尤其是如果他们是分布的。松耦合架构,在另一方面,证实了更少的依赖性,能够更好地处理不可预见的改变。不仅可以在系统中改变组件而不影响整个系统,而且组件交互也相当的简单。相比使用同步的系统(调用者必须等待被调用者返回信息),异步系统(调用方发送消息后就不管,即fire-and-forget)能够给我们带来事件驱动架构(event-driven architecture EDA)。

4、 作为事件驱动架构的主干
解耦,异步架构的系统允许通过代理器自己配置更多的客户端,内存等(即vertical scalability)来扩大系统,而不是增加更多的代理器(即horizontal scalability)。考虑如亚马逊这样繁忙的电子商务系统。当用户购买物品,事实上系统需要很多步骤去处理,包括下单,创建发票,付款,执行订单,运输等。但是用户下单后,会立即返回“谢谢你下单”的界面。不只是没有延迟,而且用户还会受到一封邮件表明订单已经收到。在亚马逊下单的例子就是一个多步处理的例子。每一步都由单独的服务去处理。当用户下单是,有一个同步的体积表单动作,但整个处理流程并不通过浏览器同步处理。相反地,订单马上被接受和反馈。而剩下的步骤就通过异步处理。如果在处理过程中出错,用户会通过邮件收到通知。这样的异步处理能提供高负载和高可用性。

5、 提高系统扩展性
很多使用事件驱动设计的系统是为了获得高可扩展性,例如电子商务,政府,制造业,线上游戏等。通过异步消息分开商业处理步骤给各个应用,能够带来很多可能性。考虑设计一个应用来完成一项特殊的任务。这就是面向服务的架构(service-oriented architecture SOA)。每一个服务完成一个功能并且只有一个功能。应用就通过服务组合起来,服务间使用异步消息和最终一致性。这样的设计便可以引入一个复杂事件处理概念(complex event processing CEP)。使用CEP,部件间的交互可以被记录追踪。在异步消息系统中,可以很容易在部件间增加一层处理。

http://shmilyaw-hotmail-com.iteye.com/blog/1897635

###其他注意的地方
接收和处理消息的方法有两种,分为同步和异步的,一般同步的方式我们是通过MessageConsumer.receive()方法来处理接收到的消息。而异步的方法则是通过注册一个MessageListener的方法,使用MessageConsumer.setMessageListener()。

RabbitMQ

http://www.cnblogs.com/leocook/p/mq_rabbitmq_0.html

RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。他遵循Mozilla Public License开源协议。

###安装和启动###

wget http://www.erlang.org/download/otp_src_17.3.tar.gz

tar zxvf otp_src_17.3.tar.gz

./configure --prefix=/home/ww/erlang

sudo aptitude install libncurses5-dev

make  make install

tar zxvf rabbitmq-server-generic-unix-3.3.5.tar.gz

export PATH=$PATH:/usr/rabbitmq_server-3.3.5/sbin

./rabbitmq-server start

abbitmq web管理页面插件安装

./rabbitmq-plugins enable rabbitmq_management

之后,netstat -napt|grep 5672 就会看到

tcp 0 0 0.0.0.0:15672 0.0.0.0: LISTEN 2506/beam.smp
tcp 0 0 0.0.0.0:55672 0.0.0.0:
LISTEN 2506/beam.smp
tcp 0 0 :::5672 :::* LISTEN 2506/beam.smp

通过 http://127.0.0.1:15672,和guest:guest的用户名密码就能登录管理页面了

流程图 

type

RabbitMQ的消息发送模型核心思想是生产者不直接把消息发送到消息队列中

几个概念

Broker:即消息队列服务器实体

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

producer:消息生产者,就是投递消息的程序。

consumer:消息消费者,就是接受消息的程序。

channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

### 工作过程

生产者客户端:

  1. 客户端连接到RabbitMQ服务器上,打开一个消息通道(channel);
  2. 客户端声明一个消息交换机(exchange),并设置相关属性。
  3. 客户端声明一个消息队列(queue),并设置相关属性。
  4. 客户端使用routing key在消息交换机(exchange)和消息队列(queue)中建立好绑定关系。
  5. 客户端投递消息都消息交换机(exchange)上
  6. 客户端关闭消息通道(channel)以及和服务器的连接。

服务器端:

exchange接收到消息后,根据消息的key和以及设置的binding,进行消息路由,将消息投递到一个或多个消息队列中。

exchange类型

(1). Direct交换机:完全根据key进行投递。例如,绑定时设置了routing key为abc,客户端提交信息提交信息时只有设置了key为abc的才会投递到队列;

(2).Topic交换机:在key进行模式匹配后进行投递。例如:符号”#”匹配一个或多个字符,符号””匹配一串连续的字母字符,例如”abc.#”可以匹配”abc.def.ghi”,而”abc.”只可以匹配”abc.def”。

(3).Fanout交换机:它采取广播模式,消息进来时,将会被投递到与改交换机绑定的所有队列中。

RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型

RabbitMQ的消息持久化

RabbitMQ支持数据持久化,也就是把数据写在磁盘上,可以增加数据的安全性。消息队列持久化包括三个部分:

消息交换机(exchange)持久化,在声明时指定durable为1
消息队列(queue)持久化,在声明时指定durable为1
消息持久化,在投递时指定delivery_mode为2(1是非持久化)
如果消息交换机(exchange)和消息队列(queue)都是持久化的话,那么他们之间的绑定(Binding)也是持久化的。如果消息交换机和消息队列之间一个持久化、一个非持久化,那么就不允许绑定。

其他

在向消费者推送某条消息后,RabbitMQ会立刻删除掉这条消息。这样的话,如果我们kill掉某个worker的话,那么我们将会流失掉该worker正在处理任务的消息(改任务未处理完成),我们也会丢失所有被发送到这个消费者且未处理完成的消息。

但是,我们不想丢失这部分消息,我们希望这类消息可以再次被发送到其它worker那。

为了保证永远不会丢失消息,RabbitMQ支持消息应答机制。当消费者接收到消息并完成任务后会往RabbitMQ服务器发送一条确认的命令,然后RabbitMQ才会将消息删除。

RabbitMQ接收到消息后,首先会把该消息写到内存缓冲区中,并不是直接把单条消息实时写到磁盘上的。消息的持久化不是健壮的,但是对于简单的任务队列是够用了。如果你需要一套很健壮的持久化方案,那么你可以使用publisher confirms

使用消息应答机制和prefetchCount可以实现一个工作队列了。持久化的选项可以使任务即使队列和消息即使在RabbitMQ重启后,依然不会丢失。

总结:

  1. 消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息。

  2. 服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会。

  3. 指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题。

### 负载均衡 ###

实现——一个已经命名的队列里发送和接收消息

实现——工作队列

type

使用Work Queue,rabbitMQ把每条任务消息只发给一个消费者。

工作队列的主要思想就是避开立刻处理某个资源消耗交大的任务并且需要等待它执行完成。取而代之的是我们可以将它加入计划列表,并在后边执行这些任务。我们将任务分装成一个消息,并发送到队列中。后台的工作程序在接收到消息后将会立刻执行任务。当运行多个执行器时,任务将会在他们之间共享。

这个概念在web应用程序中是比较实用的,对于一些在一个短的http请求里无法完成的复杂任务。

channel执行basicConsume方法时autoAck为false,这就意味着接受者在收到消息后需要主动通知RabbitMQ才能将该消息从队列中删除,否则该在接收者跟MQ连接没断的情况下,消息将会变为untracked状态,一旦接收者断开连接,消息重新变为ready状态。

RabbitMQ不允许你重新定义一个已经存在的消息队列,如果你尝试着去修改它的某些属性的话,那么你的程序将会报错。所以,这里你需要更换一个消息队列名称。

标记消息持久化并不能百分百的保证消息一定不会被丢失,虽然RabbitMQ会把消息写到磁盘上,但是从RabbitMQ接收到消息到写到磁盘上,这个短时间的过程中发生的RabbitMQ重启依然会使得为写入到磁盘的消息被丢失。事实上是这样的,RabbitMQ接收到消息后,首先会把该消息写到内存缓冲区中,并不是直接把单条消息实时写到磁盘上的。消息的持久化不是健壮的,但是对于简单的任务队列是够用了。如果你需要一套很健壮的持久化方案,那么你可以使用publisher confirms(稍后会更新详细的使用方法)。

发布/订阅 Publish/Subscribe

把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅)

无论什么时候我们和RabbitMQ建立连接时,我们都要刷新、清空Queue。为了达到这一的目的,我们可以用一个随机的名字(随机性可由自己来定义)来创建Queue,也可以让服务器来自动建立一个随见的Queue。
当消费者断开连接时,Queue能自动被删除。
使用Java客户端时,我们使用无参数的queueDeclare方法,就可以创建一个已经生成名字的、排他性的且会自动删除的Queue:

String queueName = channel.queueDeclare().getQueue();

消费者端怎么才能拿到生产者发送消息中的部分消息?

一个感兴趣的日志级别进行绑定

更复杂一点的使用Topic类型的exchange

用RabbitMQ来构建一个RPC系统

RPC工作原理如下:

当Client启动时,它将会创建一个匿名的callback queue。
对于一次RPC请求,client会发送一条含有两个属性的消息:replyTo和correlationId。Reply是设置的callback queue,correlationId是设置的当前请求的标示符。
请求将会被发送到rpc_queue里。
RPC的worker(RPC server)等待queue中的请求。当出现一个请求之后,他将会处理任务,并向replyTo队列中发送消息。
客户端会等待callback queue上的消息。当消息出现时,它将会检查correlationId属性是否能与之前发送请求时的属性一直,若一致的话,client将会处理回复的消息。

RocketMQ

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

  1. 支持严格的消息顺序;
  2. 支持Topic与Queue两种模式;
  3. 亿级消息堆积能力;
  4. 比较友好的分布式特性;
  5. 同时支持Push与Pull方式消费消息;

Kafka

总结

参考文献

评论系统未开启,无法评论!