Disruptor简单使用

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

文章首发于:clawhub.club


在定位项目中内存泄漏问题时,发现RingBuffer占用内存过大,这个是在使用log4j2时引入的jar,以前只知道必须使用这个Disruptor才可以用异步日志,
但是并不清楚Disruptor的一些实现,也没有通过编码的方式使用过,这次正好研究一下。

Disruptor是一个低延迟(low-latency),高吞吐量(high-throughput)的事件发布订阅框架,用于一个JVM中多个线程之间的消息队列,
作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue。

相关概念

RingBuffer

应用需要传递的消息在Disruptor中称为Event(事件)。
RingBuffer是Event的数组,实现了阻塞队列的语义:
如果RingBuffer满了,则生产者会阻塞等待。
如果RingBuffer空了,则消费者会阻塞等待。

Sequence

在上文中,我提到“每个消费者需要自己维护一个指针”。这里的指针就是一个单调递增长整数(及其基于CAS的加法、获取操作),称为Sequence。
除了每个消费者需要维护一个指针外,RingBuffer自身也要维护一个全局指针(如上一节第2点所提到的),记录最后一条可以被消费的消息。

高性能的体现

  • 无锁,无锁就没有锁竞争。当生产者、消费者线程数很高时,意义重大。所以,
    往大里说,每个消费者维护自己的Sequence,基本没有跨线程共享的状态。
    往小里说,Sequence的加法是CAS实现的。
    当生产者需要判断RingBuffer是否已满时,用CAS比较原先RingBuffer的Event个数,和假定放入新Event后Event的个数。
    如果CAS返回false,说明在判断期间,别的生产者加入了新Event;或者别的消费者拿走了Event。那么当前判断无效,需要重新判断。
  • 对象的复用,JVM运行时,一怕创建大对象,二怕创建很多小对象。这都会导致JVM堆碎片化、对象元数据存储的额外开销大。这是高性能Java应用的噩梦。
    为了解决第二点“很多小对象”,主流开源框架都会自己维护、复用对象池。LMAX Disruptor也不例外。
    生产者不是创建新的Event对象,放入到RingBuffer中。而是从RingBuffer中取出一个已有的Event对象,更新它所指向的业务数据,来代表一个逻辑上的新Event。

简单使用

pom

        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>

LogEvent

自定义实体对象,充当“生产者-消费者”模型中的数据。

    import java.util.Date;

    /**
     * 自定义实体对象,充当“生产者-消费者”模型中的数据<br>
     */
    public class LogEvent {
        /**
         * The Log id.
         */
        private long logId;
        /**
         * The Content.
         */
        private String content;
        /**
         * The Date.
         */
        private Date date;

        /**
         * Gets log id.
         *
         * @return the log id
         */
        public long getLogId() {
            return logId;
        }

        /**
         * Sets log id.
         *
         * @param logId the log id
         */
        public void setLogId(long logId) {
            this.logId = logId;
        }

        /**
         * Gets content.
         *
         * @return the content
         */
        public String getContent() {
            return content;
        }

        /**
         * Sets content.
         *
         * @param content the content
         */
        public void setContent(String content) {
            this.content = content;
        }

        /**
         * Gets date.
         *
         * @return the date
         */
        public Date getDate() {
            return date;
        }

        /**
         * Sets date.
         *
         * @param date the date
         */
        public void setDate(Date date) {
            this.date = date;
        }

        @Override
        public String toString() {
            return "logId=" + logId +
                    ", content='" + content + '\'' +
                    ", date=" + date;
        }
    }

LogEventFactory

实现EventFactory的接口,用于生产数据。

    import com.lmax.disruptor.EventFactory;

    /**
     * 事件生成工厂,用来初始化预分配事件对象,即根据RingBuffer大小创建的实体对象
     */
    public class LogEventFactory implements EventFactory<LogEvent> {
        @Override
        public LogEvent newInstance() {
            return new LogEvent();
        }
    }

LogEventProducer

自定义生产者。

    import java.util.Date;

    import com.lmax.disruptor.RingBuffer;

    /**
     * 自定义生产者
     */
    public class LogEventProducer {
        private RingBuffer<LogEvent> ringBuffer;

        public LogEventProducer(RingBuffer<LogEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }

        public void onData(long logId, String content, Date date) {
            //递增并返回循环缓冲区的下一个序列。
            long seq = ringBuffer.next();
            //获取RingBuffer中给定序列的事件
            LogEvent logEvent = ringBuffer.get(seq);
            logEvent.setLogId(logId);
            logEvent.setContent(content);
            logEvent.setDate(date);
            //发布指定的序列。此操作将此特定消息标记为可读取。
            ringBuffer.publish(seq);
        }
    }

LogEventProducerWithTranslator

将数据存储到自定义对象中并发布,通过在自定义类中新建EventTranslator类实现。

    import java.util.Date;

    import com.lmax.disruptor.EventTranslatorVararg;
    import com.lmax.disruptor.RingBuffer;

    /**
     * 使用translator方式到事件生产者发布事件,通常使用该方法
     */
    public class LogEventProducerWithTranslator {

        /**
         * 实现将另一个数据表示转换为从{@link RingBuffer}声明的事件。
         */
        private EventTranslatorVararg eventTranslatorVararg = (EventTranslatorVararg<LogEvent>) (event, sequence, args) -> {
            event.setLogId((Long) args[0]);
            event.setContent((String) args[1]);
            event.setDate((Date) args[2]);
        };

        private RingBuffer<LogEvent> ringBuffer;

        public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer) {
            this.ringBuffer = ringBuffer;
        }

        public void onData(long logId, String content, Date date) {
            //允许用户提供可变数量的参数
            ringBuffer.publishEvent(eventTranslatorVararg, logId, content, date);
        }
    }

LogEventConsumer

自定义消费者。

    import com.lmax.disruptor.EventHandler;
    import com.lmax.disruptor.RingBuffer;

    /**
     * 自定义消费者
     */
    public class LogEventConsumer implements EventHandler<LogEvent> {

        /**
         * The Name.
         */
        private String name;

        /**
         * Instantiates a new Log event consumer.
         *
         * @param name the name
         */
        public LogEventConsumer(String name) {
            this.name = name;
        }

        /**
         * 当发布者将事件发布到RingBuffer时调用。
         *
         * @param event      published to the {@link RingBuffer}
         * @param sequence   正在处理的事件的sequence
         * @param endOfBatch 标志,指示这是否是来自{@link RingBuffer}的批处理中的最后一个事件
         */
        @Override
        public void onEvent(LogEvent event, long sequence, boolean endOfBatch) {
            System.out.println("LogEventConsumer name:" + name + ",sequence:" + sequence + ",endOfBatch:" + endOfBatch + ",logEvent:" + event.toString());
        }
    }

LogEventMain

启动项,通过单一生产者,多生产者,单一消费者,多消费者的组合,测试了disruptor的功能。


    import java.util.Date;
    import java.util.concurrent.Executors;

    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.EventHandlerGroup;
    import com.lmax.disruptor.dsl.ProducerType;

    /**
     * 启动
     */
    public class LogEventMain {
        /**
         * The entry point of application.
         *
         * @param args the input arguments
         * @throws InterruptedException the interrupted exception
         */
        public static void main(String[] args) throws InterruptedException {
            // 单个生产者和消费者的模式
            producer();
            // 使用EventTranslatorVararg的单个生产者和消费者模式
            producerWithTranslator();
            // 一个生产者,3个消费者,其中前面2个消费者完成后第3个消费者才可以消费
            multiConsumer();
            // 一个生产者,多个消费者,有2条支线
            multiConsumers();
            // 多个生产者,多个消费者,有2条消费者支线
            multiProcedureConsumers();

        }

        /**
         * 多个生产者,多个消费者,有2条消费者支线,其中消费者1和消费者3在同一条支线上,
         * 消费者2和消费者4在同一条支线上,消费者5是消费者3和消费者4的终点消费者
         * 这样的消费将会在消费者1和消费者2把所有的RingBuffer大小消费完成后才会执行消费者3和消费者4
         * 在消费者3和消费者4把RingBuffer大小消费完成后才会执行消费者5
         * 消费者5消费完RingBuffer大小后又按照上面的顺序来消费
         * 如果剩余的生产数据比RingBuffer小,那么还是要依照顺序来
         * 生产者只是多生产了数据
         */
        public static void multiProcedureConsumers() throws InterruptedException {
            LogEventFactory logEventFactory = new LogEventFactory();
            //用于生成RingBuffer大小,其大小必须是2的n次方
            int ringBufferSize = 2 << 3;
            //定义Disruptor初始化信息
            Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy());
            LogEventConsumer consumer1 = new LogEventConsumer("1");
            LogEventConsumer consumer2 = new LogEventConsumer("2");
            LogEventConsumer consumer3 = new LogEventConsumer("3");
            LogEventConsumer consumer4 = new LogEventConsumer("4");
            LogEventConsumer consumer5 = new LogEventConsumer("5");
            //同时执行消费者1和消费者2
            disruptor.handleEventsWith(consumer1, consumer2);
            //消费者1后面执行消费者3
            disruptor.after(consumer1).handleEventsWith(consumer3);
            //消费者后面执行消费者4
            disruptor.after(consumer2).handleEventsWith(consumer4);
            //消费者3和消费者3执行完后执行消费者5
            disruptor.after(consumer3, consumer4).handleEventsWith(consumer5);
            //定义事件的开始
            disruptor.start();
            RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
            //进行事件的发布
            LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
            LogEventProducer logEventProducer2 = new LogEventProducer(ringBuffer);
            LogEventProducer logEventProducer3 = new LogEventProducer(ringBuffer);
            for (int i = 0; i < 10; i++) {
                logEventProducer.onData(i, "1-logEventProducer" + i, new Date());
                logEventProducer2.onData(i, "2-logEventProducer" + i, new Date());
                logEventProducer3.onData(i, "3-logEventProducer" + i, new Date());
            }
            Thread.sleep(1000);
            //关闭Disruptor
            disruptor.shutdown();
        }

        /**
         * 一个生产者,多个消费者,有2条支线,其中消费者1和消费者3在同一条支线上,
         * 消费者2和消费者4在同一条支线上,消费者5是消费者3和消费者4的终点消费者
         * 这样的消费将会在消费者1和消费者2把所有的RingBuffer大小消费完成后才会执行消费者3和消费者4
         * 在消费者3和消费者4把RingBuffer大小消费完成后才会执行消费者5
         * 消费者5消费完RingBuffer大小后又按照上面的顺序来消费
         * 如果剩余的生产数据比RingBuffer小,那么还是要依照顺序来
         */
        public static void multiConsumers() throws InterruptedException {
            LogEventFactory logEventFactory = new LogEventFactory();
            //用于生成RingBuffer大小,其大小必须是2的n次方
            int ringBufferSize = 2 << 3;
            //定义Disruptor初始化信息
            Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
            LogEventConsumer consumer1 = new LogEventConsumer("1");
            LogEventConsumer consumer2 = new LogEventConsumer("2");
            LogEventConsumer consumer3 = new LogEventConsumer("3");
            LogEventConsumer consumer4 = new LogEventConsumer("4");
            LogEventConsumer consumer5 = new LogEventConsumer("5");
            //同时执行消费者1和消费者2
            disruptor.handleEventsWith(consumer1, consumer2);
            //消费者1后面执行消费者3
            disruptor.after(consumer1).handleEventsWith(consumer3);
            //消费者后面执行消费者4
            disruptor.after(consumer2).handleEventsWith(consumer4);
            //消费者3和消费者3执行完后执行消费者5
            disruptor.after(consumer3, consumer4).handleEventsWith(consumer5);
            //定义事件的开始
            disruptor.start();

            RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
            //进行事件的发布
            LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
            for (int i = 0; i < 10; i++) {
                logEventProducer.onData(i, "logEventProducer" + i, new Date());
            }
            Thread.sleep(1000);
            //关闭Disruptor
            disruptor.shutdown();
        }

        /**
         * 一个生产者,3个消费者,其中前面2个消费者完成后第3个消费者才可以消费
         * 也即使说当前面2个消费者把所有的RingBuffer占领完成,同时都消费完成后才会有第3个消费者的消费
         * 当发布的事件数量大于RingBuffer的大小的时候,在第3个消费者消费完RingBuffer大小的时候前面2个消费者才能继续消费,序号递增的
         */
        public static void multiConsumer() throws InterruptedException {
            LogEventFactory logEventFactory = new LogEventFactory();
            //用于生成RingBuffer大小,其大小必须是2的n次方
            int ringBufferSize = 2 << 3;
            //定义Disruptor初始化信息
            Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());

            //设置多个消费者
            EventHandlerGroup<LogEvent> eventEventHandlerGroup = disruptor.handleEventsWith(new LogEventConsumer("1"), new LogEventConsumer("2"));
            eventEventHandlerGroup.then(new LogEventConsumer("3"));
            //启动事件的开始
            disruptor.start();
            RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
            //进行事件的发布
            LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
            for (int i = 0; i < 10; i++) {
                producerWithTranslator.onData(i, "producerWithTranslator" + i, new Date());
            }
            Thread.sleep(1000);
            //关闭Disruptor
            disruptor.shutdown();
        }

        /**
         * 使用EventTranslatorVararg的单个生产者和消费者模式
         */
        public static void producerWithTranslator() throws InterruptedException {
            LogEventFactory logEventFactory = new LogEventFactory();
            //用于生成RingBuffer大小,其大小必须是2的n次方
            int ringBufferSize = 2 << 3;
            //定义Disruptor初始化信息
            Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
            //定义处理事件的消费者
            disruptor.handleEventsWith(new LogEventConsumer("1"));
            //定义事件的开始
            disruptor.start();

            RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
            //进行事件的发布
            LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
            for (int i = 0; i < 10; i++) {
                producerWithTranslator.onData(i, "producerWithTranslator" + i, new Date());
            }
            Thread.sleep(1000);
            //关闭Disruptor
            disruptor.shutdown();
        }

        /**
         * 单个生产者和消费者的模式
         */
        public static void producer() throws InterruptedException {
            // 事件生成工厂
            LogEventFactory logEventFactory = new LogEventFactory();
            //用于生成RingBuffer大小,其大小必须是2的n次方
            int ringBufferSize = 2 << 3;
            //定义Disruptor初始化信息
            Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
            //定义处理事件的消费者
            disruptor.handleEventsWith(new LogEventConsumer("1"));
            //定义事件的开始
            disruptor.start();

            //获取RingBuffer
            RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
            //进行事件的发布
            LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
            for (int i = 0; i < 10; i++) {
                logEventProducer.onData(i, "logEventProducer" + i, new Date());
            }
            Thread.sleep(1000);
            //关闭Disruptor
            disruptor.shutdown();
        }
    }

参考:

disruptor的简单介绍及使用
高性能线程间队列 DISRUPTOR 简介
浅谈Disruptor
剖析Disruptor:为什么会这么快?(一)Ringbuffer的特别之处
剖析Disruptor:为什么会这么快?(一)锁的缺点
剖析Disruptor:为什么会这么快?(二)神奇的缓存行填充
剖析Disruptor:为什么会这么快?(三)揭秘内存屏障
高性能队列——Disruptor
还在用BlockingQueue?读这篇文章,了解下Disruptor吧


来源:https://www.jianshu.com/p/347ea7f881f8

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Disruptor简单使用

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏