ActiveMQ와 Spring 통합: 큐(Queue)

9389 단어 activemq
Spring을 통해 ActiveMQ를 설정하고 개발하여 큐(대기열) 메시지 모드를 구현하며, 메시지 영속화(persistence)를 지원합니다.
pom.xml에 다음 jar 의존성을 추가합니다.
<!-- ActiveMQ classes used below (ActiveMQConnectionFactory, ActiveMQQueue) -->
<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.8.0</version>
		</dependency>
		<!-- Spring core; the version comes from a spring.version property that must be defined elsewhere in the POM -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<!-- Spring JMS support (JmsTemplate, CachingConnectionFactory, DefaultMessageListenerContainer) -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<!-- Gson, used by the consumer-side listener to print received messages as JSON -->
		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.2.4</version>
		</dependency>

1. 생산자
spring 설정
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:jms="http://www.springframework.org/schema/jms"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:lang="http://www.springframework.org/schema/lang"
	xsi:schemaLocation="
			http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
			http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
			http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
			http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
			http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
			http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
			http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
			http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">

	 <!-- JMS connection factory: Spring CachingConnectionFactory wrapping the ActiveMQ factory -->  
    <bean id="connectionFactory"  
        class="org.springframework.jms.connection.CachingConnectionFactory">  
        <!-- Session cache size -->  
        <property name="sessionCacheSize" value="10" />  
        <property name="targetConnectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <!-- MQ broker address -->  
                <property name="brokerURL" value="tcp://192.168.18.43:61616" />  
                 <!-- Enable asynchronous send on the ActiveMQ connection -->  
                <property name="useAsyncSend" value="true" />  
            </bean>  
        </property>  
    </bean> 

	<!-- Queue destination; the physical queue name on the broker is "spring.queue" -->
	<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<property name="physicalName" value="spring.queue" />
	</bean>

	<!-- jmsTemplate: built on connectionFactory, sends to testQueue by default, non-transacted sessions -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="defaultDestination" ref="testQueue" />
		<property name="sessionTransacted" value="false" />
		<!-- receiveTimeout: applies to receive calls; JmsTemplate documents the unit as milliseconds -->
		<property name="receiveTimeout" value="30000" />
	</bean>
</beans>
메시지 보내기:
package cn.slimsmart.activemq.demo.spring.queue;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

/**
 * Command-line entry point of the producer side of the queue demo.
 *
 * <p>Loads {@code queue/producer.xml} from the classpath, looks up the
 * {@link JmsTemplate} bean and sends one message to the template's default
 * destination.
 */
public class ProducerMain {

	/**
	 * Sends a single message and then shuts the Spring context down.
	 *
	 * @param args command-line arguments; not used
	 */
	public static void main(String[] args) {
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:queue/producer.xml");
		try {
			JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
			jmsTemplate.convertAndSend("         ");
		} finally {
			// Close the context so its singleton beans (including the JMS
			// connection factory) are destroyed instead of being leaked.
			// try/finally rather than try-with-resources keeps this compatible
			// with Spring versions whose contexts are not yet Closeable.
			context.close();
		}
	}
}

2. 소비자
spring 설정:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jee="http://www.springframework.org/schema/jee" xmlns:jms="http://www.springframework.org/schema/jms"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:lang="http://www.springframework.org/schema/lang"
	xsi:schemaLocation="
			http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
			http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
			http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
			http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
			http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
			http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
			http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
			http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd">

	 <!-- JMS connection factory for the consumer side.
	      The plain ActiveMQConnectionFactory is exposed directly instead of wrapping it in
	      Spring's CachingConnectionFactory: the listener container below scales between 5 and
	      10 consumers, and the DefaultMessageListenerContainer Javadoc advises against combining
	      CachingConnectionFactory with dynamic scaling, because the container already handles
	      connection/session/consumer caching within its own lifecycle. -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- MQ broker address -->
        <property name="brokerURL" value="tcp://192.168.18.43:61616" />
    </bean>
	
	<!-- Queue destination; the physical queue name on the broker is "spring.queue" -->
	<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
		<property name="physicalName" value="spring.queue" />
	</bean>
	
    <!-- Queue listener container: delivers messages from testQueue to notifyMessageListener -->  
    <bean id="queueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <property name="destination" ref="testQueue" />  
        <property name="messageListener" ref="notifyMessageListener" />  
        <!-- Start with 5 concurrent consumers, allow at most 10 -->  
        <property name="concurrentConsumers" value="5" />  
        <property name="maxConcurrentConsumers" value="10" />  
        <property name="sessionTransacted" value="false" />
        <!-- Session acknowledge mode: CLIENT_ACKNOWLEDGE -->  
        <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />  
    </bean>  
    
    <!-- Listener bean that receives the messages; referenced by the queueContainer's messageListener property -->
    <bean id="notifyMessageListener" class="cn.slimsmart.activemq.demo.spring.NotifyMessageListener"></bean>
</beans>
수신 메시지 리스너:
package cn.slimsmart.activemq.demo.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import com.google.gson.Gson;

public class NotifyMessageListener implements MessageListener {

	public void onMessage(Message message) {
		System.out.println(new Gson().toJson(message));
		if (message instanceof TextMessage) {
		    try {
				System.out.println("      :" + ((TextMessage)message).getText());
				System.out.println("  :" + ((TextMessage)message).getStringProperty("property"));
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}
소비자 실행 클래스:
package cn.slimsmart.activemq.demo.spring.queue;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point of the consumer side of the queue demo.
 *
 * <p>Loads {@code queue/consumer.xml} from the classpath; creating the context
 * instantiates the beans declared there.
 */
public class ConsumerMain {

	/**
	 * Starts the consumer application context.
	 *
	 * @param args command-line arguments; not used
	 */
	@SuppressWarnings("resource") // the context is intentionally left open; the shutdown hook closes it
	public static void main(String[] args) {
		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:queue/consumer.xml");
		// Close the context (and with it the beans it manages) when the JVM
		// shuts down, rather than abandoning them at exit.
		context.registerShutdownHook();
	}
}

좋은 웹페이지 즐겨찾기