联系我们

公司地址: 上海市沪宜公路1188号4号楼
     一层
联系电话:021-31080981
电子邮箱:soline@soline.com.cn
邮政编码:201802

消息中间件解决方案JMS

1、什么是消息中间件

        消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)

    常见的消息中间件产品

     (1)ActiveMQ
        ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。我们在本次课程中介绍 ActiveMQ的使用。    
    (2)RabbitMQ
        AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
    (3)ZeroMQ
        史上最快的消息队列系统    
    (4)Kafka
        Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

2、JMS简介

    2.1、什么是JMS    

        JMS(Java Messaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
       JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC(java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
    JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供
现有消息格式的一些级别的兼容性。
· TextMessage--一个字符串对象
· MapMessage--一套名称-值对
· ObjectMessage--一个序列化的 Java 对象
· BytesMessage--一个字节的数据流
· StreamMessage -- Java 原始值的数据流

2.2、JMS消息传递类型

    对于消息的传递有两种类型

        一种是点对点的,即一个生产者和一个消费者一一对应。

    


            另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进
        行接收。
     


3、ActiveMQ下载与安装

    3.1、官方网站下载

            官方网站下载:http://activemq.apache.org/

    3.2、安装(Linux)

    (1)将apache-activemq-5.12.0-bin.tar.gz 上传至服务器

    (2)解压此文件

tar  zxvf  apache-activemq-5.12.0-bin.tar.gz
    (3)为apache-activemq-5.12.0目录赋权
chmod 777 apache-activemq-5.12.0

    (4)进入apache-activemq-5.12.0\bin目录

    (5)赋与执行权限       

    (6)启动

 ./activemq start

        出现下列提示表示成功!

     

假设服务器地址为192.168.25.135 ,打开浏览器输入地址

            http://192.168.25.135:8161/ 即可进入ActiveMQ管理页面

     

    点击进入管理页面

     

    输入用户名和密码  均为 admin

    

    进入主界面

        

    点对点消息队列

     

    列表各列含义

    Number Of Pending Messages  :等待消费的消息 这个是当前未出队列的数量。
    Number Of Consumers  :消费者 这个是消费者端的消费者数量
    Messages Enqueued  :进入队列的消息  进入队列的总数量,包括出队列的。
    Messages Dequeued  :出了队列的消息  可以理解为是消费这消费掉的数量。

4、JMS小Demo

    4.1、点对点模式    

    点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息。

    4.2、消息生产者       

    (1)创建工程jmsDemo ,引入依赖
  	<dependency>		<groupId>org.apache.activemq</groupId>		<artifactId>activemq-client</artifactId>		<version>5.13.4</version>	</dependency>

    (2)创建类QueueProducer  main方法代码如下:

	//1.创建连接工厂	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");	//2.获取连接	Connection connection = connectionFactory.createConnection();	//3.启动连接	connection.start();	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);			//5.创建队列对象	Queue queue = session.createQueue("test-queue");	//6.创建消息生产者	MessageProducer producer = session.createProducer(queue);	//7.创建消息	TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");	//8.发送消息	producer.send(textMessage);	//9.关闭资源	producer.close();	session.close();	connection.close();
    上述代码中第4步创建session  的两个参数:
        第1个参数 是否使用事务
        第2个参数 消息的确认模式
        • AUTO_ACKNOWLEDGE = 1    自动确认
        • CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
        • DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
        • SESSION_TRANSACTED = 0    事务提交并确认
        运行后通过ActiveMQ管理界面查询    
    4.3、消息消费者  


        创建类QueueConsumer ,main方法代码如下:
	//1.创建连接工厂	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");	//2.获取连接	Connection connection = connectionFactory.createConnection();	//3.启动连接	connection.start();	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	//5.创建队列对象	Queue queue = session.createQueue("test-queue");	//6.创建消息消费	MessageConsumer consumer = session.createConsumer(queue);		//7.监听消息	consumer.setMessageListener(new MessageListener() {		public void onMessage(Message message) {			TextMessage textMessage=(TextMessage)message;			try {				System.out.println("接收到消息:"+textMessage.getText());			} catch (JMSException e) {				// TODO Auto-generated catch block				e.printStackTrace();			}		}	});		//8.等待键盘输入	System.in.read();		//9.关闭资源	consumer.close();	session.close();	connection.close();	
    执行后看到控制台输出

         

    运行测试

        同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有一个消费者会接收到消息。

    5、发布/订阅模式

        5.1、消息生产者 

        创建类TopicProducer ,main方法代码如下:
        //1.创建连接工厂	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");	//2.获取连接	Connection connection = connectionFactory.createConnection();	//3.启动连接	connection.start();	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	//5.创建主题对象	Topic topic = session.createTopic("test-topic");	//6.创建消息生产者	MessageProducer producer = session.createProducer(topic);	//7.创建消息	TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");	//8.发送消息	producer.send(textMessage);	//9.关闭资源	producer.close();	session.close();	connection.close();

    运行效果    

    5.2、消息消费者    

    创建类TopicConsumer ,main方法代码如下:
	//1.创建连接工厂	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");	//2.获取连接	Connection connection = connectionFactory.createConnection();	//3.启动连接	connection.start();	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	//5.创建主题对象	//Queue queue = session.createQueue("test-queue");	Topic topic = session.createTopic("test-topic");	//6.创建消息消费	MessageConsumer consumer = session.createConsumer(topic);		//7.监听消息	consumer.setMessageListener(new MessageListener() {		public void onMessage(Message message) {			TextMessage textMessage=(TextMessage)message;			try {				System.out.println("接收到消息:"+textMessage.getText());			} catch (JMSException e) {				// TODO Auto-generated catch block				e.printStackTrace();			}		}	});	//8.等待键盘输入	System.in.read();	//9.关闭资源	consumer.close();	session.close();	connection.close();	

5.3、运行测试        

            同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。

6、Spring整合JMS

    6.1、点对点模式

          消息生产者

    (1)创建工程springjms_producer,在POM文件中引入SpringJms 、activeMQ以及单元测试相关依赖  
    (2)在src/main/resources下创建spring配置文件applicationContext-jms-producer.xml

	<context:component-scan base-package="cn.itcast.demo"></context:component-scan><!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">	<property name="brokerURL" value="tcp://192.168.25.135:61616"/>	</bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->	<property name="targetConnectionFactory" ref="targetConnectionFactory"/>	</bean>	   <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">	<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->	<property name="connectionFactory" ref="connectionFactory"/>	</bean><!--这个是队列目的地,点对点的  文本信息-->	<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">	<constructor-arg value="queue_text"/>	</bean>
(3)在cn.itcast.demo包下创建消息生产者类
@Componentpublic class QueueProducer {		@Autowired	private JmsTemplate jmsTemplate;		@Autowired	private Destination queueTextDestination;		/**	 * 发送文本消息	 * @param text	 */	public void sendTextMessage(final String text){		jmsTemplate.send(queueTextDestination, new MessageCreator() {						public Message createMessage(Session session) throws JMSException {				return session.createTextMessage(text);			}		});			}}

    (4)单元测试     

            在src/test/java创建测试类

@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-jms-producer.xml")public class TestQueue {	@Autowired	private QueueProducer queueProducer;		@Test	public void testSend(){		queueProducer.sendTextMessage("SpringJms-点对点");	}	}

        消息消费者

(1)创建工程springjms_consumer,在POM文件中引入依赖(同上一个工程)
(2)创建配置文件 applicationContext-jms-consumer-queue.xml

    
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">	<property name="brokerURL" value="tcp://192.168.25.135:61616"/>	</bean>	   <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->	<property name="targetConnectionFactory" ref="targetConnectionFactory"/>	</bean>	<!--这个是队列目的地,点对点的  文本信息-->	<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">	<constructor-arg value="queue_text"/>	</bean>	<!-- 我的监听类 -->	<bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean>	<!-- 消息监听容器 -->	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">		<property name="connectionFactory" ref="connectionFactory" />		<property name="destination" ref="queueTextDestination" />		<property name="messageListener" ref="myMessageListener" />	</bean>	

(3)编写监听类    

public class MyMessageListener implements MessageListener {	public void onMessage(Message message) {	TextMessage textMessage=(TextMessage)message;				try {			System.out.println("接收到消息:"+textMessage.getText());		} catch (JMSException e) {			e.printStackTrace();		}	}}

(4)创建测试类

        
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-jms-consumer-queue.xml")public class TestQueue {	@Test	public void testQueue(){		try {			System.in.read();		} catch (IOException e) {			e.printStackTrace();		}			}	}

6.2、发布/订阅模式

        消息生产者    

(1)在工程springjms_producer的applicationContext-jms-producer.xml增加配置

    
	<!--这个是订阅模式  文本信息-->	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">	<constructor-arg value="topic_text"/>	</bean>

(2)创建生产者类

    
@Componentpublic class TopicProducer {	@Autowired	private JmsTemplate jmsTemplate;		@Autowired	private Destination topicTextDestination;		/**	 * 发送文本消息	 * @param text	 */	public void sendTextMessage(final String text){		jmsTemplate.send(topicTextDestination, new MessageCreator() {						public Message createMessage(Session session) throws JMSException {				return session.createTextMessage(text);			}		});			}}

(3)编写测试类

import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import cn.itcast.demo.TopicProducer;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-activemq-producer.xml")public class TestTopic {	@Autowired	private TopicProducer topicProducer;	@Test	public void sendTextQueue(){				topicProducer.sendTextMessage();	}	}

    消息消费者        

(1)在activemq-spring-consumer工程中创建配置文件applicationContext-jms-consumer-topic.xml

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">	<property name="brokerURL" value="tcp://192.168.25.135:61616"/>	</bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->	<property name="targetConnectionFactory" ref="targetConnectionFactory"/>	</bean><!--这个是队列目的地,点对点的  文本信息-->	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">	<constructor-arg value="topic_text"/>	</bean>	<!-- 我的监听类 -->	<bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean>	<!-- 消息监听容器 -->	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">		<property name="connectionFactory" ref="connectionFactory" />		<property name="destination" ref="topicTextDestination" />		<property name="messageListener" ref="myMessageListener" />	</bean>

(2)编写测试类    

@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations="classpath:applicationContext-jms-consumer-topic.xml")public class TestTopic {	@Test	public void testTopic(){		try {			System.in.read();		} catch (IOException e) {			e.printStackTrace();		}			}		}

    测试:同时运行三个消费者工程,在运行生产者工程,查看三个消费者工程的控制台输出。