您当前的位置:首页 > 电脑百科 > 程序开发 > 框架

Java并发编程框架Disruptor

时间:2020-07-15 15:17:07  来源:  作者:

Disruptor是什么?

Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后再交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信)。本文将使用disruptor最新版3.3.6进行介绍,可以在https://github.com/LMAX-Exchange/disruptor/releases 下载最新的JAR包开始disruptor之旅吧。

轮胎:RingBuffer

RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:

Java并发编程框架Disruptor

 

数组

这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。

序号

RingBuffer中元素拥有序号的概念,并且序号是一直增长的,这怎么理解?比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。

由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。

无锁的机制

在生产者/消费者模式下,disruptor号称“无锁并行框架”(要知道BlockingQueue是利用了Lock锁机制来实现的),这是怎么做到的呢?下面我们来具体分析下:

一个生产者 + 一个消费者

生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。

一个生产者 + 多个消费者

多个消费者当然持有多个消费指针C1,C2,...,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。

多个生产者 + N个消费者

很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。

Disruptor初体验:简单的生产者和消费者

业务数据对象POJO(Event)

public class Order {

    //订单ID
    private long id;

    //订单信息
    private String info;

    //订单价格
    private double price;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getInfo() {
        return info;
    }

    public void setInfo(String info) {
        this.info = info;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}

业务数据工厂(Factory)

public class OrderFactory implements EventFactory{

    @Override
    public Object newInstance() {

        System.out.println("OrderFactory.newInstance");
        return new Order();
    }

}

事件处理器(Handler,即消费者处理逻辑)

public class OrderHandler implements EventHandler<Order>{

    @Override
    public void onEvent(Order order, long l, boolean b) throws Exception {

        System.out.println(Thread.currentThread().getName() + " 消费者处理中:" + l);
        order.setInfo("info" + order.getId());
        order.setPrice(Math.random());
    }

}

Main

public class Main {

    public static void main(String[] args) throws InterruptedException {

        //创建订单工厂
        OrderFactory orderFactory = new OrderFactory();

        //ringbuffer的大小
        int RINGBUFFER_SIZE = 1024;

        //创建disruptor
        Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());

        //设置事件处理器 即消费者
        disruptor.handleEventsWith(new OrderHandler());

        disruptor.start();

        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();

        //-------------生产数据
        for(int i = 0 ; i < 3 ; i++){

            long sequence = ringBuffer.next();

            Order order = ringBuffer.get(sequence);

            order.setId(i);

            ringBuffer.publish(sequence);
            System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);
        }

        Thread.sleep(1000);

        disruptor.shutdown();
    }

}

运行结果:

Java并发编程框架Disruptor

 

说明:

其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象。

另外在构造Disruptor的时候,在3.3.6之前使用的是API:

Java并发编程框架Disruptor

 

到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了。

构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者)。

Java并发编程框架Disruptor

 

单独使用RingBuffer:WorkerPool

如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能。

public static void main(String[] args) throws InterruptedException {

    ExecutorService executor = Executors.newFixedThreadPool(3);
    RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.SINGLE,new OrderFactory(),1024,new YieldingWaitStrategy());
    WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,ringBuffer.newBarrier(),new IgnoreExceptionHandler(),new OrderHandler());

    workerPool.start(executor);

    //-------------生产数据
    for(int i = 0 ; i < 30 ; i++){

        long sequence = ringBuffer.next();

        Order order = ringBuffer.get(sequence);
        order.setId(i);

        ringBuffer.publish(sequence);

        System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);
    }

    Thread.sleep(1000);
    
    workerPool.halt();
    executor.shutdown();

}

实际上是利用WorkerPool辅助连接消费者。

一个生产者+多个消费者

Java并发编程框架Disruptor

 

public static void main(String[] args) throws InterruptedException {

    //创建订单工厂
    OrderFactory orderFactory = new OrderFactory();

    //ringbuffer的大小
    int RINGBUFFER_SIZE = 1024;

    //创建disruptor
    Disruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());

    //设置事件处理器 即消费者
    EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith(new OrderHandler(),new OrderHandler2());
    eventHandlerGroup.then(new OrderHandler3());
    disruptor.start();

    RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();

    //-------------生产数据
    for(int i = 0 ; i < 3 ; i++){

        long sequence = ringBuffer.next();

        Order order = ringBuffer.get(sequence);

        order.setId(i);

        ringBuffer.publish(sequence);
        System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);
    }

    Thread.sleep(1000);
    disruptor.shutdown();
}

运行结果:

Java并发编程框架Disruptor

 

生产者生产了3条消息,一个消费者线程1消费了这3条数据,另一个消费者线程2也消费了这3条数据,2者是并行的,待消费者线程1和2完毕后,3条数据交给消费者线程3处理。

如果我们想顺序的按照A->B->C呢?

        disruptor.handleEventsWith(new Handler1()).
        	handleEventsWith(new Handler2()).
        	handleEventsWith(new Handler3());

如果我们想六边形操作呢?

Java并发编程框架Disruptor

 

        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);

到这里相信你对Disruptor已经有所了解了,那么多个生产者多个消费者如何实现呢,其实和上面的代码非常类似,无非是多个生产者都持有RingBuffer可以publish而已。



Tags:Disruptor   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
队列比较队列队列比较总结:就性能而言,无锁(什么也不加) > CAS > LOCK;从现实使用中考虑,我们一般选择有界队列(避免生产者速度过快,导致内存溢出);同时,为了减少Java的垃圾回收对系...【详细内容】
2020-12-15  Tags: Disruptor  点击:(178)  评论:(0)  加入收藏
Disruptor是一个开源框架,研发的初衷是为了解决高并发下队列锁的问题,最早由LMAX提出并使用,能够在无锁的情况下实现队列的并发操作,并号称能够在一个线程里每秒处理6百万笔订单...【详细内容】
2020-09-10  Tags: Disruptor  点击:(215)  评论:(0)  加入收藏
Disruptor是什么?Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一...【详细内容】
2020-07-15  Tags: Disruptor  点击:(124)  评论:(0)  加入收藏
▌简易百科推荐
本篇文章主要介绍了使用MyBatis框架完成数据库的增、删、改、查操作。准备工作运行schema.sql和data.sql脚本文件中的 SQL 语句创建t_user表并添加部分测试数据。schema.sql...【详细内容】
2022-07-15  嗨皮汪小成    Tags:MyBatis   点击:(0)  评论:(0)  加入收藏
1 Hive基本概念Hive是一个构建在Hadoop上的数据仓库框架。最初,Hive是由Facebook开发,后来移交由Apache软件基金会开发,并作为一个Apache开源项目。Hive是基于Hadoop的一个数据...【详细内容】
2022-07-15  秃头Java人    Tags:Hive   点击:(2)  评论:(0)  加入收藏
今天给大家讲讲 SpringBoot 框架 整合 Elasticsearch 实现海量级数据搜索。一、简介在上篇ElasticSearch 文章中,我们详细的介绍了 ElasticSearch 的各种 api 使用。实际的项...【详细内容】
2022-07-15  java小悠    Tags: Elasticsearch   点击:(3)  评论:(0)  加入收藏
SpringBoot开发Restful接口,有什么API规范吗?如何快速生成API文档呢?Swagger 是一个用于生成、描述和调用 RESTful 接口的 Web 服务。通俗的来讲,Swagger 就是将项目中所有(想要...【详细内容】
2022-07-14  Java全栈知识体系    Tags:Swagger   点击:(2)  评论:(0)  加入收藏
一、部署准备安装数据库、jdk、nginx、域名证书1、下载 nginx,官方网址如下:http://nginx.org/en/download.html2、解压安装包到任意目录 如:G:\nginx二、前端部署1、打开前端...【详细内容】
2022-07-14  智慧魔法豆浆    Tags:vue   点击:(2)  评论:(0)  加入收藏
SpringBoot 内置支持的 Web 容器有 Tomcat、Undertow、Jetty 和 Netty。默认情况下,这些 Web 服务的 AccessLog 日志是不开启的,而 AccessLog 日志对于做接口统计尤为重要。如...【详细内容】
2022-07-13  BUG弄潮儿    Tags:AccessLog 日志   点击:(10)  评论:(0)  加入收藏
什么是Starterstarter 是springboot 的核心,每个starter负责实现特定的功能,使用者只需引入starter即可自动配置,无需关心框架整合带来的问题。Starter 项目结构src |- main...【详细内容】
2022-07-12  IT食者    Tags:SpringBoot   点击:(9)  评论:(0)  加入收藏
mybaits非必填项处理数据库表字段khzjyxqx为日期型,非必填, 前台页面如下: 后台mybaits处理如下: 如果不处理,当为空时khzjyxqx=&#39;&#39;时会报错。<update id="updatesave" pa...【详细内容】
2022-07-11  在水一方357159258    Tags:mybaits   点击:(10)  评论:(0)  加入收藏
关于过气网红编程语言 Ruby,我们此前曾发过一篇文章去回顾其大受追捧的过往,并讨论了它每况愈下的生存状态。不过人气并不能直接说明语言质量差,一方面 Ruby on Rails(用 Ruby...【详细内容】
2022-07-08  InfoQ    Tags: Web 框架   点击:(9)  评论:(0)  加入收藏
1、JWT的构成- 头部(header):描述该JWT的最基本的信息,如类型以及签名所用的算法。- 负载(payload):存放有效信息的地方。- 签证(signature):base64加密后的header、base64加...【详细内容】
2022-07-08  dream19    Tags:SpringBoot   点击:(10)  评论:(0)  加入收藏
相关文章
    无相关信息
站内最新
站内热门
站内头条