spring boot实战(第十四篇)整合RabbitMQ源码分析前言

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取10G资料包与项目实战视频资料

前言

本篇开始讲述Spring Boot如何整合RabbitMQ(实际上Spring就整合了RabbitMQ)。

RabbitAdmin

在上篇中遗留AmqpAdmin没有讲解,现在来看下该部分代码
[html]
view plain
copy

  1. public AmqpAdmin amqpAdmin(CachingConnectionFactory connectionFactory) {
  2. return new RabbitAdmin(connectionFactory);
  3. }

创建RabbitAdmin实例,调用构造方法
[html]
view plain
copy

  1. public RabbitAdmin(ConnectionFactory connectionFactory) {
  2. this.connectionFactory = connectionFactory;
  3. Assert.notNull(connectionFactory, “ConnectionFactory must not be null”);
  4. this.rabbitTemplate = new RabbitTemplate(connectionFactory);
  5. }

创建连接工厂、rabbitTemplate,其中ConnectionFactory采用上一篇中自定义bean
[html]
view plain
copy

  1. public ConnectionFactory connectionFactory() {
  2. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  3. connectionFactory.setAddresses(“127.0.0.1:5672”);
  4. connectionFactory.setUsername(“guest”);
  5. connectionFactory.setPassword(“guest”);
  6. connectionFactory.setPublisherConfirms(true); //必须要设置
  7. return connectionFactory;
  8. }

为CachingConnectionFactory实例,其缓存模式为通道缓存
[html]
view plain
copy

  1. private volatile CacheMode cacheMode = CacheMode.CHANNEL;

接下来看下RabbitAdmin类定义:
[html]
view plain
copy

  1. public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean {
  2. }

实现接口AmqpAdmin(定义若干RabbitMQ操作父接口),这里需要强调的是InitializingBean,实现该接口则会调用afterPropertiesSet方法
[html]
view plain
copy

  1. public void afterPropertiesSet() {
  2. synchronized (this.lifecycleMonitor) {
  3. if (this.running || !this.autoStartup) {
  4. return;
  5. }
  6. if (this.connectionFactory instanceof CachingConnectionFactory &&
  7. ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
  8. logger.warn(“RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION”);
  9. return;
  10. }
  11. this.connectionFactory.addConnectionListener(new ConnectionListener() {
  12. // Prevent stack overflow…
  13. private final AtomicBoolean initializing = new AtomicBoolean(false);
  14. @Override
  15. public void onCreate(Connection connection) {
  16. if (!initializing.compareAndSet(false, true)) {
  17. // If we are already initializing, we don’t need to do it again…
  18. return;
  19. }
  20. try {
  21. initialize();
  22. }
  23. finally {
  24. initializing.compareAndSet(true, false);
  25. }
  26. }
  27. @Override
  28. public void onClose(Connection connection) {
  29. }
  30. });
  31. this.running = true;
  32. }
  33. }

synchronized
(
this
.
lifecycleMonitor
)加锁保证同一时间只有一个线程访问该代码,随后调用
this
.
connectionFactory
.addConnectionListener添加连接监听,各连接工厂关系:

实际调用为CachingConnectionFactory

[html]
view plain
copy

  1. public void addConnectionListener(ConnectionListener listener) {
  2. super.addConnectionListener(listener);
  3. // If the connection is already alive we assume that the new listener wants to be notified
  4. if (this.connection != null) {
  5. listener.onCreate(this.connection);
  6. }
  7. }

此时connection为null,无法执行到
listener
.onCreate(
this
.
connection
); 往
CompositeConnectionListener
connectionListener中添加监听信息,最终保证在集合中

[html]
view plain
copy

  1. private List<**ConnectionListener**>delegates = new CopyOnWriteArrayList<**ConnectionListener**>();

这里添加的监听代码执行,在后面调用时再来讲解。

至此~~ RabbitAdmin创建完成。

Exchange

接下来继续来看AmqpConfig.java中的代码
[html]
view plain
copy

  1. @Bean
  2. public DirectExchange defaultExchange() {
  3. return new DirectExchange(EXCHANGE);
  4. }

以上代码创建一个交换机,交换机类型为direct

在申明交换机时需要指定交换机名称,默认创建可持久交换机

Queue

[html]
view plain
copy

  1. public Queue queue() {
  2. return new Queue(“spring-boot-queue”, true); //队列持久
  3. }

默认创建可持久队列

Binding

[html]
view plain
copy

  1. @Bean
  2. public Binding binding() {
  3. return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
  4. }

BindingBuilder.bind(queue()) 实现为:

[html]
view plain
copy

  1. public static DestinationConfigurer bind(Queue queue) {
  2. return new DestinationConfigurer(queue.getName(), DestinationType.QUEUE);
  3. }

DestinationConfigurer通过name、type区分不同配置信息,其to()方法为重载方法,传递参数为四种交换机,分别返回Xxx
ExchangeRoutingKeyConfigurer,其中with方法返回Bingding实例,因此在Binding信息中存储了
队列、交换机、路由key等相关信息

[html]
view plain
copy

  1. public class Binding extends AbstractDeclarable {
  2. public static enum DestinationType {
  3. QUEUE, EXCHANGE;
  4. }
  5. private final String destination;
  6. private final String exchange;
  7. private final String routingKey;
  8. private final Map<**String, Object>** arguments;
  9. private final DestinationType destinationType;
  10. }

以上信息理解都非常简单,下面来看比较复杂点的
SimpleMessageListenerContainer

SimpleMessageListenerContainer

[html]
view plain
copy

  1. @Bean
  2. public SimpleMessageListenerContainer messageContainer() {
  3. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
  4. container.setQueues(queue());
  5. container.setExposeListenerChannel(true);
  6. container.setMaxConcurrentConsumers(1);
  7. container.setConcurrentConsumers(1);
  8. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
  9. container.setMessageListener(new ChannelAwareMessageListener() {
  10. @Override
  11. public void onMessage(Message message, Channel channel) throws Exception {
  12. byte[] body = message.getBody();
  13. System.out.println(“receive msg : ” + new String(body));
  14. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
  15. }
  16. });
  17. return container;
  18. }

查看其实现的接口,注意SmartLifecycle

接下来设置队列信息,在AbstractMessageListenerContainer
[html]
view plain
copy

  1. private volatile List<**String**>queueNames = new CopyOnWriteArrayList<**String**>();

添加队列信息
AbstractMessageListenerContainer#
exposeListenerChannel设置为true

[html]
view plain
copy

  1. container.setMaxConcurrentConsumers(1);
  2. container.setConcurrentConsumers(1);

设置并发消费者数量,默认情况为1

[html]
view plain
copy

  1. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认

设置消费者成功消费消息后确认模式,分为两种

  • 自动模式,默认模式,在RabbitMQ Broker消息发送到消费者后自动删除
  • 手动模式,消费者客户端显示编码确认消息消费完成,Broker给生产者发送回调,消息删除

接下来设置消费者端消息监听,为private
volatile
Object
messageListener 赋值

到这里消息监听容器也创建完成了,但令人纳闷的时,消费者如何去消费消息呢?从这里完全看不出来。那么接下来看下SmartLifecycle接口

SmartLifecycle

熟悉Spring都应该知道该接口,其定义为:
[html]
view plain
copy

  1. public interface SmartLifecycle extends Lifecycle, Phased {
  2. boolean isAutoStartup();
  3. void stop(Runnable callback);
  4. }

其中的isAutoStartup设置为true时,会自动调用Lifecycle接口中的start方法,既然我们为源码分析,也简单看下这个聪明的声明周期接口是如何实现它的聪明方法的


spring boot实战(第十篇)Spring boot Bean加载源码分析中讲到执行Bean加载时,调用AbstractApplicationContext#refresh(),其中存在一个方法调用finishRefresh()
[html]
view plain
copy

  1. protected void finishRefresh() {
  2. // Initialize lifecycle processor for this context.
  3. initLifecycleProcessor();
  4. // Propagate refresh to lifecycle processor first.
  5. getLifecycleProcessor().onRefresh();
  6. // Publish the final event.
  7. publishEvent(new ContextRefreshedEvent(this));
  8. // Participate in LiveBeansView MBean, if active.
  9. LiveBeansView.registerApplicationContext(this);
  10. }

其中initLifecycleProcessor初始化生命周期处理器,

[html]
view plain
copy

  1. protected void initLifecycleProcessor() {
  2. ConfigurableListableBeanFactory beanFactory = getBeanFactory();
  3. if (beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)) {
  4. this.lifecycleProcessor =
  5. beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME, LifecycleProcessor.class);
  6. if (logger.isDebugEnabled()) {
  7. logger.debug(“Using LifecycleProcessor [” + this.lifecycleProcessor + “]”);
  8. }
  9. }
  10. else {
  11. DefaultLifecycleProcessor defaultProcessor = new DefaultLifecycleProcessor();
  12. defaultProcessor.setBeanFactory(beanFactory);
  13. this.lifecycleProcessor = defaultProcessor;
  14. beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME, this.lifecycleProcessor);
  15. if (logger.isDebugEnabled()) {
  16. logger.debug(“Unable to locate LifecycleProcessor with name ‘” +
  17. LIFECYCLE_PROCESSOR_BEAN_NAME +
  18. “‘: using default [” + this.lifecycleProcessor + “]”);
  19. }
  20. }
  21. }

注册DefaultLifecycleProcessor对应bean

getLifecycleProcessor().onRefresh()调用DefaultLifecycleProcessor中方法onRefresh,调用startBeans(true)

[html]
view plain
copy

  1. private void startBeans(boolean autoStartupOnly) {
  2. Map<**String, Lifecycle>**lifecycleBeans = getLifecycleBeans();
  3. Map<**Integer, LifecycleGroup>phases = new HashMap<**Integer, LifecycleGroup>();
  4. for (Map.Entry<**String, ? extends Lifecycle>** entry : lifecycleBeans.entrySet()) {
  5. Lifecycle bean = entry.getValue();
  6. if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
  7. int phase = getPhase(bean);
  8. LifecycleGroup group = phases.get(phase);
  9. if (group == null) {
  10. group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
  11. phases.put(phase, group);
  12. }
  13. group.add(entry.getKey(), bean);
  14. }
  15. }
  16. if (phases.size() > 0) {
  17. List<**Integer**>keys = new ArrayList<**Integer**>(phases.keySet());
  18. Collections.sort(keys);
  19. for (Integer key : keys) {
  20. phases.get(key).start();
  21. }
  22. }
  23. }

其中

[html]
view plain
copy

  1. Map<**String, Lifecycle>**lifecycleBeans = getLifecycleBeans();

获取所有实现Lifecycle接口bean,执行bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup()判断,如果bean同时也为Phased实例,则加入到LifecycleGroup中,随后phases.get(key).start()调用start方法

接下来要做的事情就很明显:要了解消费者具体如何实现,查看SimpleMessageListenerContainer中的start是如何实现的。

至此~~整合RabbitMQ源码分析准备工作完成,下一篇中正式解读消费者的实现。

本文转自http://blog.csdn.net/liaokailin/article/details/49559951


来源:[]()

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » spring boot实战(第十四篇)整合RabbitMQ源码分析前言

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏