Message middleware solution JMS

1. What is message middleware

Message middleware uses efficient, reliable message delivery to exchange data independently of platform, and integrates distributed systems on the basis of data communication. By providing message-passing and message-queuing models, it extends inter-process communication across a distributed environment. The two common roles in message middleware are the producer, which sends messages, and the consumer, which receives them.

    Common message middleware products

    (1) ActiveMQ

        ActiveMQ is a popular, full-featured open source message bus from Apache. It is a JMS provider implementation that fully supports the JMS 1.1 and J2EE 1.4 specifications. This course uses ActiveMQ.

    (2) RabbitMQ

        A leading implementation of the AMQP protocol that supports many scenarios. Taobao's MySQL cluster uses it for internal communication, it is the communication component of the OpenStack open source cloud platform, and it was first adopted in the financial industry.

    (3) ZeroMQ

        Often described as the fastest message queuing system.

    (4) Kafka

        An Apache project. Features: high throughput, reaching about 100,000 messages per second on an ordinary server; a fully distributed system. Suitable for processing massive volumes of data.

2. Introduction to JMS

2.1. What is JMS

        JMS (Java Message Service) is a technical specification for message-oriented middleware on the Java platform. It makes it easy for Java applications to exchange messages through a messaging system, and it simplifies enterprise application development by providing standard interfaces for creating, sending, and receiving messages.

        JMS itself only defines a set of interfaces: it is a vendor-independent API for accessing messaging systems. It is similar to JDBC (Java Database Connectivity): JDBC is an API for accessing many different relational databases, while JMS provides the same vendor-independent access to messaging services. Many vendors support JMS, including IBM's MQSeries, BEA's WebLogic JMS service, and Progress's SonicMQ, to name a few.

        JMS lets you send messages from one JMS client to another JMS client through a messaging service (sometimes called a message broker or router). A message is a JMS object that consists of two parts: a header and a body. The header carries routing information and metadata about the message. The body carries the application's data, or payload.

    JMS defines five message body formats, each with a corresponding message type. They let you send and receive data in different forms and provide some level of compatibility with existing message formats.

· TextMessage: a string object

· MapMessage: a set of name-value pairs

· ObjectMessage: a serialized Java object

· BytesMessage: a stream of bytes

· StreamMessage: a stream of Java primitive values

2.2. JMS message delivery models

    There are two message delivery models.

    One is point-to-point: each message sent by a producer is received by exactly one consumer.

    The other is publish/subscribe: a message sent by a producer can be received by multiple consumers.
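The difference between the two models can be sketched with a small in-memory simulation. This is not the JMS API: the class DeliveryModelsSketch and its methods are hypothetical names made up for illustration, and the round-robin choice of consumer in the point-to-point case is an assumption about how a broker might pick a receiver.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class DeliveryModelsSketch {

    /** Point-to-point: each message goes to exactly one consumer (round-robin here). */
    static List<List<String>> deliverPointToPoint(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = newInboxes(consumerCount);
        Queue<String> queue = new ArrayDeque<>(messages);
        int next = 0;
        while (!queue.isEmpty()) {
            // The message leaves the queue and lands in a single inbox.
            inboxes.get(next).add(queue.poll());
            next = (next + 1) % consumerCount;
        }
        return inboxes;
    }

    /** Publish/subscribe: every subscriber receives its own copy of each message. */
    static List<List<String>> deliverPublishSubscribe(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = newInboxes(consumerCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    /** Creates one empty inbox per consumer. */
    private static List<List<String>> newInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2");
        System.out.println("queue: " + deliverPointToPoint(messages, 2));     // queue: [[m1], [m2]]
        System.out.println("topic: " + deliverPublishSubscribe(messages, 2)); // topic: [[m1, m2], [m1, m2]]
    }
}
```

With two consumers and two messages, the queue splits the messages between the consumers, while the topic gives both consumers both messages.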
     


3. ActiveMQ download and installation

3.1. Download from the official website

     Official website: http://activemq.apache.org/

3.2. Installation (Linux)

     (1) Upload apache-activemq-5.12.0-bin.tar.gz to the server

     (2) Extract the archive

         tar zxvf apache-activemq-5.12.0-bin.tar.gz

     (3) Grant permissions on the apache-activemq-5.12.0 directory

         chmod 777 apache-activemq-5.12.0

     (4) Enter the apache-activemq-5.12.0/bin directory

     (5) Grant execute permission on the activemq script, for example:

         chmod 755 activemq

     (6) Start ActiveMQ

         ./activemq start

A start-up message in the terminal indicates success.

Assuming the server address is 192.168.25.135, open http://192.168.25.135:8161/ in a browser to reach the ActiveMQ management page.

Click the link to the management console and log in; the user name and password are both admin. The main interface then opens.

The Queues page lists the point-to-point message queues.

The meaning of each column in the list:

     Number Of Pending Messages: messages waiting to be consumed, that is, messages currently in the queue that have not yet been dequeued.

     Number Of Consumers: the number of consumers currently connected to the queue.

     Messages Enqueued: the total number of messages that have entered the queue, including those already dequeued.

     Messages Dequeued: messages that have left the queue, which can be understood as the number consumed.

4. A small JMS demo

4.1. Point-to-point mode

     Point-to-point mode is based on a queue. The sender does not need to know whether a receiver is currently listening; it simply sends the message to ActiveMQ, where it enters the queue. If a receiver is listening, the message is delivered to it; if not, the message is stored on the ActiveMQ server until a receiver picks it up. Point-to-point mode allows multiple senders and multiple receivers, but each message is received by only one receiver: typically the receiver that connected to ActiveMQ first gets it, and the other receivers do not receive that message.

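4.2. Message producer

     (1) Create the project jmsDemo and add the dependency

         <dependency>
             <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-client</artifactId>
             <version>5.13.4</version>
         </dependency>

     (2) Create the class QueueProducer; its code, including the main method, is as follows:

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class QueueProducer {
        public static void main(String[] args) throws JMSException {
            // 1. Create the connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
            // 2. Get a connection
            Connection connection = connectionFactory.createConnection();
            // 3. Start the connection
            connection.start();
            // 4. Get a session (parameter 1: whether to use a transaction; parameter 2: acknowledgement mode)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the queue object
            Queue queue = session.createQueue("test-queue");
            // 6. Create the message producer
            MessageProducer producer = session.createProducer(queue);
            // 7. Create the message
            TextMessage textMessage = session.createTextMessage("Welcome to the wonderful world of Pinyougou");
            // 8. Send the message
            producer.send(textMessage);
            // 9. Release resources
            producer.close();
            session.close();
            connection.close();
        }
    }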
    

The two parameters passed when creating the session in step 4 of the code above:

         The first parameter specifies whether to use a transaction.

         The second parameter specifies the message acknowledgement mode:

         • AUTO_ACKNOWLEDGE = 1: messages are acknowledged automatically

         • CLIENT_ACKNOWLEDGE = 2: the client acknowledges messages manually

         • DUPS_OK_ACKNOWLEDGE = 3: messages are acknowledged automatically in batches, so duplicates are possible

         • SESSION_TRANSACTED = 0: messages are acknowledged when the transaction is committed
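What automatic and manual acknowledgement mean for redelivery can be illustrated with a toy in-memory queue. This is a simulation, not the JMS API: AckModeSketch and ToyQueue are hypothetical names, and the recover step only loosely mirrors what a real broker does when a consumer fails before acknowledging.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class AckModeSketch {

    /** A toy queue that only forgets a message once it has been acknowledged. */
    static class ToyQueue {
        private final Deque<String> pending = new ArrayDeque<>();
        private final List<String> unacknowledged = new ArrayList<>();

        void send(String message) {
            pending.addLast(message);
        }

        /** Hands out the next message; with autoAck it counts as acknowledged immediately. */
        String receive(boolean autoAck) {
            String message = pending.pollFirst();
            if (message != null && !autoAck) {
                unacknowledged.add(message);
            }
            return message;
        }

        /** Manual acknowledgement: confirms every message delivered so far. */
        void acknowledge() {
            unacknowledged.clear();
        }

        /** Simulates a consumer failure: unacknowledged messages return for redelivery. */
        void recover() {
            for (int i = unacknowledged.size() - 1; i >= 0; i--) {
                pending.addFirst(unacknowledged.get(i));
            }
            unacknowledged.clear();
        }

        int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("order-1");

        queue.receive(false);   // delivered, but not yet acknowledged
        queue.recover();        // consumer fails before acknowledging
        System.out.println(queue.pendingCount()); // 1: the message is redelivered

        queue.receive(false);
        queue.acknowledge();    // consumer confirms the message
        queue.recover();
        System.out.println(queue.pendingCount()); // 0: nothing left to redeliver
    }
}
```

The design point is that with manual acknowledgement a message stays the broker's responsibility until the consumer confirms it, whereas automatic acknowledgement gives it up as soon as it is delivered.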

         After running the producer, check the queue in the ActiveMQ management interface.


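4.3. Message consumer

     Create the class QueueConsumer; its code, including the main method, is as follows:

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class QueueConsumer {
        public static void main(String[] args) throws Exception {
            // 1. Create the connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
            // 2. Get a connection
            Connection connection = connectionFactory.createConnection();
            // 3. Start the connection
            connection.start();
            // 4. Get a session (parameter 1: whether to use a transaction; parameter 2: acknowledgement mode)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the queue object
            Queue queue = session.createQueue("test-queue");
            // 6. Create the message consumer
            MessageConsumer consumer = session.createConsumer(queue);
            // 7. Listen for messages
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("Received message: " + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            // 8. Wait for keyboard input so the program keeps listening
            System.in.read();
            // 9. Release resources
            consumer.close();
            session.close();
            connection.close();
        }
    }

After running the consumer, check the console output.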

Run the test

         Start two or more consumers at the same time, then run the producer again and observe each consumer's console output. Only one consumer receives the message.

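5. Publish/subscribe model

5.1. Message producer

     Create the class TopicProducer; its code, including the main method, is as follows:

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class TopicProducer {
        public static void main(String[] args) throws JMSException {
            // 1. Create the connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
            // 2. Get a connection
            Connection connection = connectionFactory.createConnection();
            // 3. Start the connection
            connection.start();
            // 4. Get a session (parameter 1: whether to use a transaction; parameter 2: acknowledgement mode)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the topic object
            Topic topic = session.createTopic("test-topic");
            // 6. Create the message producer
            MessageProducer producer = session.createProducer(topic);
            // 7. Create the message
            TextMessage textMessage = session.createTextMessage("Welcome to the wonderful world of Pinyougou");
            // 8. Send the message
            producer.send(textMessage);
            // 9. Release resources
            producer.close();
            session.close();
            connection.close();
        }
    }

     After running the producer, check the result in the ActiveMQ management interface.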

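5.2. Message consumer

     Create the class TopicConsumer; its code, including the main method, is as follows:

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class TopicConsumer {
        public static void main(String[] args) throws Exception {
            // 1. Create the connection factory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
            // 2. Get a connection
            Connection connection = connectionFactory.createConnection();
            // 3. Start the connection
            connection.start();
            // 4. Get a session (parameter 1: whether to use a transaction; parameter 2: acknowledgement mode)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the topic object (a topic instead of the queue used in the point-to-point demo)
            Topic topic = session.createTopic("test-topic");
            // 6. Create the message consumer
            MessageConsumer consumer = session.createConsumer(topic);
            // 7. Listen for messages
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("Received message: " + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            // 8. Wait for keyboard input so the program keeps listening
            System.in.read();
            // 9. Release resources
            consumer.close();
            session.close();
            connection.close();
        }
    }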

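5.3. Run the test

     Start two or more consumers at the same time, then run the producer again and observe each consumer's console output. Every consumer receives the message. The consumers must already be running when the producer sends, because these non-durable topic subscribers do not receive messages published before they subscribed.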

6. Spring integrates JMS

6.1. Point-to-point mode

     Message producer

     (1) Create the project springjms_producer and add the Spring JMS, ActiveMQ, and unit test dependencies to the POM file

     (2) Create the spring configuration file applicationContext-jms-producer.xml under src/main/resources
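The content of this configuration file is not shown here. A minimal sketch of what it might contain follows, assuming the broker address used earlier and a producer class in the cn.itcast.demo package. The bean ids targetConnectionFactory and connectionFactory and the queue name queue_text are assumptions chosen for illustration; the ids jmsTemplate and queueTextDestination match the fields injected into the producer class.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Scan the package that holds the producer classes -->
    <context:component-scan base-package="cn.itcast.demo"/>

    <!-- Connection factory supplied by the JMS vendor (ActiveMQ) -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.135:61616"/>
    </bean>

    <!-- Spring wrapper around the vendor connection factory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Helper that Spring provides for sending messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <!-- Queue destination; the queue name queue_text is an assumption -->
    <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue_text"/>
    </bean>
</beans>
```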


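     (3) Create a message producer class in the cn.itcast.demo package

    package cn.itcast.demo;

    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;

    @Component
    public class QueueProducer {

        @Autowired
        private JmsTemplate jmsTemplate;

        @Autowired
        private Destination queueTextDestination;

        /**
         * Sends a text message.
         * @param text the message text
         */
        public void sendTextMessage(final String text) {
            jmsTemplate.send(queueTextDestination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(text);
                }
            });
        }
    }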

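     (4) Unit test

         Create a test class in src/test/java

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import cn.itcast.demo.QueueProducer;

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext-jms-producer.xml")
    public class TestQueue {

        @Autowired
        private QueueProducer queueProducer;

        @Test
        public void testSend() {
            queueProducer.sendTextMessage("SpringJms point-to-point");
        }
    }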

        Message consumer

(1) Create the springjms_consumer project and introduce dependencies in the POM file (same as the previous project)

(2) Create the configuration file applicationContext-jms-consumer-queue.xml
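The content of this configuration file is not shown here either. A minimal sketch follows, under the same assumptions as the producer side: the broker address used earlier, the hypothetical queue name queue_text, and a listener class MyMessageListener placed in the cn.itcast.demo package. The package of the listener is not stated in this document, so that class name is a guess.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Connection factory supplied by the JMS vendor (ActiveMQ) -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.135:61616"/>
    </bean>

    <!-- Spring wrapper around the vendor connection factory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Queue destination; must use the same queue name as the producer (assumed: queue_text) -->
    <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue_text"/>
    </bean>

    <!-- The listener class; its package is an assumption -->
    <bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"/>

    <!-- Container that receives messages and passes them to the listener -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueTextDestination"/>
        <property name="messageListener" ref="myMessageListener"/>
    </bean>
</beans>
```

Because the listener container starts with the Spring context, the test class only has to keep the JVM alive while messages arrive.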

    
	

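(3) Write the listener class

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;

    public class MyMessageListener implements MessageListener {

        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("Received message: " + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }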

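(4) Create a test class

    import java.io.IOException;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext-jms-consumer-queue.xml")
    public class TestQueue {

        @Test
        public void testQueue() {
            try {
                // Block on keyboard input so the listener keeps running
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }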

6.2. Publish/subscribe model

     Message producer

(1) Add configuration in applicationContext-jms-producer.xml of project springjms_producer
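The configuration to add is not shown here. Most likely it is a topic destination bean along the following lines, placed inside the existing beans element. The bean id topicTextDestination matches the field injected into the producer class; the topic name topic_text is an assumption.

```xml
<!-- Topic destination for publish/subscribe; the topic name topic_text is an assumption -->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic_text"/>
</bean>
```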

    

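(2) Create a producer class

    package cn.itcast.demo;

    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;

    @Component
    public class TopicProducer {

        @Autowired
        private JmsTemplate jmsTemplate;

        @Autowired
        private Destination topicTextDestination;

        /**
         * Sends a text message.
         * @param text the message text
         */
        public void sendTextMessage(final String text) {
            jmsTemplate.send(topicTextDestination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(text);
                }
            });
        }
    }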

(3) Write the test class

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import cn.itcast.demo.TopicProducer;

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext-jms-producer.xml")
    public class TestTopic {

        @Autowired
        private TopicProducer topicProducer;

        @Test
        public void sendTextQueue() {
            // sendTextMessage requires the message text as its argument
            topicProducer.sendTextMessage("SpringJms publish/subscribe");
        }
    }

     Message consumer

(1) Create the configuration file applicationContext-jms-consumer-topic.xml in the springjms_consumer project


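(2) Write the test class

    import java.io.IOException;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext-jms-consumer-topic.xml")
    public class TestTopic {

        @Test
        public void testTopic() {
            try {
                // Block on keyboard input so the listener keeps running
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    Test: start three instances of the consumer project, then run the producer project and check the console output of all three consumers.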