Understanding from the source code how Spring Cloud Kafka Stream assigns Kafka partitions to different instances

First, a Kafka topic is physically split into multiple partitions. Suppose the topic testIn has 8 partitions.

Second, once our Spring Cloud Stream Kafka application is packaged as a jar, it can be deployed as multiple instances. Suppose we deploy 3 instances.

How, then, are the 8 partitions distributed among these 3 instances?

When spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=true (the default), Kafka's ConsumerCoordinator balances the partitions across the instances automatically, so there is nothing for you to manage (instanceCount and instanceIndex play no role in that case, although concurrency still behaves as described below). When it is set to false, the assignment is determined by four consumer properties (org.springframework.cloud.stream.binder.ConsumerProperties):

spring.cloud.stream.bindings.input.consumer.partitioned=true

How many instances will be deployed:

spring.cloud.stream.bindings.input.consumer.instanceCount=3

The index of this instance, from 0 to instanceCount - 1:

spring.cloud.stream.bindings.input.consumer.instanceIndex=0

How many Kafka consumers to start inside each instance:

spring.cloud.stream.bindings.input.consumer.concurrency=1
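
In practice all 3 deployments share the same partitioned, instanceCount, and concurrency settings and differ only in instanceIndex, for example:

    # instance 0:
    spring.cloud.stream.bindings.input.consumer.instanceIndex=0
    # instance 1:
    spring.cloud.stream.bindings.input.consumer.instanceIndex=1
    # instance 2:
    spring.cloud.stream.bindings.input.consumer.instanceIndex=2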

With the configuration above (3 instances, concurrency of 1 per instance):

instance0 starts one consumer (consumer0), which consumes p0, p3, p6;

instance1 starts one consumer (consumer0), which consumes p1, p4, p7;

instance2 starts one consumer (consumer0), which consumes p2, p5.
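
The rule behind this table, as the source below confirms, is that an instance keeps a partition only when partition % instanceCount == instanceIndex. A minimal standalone sketch (plain Java, no Spring or Kafka dependencies; the class name is ours) reproduces it:

    public class InstanceAssignmentSketch {

        public static void main(String[] args) {
            int partitionCount = 8; // topic testIn has 8 partitions
            int instanceCount = 3;  // 3 deployed instances
            for (int instanceIndex = 0; instanceIndex < instanceCount; instanceIndex++) {
                StringBuilder assigned = new StringBuilder();
                for (int p = 0; p < partitionCount; p++) {
                    // the same modulo filter used in createConsumerEndpoint
                    if (p % instanceCount == instanceIndex) {
                        assigned.append('p').append(p).append(' ');
                    }
                }
                System.out.println("instance" + instanceIndex + " -> " + assigned.toString().trim());
            }
            // prints:
            // instance0 -> p0 p3 p6
            // instance1 -> p1 p4 p7
            // instance2 -> p2 p5
        }
    }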

The core logic lives in the org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder#createConsumerEndpoint method:

    protected MessageProducer createConsumerEndpoint(final ConsumerDestination destination, final String group,
                final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {

            boolean anonymous = !StringUtils.hasText(group);
            Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
                    "DLQ support is not available for anonymous subscriptions");
            String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
            final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup,
                    extendedConsumerProperties);
            int partitionCount = extendedConsumerProperties.getInstanceCount()
                    * extendedConsumerProperties.getConcurrency();
            // If autoRebalanceEnabled=false and instanceCount * concurrency is greater than the topic's
            // actual number of partitions, an error is raised, because idle consumers would otherwise be started
            Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
                    extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
                    new Callable<Collection<PartitionInfo>>() {

                        @Override
                        public Collection<PartitionInfo> call() throws Exception {
                            Consumer<?, ?> consumer = consumerFactory.createConsumer();
                            List<PartitionInfo> partitionsFor = consumer.partitionsFor(destination.getName());
                            consumer.close();
                            return partitionsFor;
                        }

                    });

            Collection<PartitionInfo> listenedPartitions;

            if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
                    extendedConsumerProperties.getInstanceCount() == 1) {
                listenedPartitions = allPartitions;
            }
            else {
                listenedPartitions = new ArrayList<>();
                // Compute which partitions should be assigned to this instance
                for (PartitionInfo partition : allPartitions) {
                    // divide partitions across modules
                    if ((partition.partition()
                            % extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties
                                    .getInstanceIndex()) {
                        listenedPartitions.add(partition);
                    }
                }
            }
            this.topicsInUse.put(destination.getName(), new TopicInformation(group, listenedPartitions));

            Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
            final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
                    listenedPartitions);
            final ContainerProperties containerProperties = anonymous
                    || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
                            ? new ContainerProperties(destination.getName())
                            : new ContainerProperties(topicPartitionInitialOffsets);
            // The concurrency used is the smaller of the configured concurrency and the size of this instance's partition subset
            int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
            @SuppressWarnings("rawtypes")
            // ConcurrentMessageListenerContainer#doStart will create consumers according to this concurrency
            final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
                    new ConcurrentMessageListenerContainer(
                            consumerFactory, containerProperties) {

                @Override
                public void stop(Runnable callback) {
                    super.stop(callback);
                }

            };
            messageListenerContainer.setConcurrency(concurrency);
            if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
                messageListenerContainer.getContainerProperties()
                        .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
                messageListenerContainer.getContainerProperties().setAckOnError(false);
            }
            else {
                messageListenerContainer.getContainerProperties()
                        .setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(
                        "Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
            }
            final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
                    messageListenerContainer);
            kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
            ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, consumerGroup,
                    extendedConsumerProperties);
            if (extendedConsumerProperties.getMaxAttempts() > 1) {
                kafkaMessageDrivenChannelAdapter.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
                kafkaMessageDrivenChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
            }
            else {
                kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
            }
            return kafkaMessageDrivenChannelAdapter;
        }

Next, the org.springframework.kafka.listener.ConcurrentMessageListenerContainer#doStart method:

    @Override
        protected void doStart() {
            if (!isRunning()) {
                ContainerProperties containerProperties = getContainerProperties();
                TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
                if (topicPartitions != null
                        && this.concurrency > topicPartitions.length) {
                    this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                            + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                            + topicPartitions.length);
                    this.concurrency = topicPartitions.length;
                }
                setRunning(true);
                // Create one KafkaMessageListenerContainer per unit of concurrency;
                // each KafkaMessageListenerContainer's constructor initializes a KafkaConsumer
                for (int i = 0; i < this.concurrency; i++) {
                    KafkaMessageListenerContainer<K, V> container;
                    if (topicPartitions == null) {
                        container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
                    }
                    else {
                        // From the partition subset assigned to this instance,
                        // hand a further subset to this concurrent consumer
                        container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                                partitionSubset(containerProperties, i));
                    }
                    if (getBeanName() != null) {
                        container.setBeanName(getBeanName() + "-" + i);
                    }
                    if (getApplicationEventPublisher() != null) {
                        container.setApplicationEventPublisher(getApplicationEventPublisher());
                    }
                    container.setClientIdSuffix("-" + i);
                    container.start();
                    this.containers.add(container);
                }
            }
        }

        // The strategy for generating the second-level subsets
        private TopicPartitionInitialOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (this.concurrency == 1) {
                return topicPartitions;
            }
            else {
                int numPartitions = topicPartitions.length;
                if (numPartitions == this.concurrency) {
                    return new TopicPartitionInitialOffset[] { topicPartitions[i] };
                }
                else {
                    int perContainer = numPartitions / this.concurrency;
                    TopicPartitionInitialOffset[] subset;
                    if (i == this.concurrency - 1) {
                        subset = Arrays.copyOfRange(topicPartitions, i * perContainer, topicPartitions.length);
                    }
                    else {
                        subset = Arrays.copyOfRange(topicPartitions, i * perContainer, (i + 1) * perContainer);
                    }
                    return subset;
                }
            }
        }
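
To watch this second-level split in isolation, the arithmetic of partitionSubset can be transcribed onto plain partition numbers (a sketch for illustration only; the real method works on TopicPartitionInitialOffset[]):

    import java.util.Arrays;

    public class PartitionSubsetSketch {

        // Same arithmetic as ConcurrentMessageListenerContainer#partitionSubset
        static int[] partitionSubset(int[] topicPartitions, int concurrency, int i) {
            if (concurrency == 1) {
                return topicPartitions;
            }
            int numPartitions = topicPartitions.length;
            if (numPartitions == concurrency) {
                return new int[] { topicPartitions[i] };
            }
            int perContainer = numPartitions / concurrency;
            // the last container also picks up the remainder
            int to = (i == concurrency - 1) ? numPartitions : (i + 1) * perContainer;
            return Arrays.copyOfRange(topicPartitions, i * perContainer, to);
        }

        public static void main(String[] args) {
            int[] instancePartitions = { 0, 2, 4, 6 }; // what instance0 gets from the modulo split
            for (int i = 0; i < 3; i++) {
                System.out.println("consumer" + i + " -> "
                        + Arrays.toString(partitionSubset(instancePartitions, 3, i)));
            }
            // prints:
            // consumer0 -> [0]
            // consumer1 -> [2]
            // consumer2 -> [4, 6]
        }
    }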

Having read the core code above, the partition assignment strategy should now be clear. Let's change the scenario: start 2 instances, each with concurrency=3. Then:

instance0 is assigned the 4 partitions [p0, p2, p4, p6] and starts 3 consumers: consumer0 consumes [p0], consumer1 consumes [p2], and consumer2 consumes [p4, p6].

instance1 is assigned the 4 partitions [p1, p3, p5, p7] and starts 3 consumers: consumer0 consumes [p1], consumer1 consumes [p3], and consumer2 consumes [p5, p7].

Notice that consumer2 in each instance consumes one extra partition, so the load is not perfectly even. Whenever possible, the partition count per instance should be divisible by the concurrency, e.g. 2 instances with concurrency=4 or concurrency=2. With concurrency=2, each consumer in each instance consumes exactly two partitions.
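
Any instanceCount/concurrency combination can be checked by chaining the two splits; a small sketch (the helper name is ours) prints the full layout for both scenarios:

    import java.util.Arrays;
    import java.util.stream.IntStream;

    public class FullLayoutSketch {

        public static void main(String[] args) {
            printLayout(8, 2, 3); // uneven: the last consumer of each instance gets an extra partition
            printLayout(8, 2, 2); // even: every consumer gets exactly two partitions
        }

        // Assumes concurrency <= partitions per instance (the binder clamps it with Math.min anyway)
        static void printLayout(int partitionCount, int instanceCount, int concurrency) {
            System.out.println("-- " + instanceCount + " instances, concurrency=" + concurrency);
            for (int idx = 0; idx < instanceCount; idx++) {
                final int instanceIndex = idx;
                // level 1: modulo split across instances
                int[] mine = IntStream.range(0, partitionCount)
                        .filter(p -> p % instanceCount == instanceIndex)
                        .toArray();
                // level 2: contiguous ranges across the instance's consumers,
                // with the remainder going to the last one (as in partitionSubset)
                int perContainer = mine.length / concurrency;
                for (int i = 0; i < concurrency; i++) {
                    int to = (i == concurrency - 1) ? mine.length : (i + 1) * perContainer;
                    System.out.println("instance" + instanceIndex + "/consumer" + i + " -> "
                            + Arrays.toString(Arrays.copyOfRange(mine, i * perContainer, to)));
                }
            }
        }
    }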

At this point you should have a solid understanding of these parameters and be able to configure them correctly when scaling a stream application up or down.


Source: http://ddrv.cn/a/88268
