博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ基于Spring AMQP的Java Config 简单配置实例
阅读量:7103 次
发布时间:2019-06-28

本文共 13805 字,大约阅读时间需要 46 分钟。

hot3.png

      本实例针对 Direct exchange、Fanout exchange、Topic exchange 三种路由形式,给出了生产者和消费者的统一实现;如有不对之处,烦请读者指出。

1、项目基于Maven,在pom.xml中引入相关依赖。

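<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>${rabbitmq.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>${spring-rabbit.version}</version>
</dependency>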

2、RabbitMQ的Java Config 部分

package com.my.rabbitmq.appconfig;import java.io.IOException;import java.io.InputStream;import java.util.Properties;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;import org.springframework.amqp.core.AbstractExchange;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Binding.DestinationType;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.support.converter.AbstractMessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import com.my.rabbitmq.util.FastJsonMessageConverter;import com.my.rabbitmq.consumer.RabbitMQConsumer;import com.my.rabbitmq.producer.RabbitMQProducer;/** *  * @description rabbitMQ config * * @author yuanzi * @time 2017年8月23日 下午2:46:01 */@Configurationpublic class RabbitmqConfig {	private static Logger log = LogManager.getLogger(RabbitmqConfig.class);	//读取rabbitmq配置文件	private static InputStream configFile = RabbitmqConfig.class.getClassLoader()			.getResourceAsStream("rabbitmq.properties");	private static Properties rabbitmqProperties = new Properties();	static {		try {			rabbitmqProperties.load(configFile);		} catch (IOException e) {			e.printStackTrace();		}	}	//配置生产者ConnectionFactory,配置基础连接信息	@Bean(name = "ProducerConnectionFactory")	public ConnectionFactory ProducerConnectionFactory() throws IOException {		CachingConnectionFactory 
connectionFactory = new CachingConnectionFactory(				rabbitmqProperties.getProperty("rabbitmq.host"));		connectionFactory.setPort(Integer.parseInt(rabbitmqProperties.getProperty("rabbitmq.port")));		connectionFactory.setUsername(rabbitmqProperties.getProperty("rabbitmq.username"));		connectionFactory.setPassword(rabbitmqProperties.getProperty("rabbitmq.password"));		return connectionFactory;	}	//配置消费者ConnectionFactory,配置基础连接信息	@Bean(name = "ConsumerConnectionFactory")	public ConnectionFactory ConsumerConnectionFactory() throws IOException {		CachingConnectionFactory connectionFactory = new CachingConnectionFactory(				rabbitmqProperties.getProperty("rabbitmq.host"));		connectionFactory.setPort(Integer.parseInt(rabbitmqProperties.getProperty("rabbitmq.port")));		connectionFactory.setUsername(rabbitmqProperties.getProperty("rabbitmq.username"));		connectionFactory.setPassword(rabbitmqProperties.getProperty("rabbitmq.password"));		return connectionFactory;	}	//配置RabbitAdmin,在配置了多个ConnectionFactory的情况下,需要配置RabbitAdmin,否则无法自动在rabbitmq服务器中注册交换机队列等。	@Bean	public RabbitAdmin rabbitAdmin() throws IOException {		return new RabbitAdmin(ConsumerConnectionFactory());	}	//配置生产者Template	@Bean(name = "ProducerRabbitTemplate")	public RabbitTemplate ProducerRabbitTemplate() throws IOException {		RabbitTemplate producerRabbitTemplate = new RabbitTemplate();		producerRabbitTemplate.setConnectionFactory(ProducerConnectionFactory());		producerRabbitTemplate.setExchange(rabbitmqProperties.getProperty("rabbitmq.producerExchange"));		//配置生产者信息转换器,封装发送信息的格式		producerRabbitTemplate.setMessageConverter(ProducerMessageConverter());		return producerRabbitTemplate;	}	//配置消费者Template	@Bean(name = "ConsumerRabbitTemplate")	public RabbitTemplate ConsumerRabbitTemplate() throws IOException {		RabbitTemplate consumerRabbitTemplate = new RabbitTemplate(ConsumerConnectionFactory());		consumerRabbitTemplate.setExchange(rabbitmqProperties.getProperty("rabbitmq.consumerExchange"));		return 
consumerRabbitTemplate;	}	//配置生产者交换机类型	@Bean(name = "ProducerExchange")	public AbstractExchange ProducerExchange() throws IOException {		String exchangeType = rabbitmqProperties.getProperty("rabbitmq.exchangeType");		if (exchangeType.contains("F")) {			FanoutExchange fanoutExchange = new FanoutExchange(					rabbitmqProperties.getProperty("rabbitmq.producerExchange"), true, false);			log.info("ProducerExchange-Fanout 生产者:广播路由");			return fanoutExchange;		} else if (exchangeType.contains("T")) {			TopicExchange topicExchange = new TopicExchange(rabbitmqProperties.getProperty("rabbitmq.producerExchange"),					true, false);			log.info("ProducerExchange-Topic 生产者:多路广播路由");			return topicExchange;		} else {			DirectExchange directExchange = new DirectExchange(					rabbitmqProperties.getProperty("rabbitmq.producerExchange"), true, false);			log.info("ProducerExchange-Direct 生产者:直接路由");			return directExchange;		}	}	//配置消费者交换机类型	@Bean(name = "ConsumerExchange")	public AbstractExchange ConsumerExchange() throws IOException {		String exchangeType = rabbitmqProperties.getProperty("rabbitmq.exchangeType");		if (exchangeType.contains("F")) {			FanoutExchange fanoutExchange = new FanoutExchange(					rabbitmqProperties.getProperty("rabbitmq.consumerExchange"), true, false);			log.info("ProducerExchange-Fanout 消费者:广播路由");			return fanoutExchange;		} else if (exchangeType.contains("T")) {			TopicExchange topicExchange = new TopicExchange(rabbitmqProperties.getProperty("rabbitmq.consumerExchange"),					true, false);			log.info("ProducerExchange-Topic 消费者:多路广播路由");			return topicExchange;		} else {			DirectExchange directExchange = new DirectExchange(					rabbitmqProperties.getProperty("rabbitmq.consumerExchange"), true, false);			log.info("ProducerExchange-Direct 消费者:直接路由");			return directExchange;		}	}	//配置参数:生产者队列名称,重启rabbitmq服务时队列还是否存在,只被一个connection使用并且在connection关闭时queue是否被删除,当最后一个consumer取消订阅时queue是否被删除	@Bean(name = "ProducerQueue")	public Queue ProducerQueue() throws 
IOException {		Queue producerQueue = new Queue(rabbitmqProperties.getProperty("rabbitmq.producerQueue"), true, false, false);		return producerQueue;	}	//配置参数:消费者队列名称,重启rabbitmq服务时队列还是否存在,只被一个connection使用并且在connection关闭时queue是否被删除,当最后一个consumer取消订阅时queue是否被删除	@Bean(name = "ConsumerQueue")	public Queue ConsumerQueue() throws IOException {		Queue consumerQueue = new Queue(rabbitmqProperties.getProperty("rabbitmq.consumerQueue"), true, false, false);		return consumerQueue;	}	//配置生产者消息转换器	@Bean(name = "jsonMessageConverter")	public AbstractMessageConverter ProducerMessageConverter() {		FastJsonMessageConverter jsonMessageConverter = new FastJsonMessageConverter();		return jsonMessageConverter;	}	//配置生产者队列与交换机之间的绑定关系和routingKey	@Bean(name = "ProducerBinding")	public Binding ProducerBinding() {		Binding binding = new Binding(rabbitmqProperties.getProperty("rabbitmq.producerQueue"), DestinationType.QUEUE,				rabbitmqProperties.getProperty("rabbitmq.producerExchange"),				rabbitmqProperties.getProperty("rabbitmq.producerRoutingKey"), null);		return binding;	}	//配置消费者队列与交换机之间的绑定关系和routingKey	@Bean(name = "ConsumerBinding")	public Binding ConsumerBinding() {		Binding binding = new Binding(rabbitmqProperties.getProperty("rabbitmq.consumerQueue"), DestinationType.QUEUE,				rabbitmqProperties.getProperty("rabbitmq.consumerExchange"),				rabbitmqProperties.getProperty("rabbitmq.consumerRoutingKey"), null);		return binding;	}	//实例化生产者代码,配置生产者要投递信息的RoutingKey	@Bean(name = "RabbitMQProducer")	public RabbitMQProducer RabbitMQProducer() {		RabbitMQProducer rabbitMQProducer = new RabbitMQProducer(				rabbitmqProperties.getProperty("rabbitmq.producerSendRoutingKey"));		return rabbitMQProducer;	}	//实例化消费者代码	@Bean(name = "RabbitMQConsumer")	public RabbitMQConsumer RabbitMQConsumer() {		RabbitMQConsumer rabbitMQConsumer = new RabbitMQConsumer();		return rabbitMQConsumer;	}	//配置监听模式,当有消息到达时会通知监听在对应的队列上的监听对象	@Bean(name = "ListenerContainer")	public AbstractMessageListenerContainer 
MessageListenerContainer() throws IOException {		SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();		simpleMessageListenerContainer.setConnectionFactory(ConsumerConnectionFactory());		simpleMessageListenerContainer.setQueueNames(rabbitmqProperties.getProperty("rabbitmq.consumerQueue"));		simpleMessageListenerContainer.setMessageListener(RabbitMQConsumer());		return simpleMessageListenerContainer;	}}

3、rabbitmq.properties 文件

# Three alternative configurations follow, one per exchange type.
# Keep exactly ONE section in the real rabbitmq.properties file:
# java.util.Properties retains the last value when a key is repeated.

# ---------------------------------------------------------------------------
# Direct exchange configuration
# ---------------------------------------------------------------------------
# RabbitMQ server address
rabbitmq.host = 127.0.0.1
# RabbitMQ port
rabbitmq.port = 5672
# Producer queue name
rabbitmq.producerQueue = d_queue_producer
# Producer routing key
rabbitmq.producerRoutingKey = d_key_producer
# Consumer queue name
rabbitmq.consumerQueue = d_queue_consumer
# Consumer routing key
rabbitmq.consumerRoutingKey = d_key_consumer
# Exchange type: D = direct exchange, F = fanout exchange, T = topic exchange
rabbitmq.exchangeType = D
# Exchange the producer binds to
rabbitmq.producerExchange = d_exchange
# Exchange the consumer binds to
rabbitmq.consumerExchange = d_exchange
# RabbitMQ user name
rabbitmq.username = guest
# RabbitMQ password
rabbitmq.password = guest
# Routing key the producer uses when sending messages
rabbitmq.producerSendRoutingKey = d_key_consumer

# ---------------------------------------------------------------------------
# Fanout exchange configuration (routing keys have no effect with a fanout exchange)
# ---------------------------------------------------------------------------
# RabbitMQ server address
rabbitmq.host = 127.0.0.1
# RabbitMQ port
rabbitmq.port = 5672
# Producer queue name
rabbitmq.producerQueue = f_queue_producer
# Producer routing key
rabbitmq.producerRoutingKey = f_key_producer
# Consumer queue name
rabbitmq.consumerQueue = f_queue_consumer
# Consumer routing key
rabbitmq.consumerRoutingKey = f_key_consumer
# Exchange type: D = direct exchange, F = fanout exchange, T = topic exchange
rabbitmq.exchangeType = F
# Exchange the producer binds to
rabbitmq.producerExchange = f_exchange
# Exchange the consumer binds to
rabbitmq.consumerExchange = f_exchange
# RabbitMQ user name
rabbitmq.username = guest
# RabbitMQ password
rabbitmq.password = guest
# Routing key the producer uses when sending messages
rabbitmq.producerSendRoutingKey = f_test

# ---------------------------------------------------------------------------
# Topic exchange configuration
# ---------------------------------------------------------------------------
# In a routing-key pattern, "#" matches zero or more words and "*" matches exactly
# one word. For example "log.*" matches "log.warn" but not "log.warn.timeout",
# whereas "log.#" matches both.
# RabbitMQ server address
rabbitmq.host = 127.0.0.1
# RabbitMQ port
rabbitmq.port = 5672
# Producer queue name
rabbitmq.producerQueue = t_queue_producer
# Producer routing key
rabbitmq.producerRoutingKey = t_key_producer
# Consumer queue name
rabbitmq.consumerQueue = t_queue_consumer
# Consumer routing key
rabbitmq.consumerRoutingKey = #.test
# Exchange type: D = direct exchange, F = fanout exchange, T = topic exchange
rabbitmq.exchangeType = T
# Exchange the producer binds to
rabbitmq.producerExchange = t_exchange2
# Exchange the consumer binds to
rabbitmq.consumerExchange = t_exchange2
# RabbitMQ user name
rabbitmq.username = guest
# RabbitMQ password
rabbitmq.password = guest
# Routing key the producer uses when sending messages
rabbitmq.producerSendRoutingKey = t.test

4、生产者代码

package com.my.rabbitmq.producer;

import javax.annotation.Resource;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Sends messages to RabbitMQ through the template injected under the resource
 * name {@code ProducerRabbitTemplate}, always using one fixed routing key.
 *
 * @author yuanzi
 * @time 2017-08-01 15:16:08
 */
public class RabbitMQProducer {

    private static Logger LOG = LogManager.getLogger(RabbitMQProducer.class);

    /** Template used for publishing; injected by resource name, not set by the constructor. */
    @Resource(name = "ProducerRabbitTemplate")
    private RabbitTemplate producerTemplate;

    /** Routing key attached to every message this producer sends. */
    private String sendRoutingKey;

    /**
     * @param routingKey routing key to use for every message sent by this instance
     */
    public RabbitMQProducer(String routingKey) {
        this.sendRoutingKey = routingKey;
    }

    /**
     * Converts {@code message} with the template's message converter, sends it with
     * this producer's routing key, and then logs the payload at INFO level.
     *
     * @param message payload to publish
     */
    public void sendDataToQueue(Object message) {
        final String key = this.sendRoutingKey;
        producerTemplate.convertAndSend(key, message);

        // Built after the send so the statement order matches the publish-then-log contract.
        final String logLine = "rabbitMQ producer send message:" + message;
        LOG.info(logLine);
    }
}

5、消费者代码

package com.my.rabbitmq.consumer;

import java.nio.charset.StandardCharsets;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * Receives RabbitMQ messages and logs their body as text.
 *
 * @author yuanzi
 * @time 2017-08-23 15:38:03
 */
public class RabbitMQConsumer implements MessageListener {

    private static final Logger log = LogManager.getLogger(RabbitMQConsumer.class);

    /**
     * Logs the body of each delivered message at INFO level.
     *
     * <p>The body is decoded as UTF-8 explicitly. The producer-side converter
     * ({@code FastJsonMessageConverter}) encodes with UTF-8, so relying on the
     * platform default charset would corrupt non-ASCII text on hosts whose
     * default differs.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("rabbitMQ consumer receive message:" + data);
    }
}

6、消息转换器的配置 

package com.my.rabbitmq.util;

import java.io.UnsupportedEncodingException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import net.sf.json.JSONObject;

/**
 * Wraps outgoing payloads in a JSON envelope of the form {@code {"data": <payload>}}.
 *
 * <p>Only the outbound direction is implemented; see {@link #fromMessage(Message)}.
 *
 * @author yuanzi
 * @time 2017-08-01 15:13:30
 */
public class FastJsonMessageConverter extends AbstractMessageConverter {

    /** Charset name used to encode the JSON text and advertised as the content encoding. */
    public static final String DEFAULT_CHARSET = "UTF-8";

    /**
     * Serialises {@code message} under the key {@code "data"} of a JSON object and
     * encodes the resulting text with {@link #DEFAULT_CHARSET}. The content length,
     * content type (JSON) and content encoding are set on {@code messageProperties}.
     *
     * @param message           payload to wrap
     * @param messageProperties properties to attach to the AMQP message; modified in place
     * @return the AMQP message carrying the encoded JSON
     * @throws MessageConversionException if the charset is not supported by the runtime
     */
    @Override
    protected Message createMessage(Object message, MessageProperties messageProperties) {
        JSONObject json = new JSONObject();
        json.put("data", message);

        byte[] bytes;
        try {
            bytes = json.toString().getBytes(DEFAULT_CHARSET);
        } catch (UnsupportedEncodingException e) {
            // Pass the exception as the cause so the stack trace is not lost.
            throw new MessageConversionException("Failed to convert Message content", e);
        }

        messageProperties.setContentLength(bytes.length);
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(DEFAULT_CHARSET);
        return new Message(bytes, messageProperties);
    }

    /**
     * Inbound conversion is not implemented.
     *
     * <p>NOTE(review): this always returns {@code null}, so any caller that relies on
     * this converter to decode received messages gets no payload. It looks as if the
     * converter is only meant for the producer side; confirm before using it for receiving.
     *
     * @param message the received message (ignored)
     * @return always {@code null}
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return null;
    }
}

 

转载于:https://my.oschina.net/u/2478308/blog/1519334

你可能感兴趣的文章
康托展开与逆康托展开
查看>>
hadoop 2.7.2 安装
查看>>
JAVA泛型详解——转
查看>>
#ifdef __cplusplus extern "C" { #endif 的解释<转>
查看>>
高速队类实现(线程安全)
查看>>
css3 transition 过渡动画
查看>>
CSS基础入门视频教程荟萃
查看>>
centos7 安装maven
查看>>
FTPClient与commons-pool2
查看>>
C++中的类型转换
查看>>
恰逢其时:从混合架构向一站式企业级Hadoop架构迈进
查看>>
Glossy Button
查看>>
SegmentedControl
查看>>
Contacts Multi Picker
查看>>
[应用模板]HTML5+Phonegap通讯录
查看>>
开源 java CMS - FreeCMS2.6 热词管理
查看>>
from selenium.webdriver.common.keys import Keys
查看>>
mysql 配置
查看>>
AngularJS 中文资料+工具+库+Demo 大搜集
查看>>
碎碎念01
查看>>