编程语言disruptor笔记之三:环形队列的基础操作(不用Disruptor类)

欢迎访问我的GitHubhttps://github.com/zq2599/blog_demos内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;《disruptor笔记》系列链接快速入门Disruptor类分析环形队列的基础操作(不用Disruptor类)事件消费知识点小结事件消费实战常见场景等待策略知识点补充(终篇)本篇概览本文是《disruptor笔记》系列的第三篇,主要任务是编码实现消息生产和消费,与《disruptor笔记之一:快速入门》不同的是,本次开发不使用Disruptor类,和Ring Buffer(环形队列)相关的操作都是自己写代码实现;这种脱离Disruptor类操作Ring Buffer的做法,不适合用在生产环境,但在学习Disruptor的过程中,这是种高效的学习手段,经过本篇实战后,在今后使用Disruptor时,您在开发、调试、优化等各种场景下都能更加得心应手;简单的消息生产消费已不能满足咱们的学习热情,今天的实战要挑战以下三个场景:100个事件,单个消费者消费;100个事件,三个消费者,每个都独自消费这个100个事件;100个事件,三个消费者共同消费这个100个事件;前文回顾为了完成本篇的实战,前文《disruptor笔记之二:Disruptor类分析》已做了充分的研究分析,建议观看,这里简单回顾以下Disruptor类的几个核心功能,这也是咱们编码时要实现的:创建环形队列(RingBuffer对象)创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件创建BatchEventProcessor,负责消费事件绑定BatchEventProcessor对象的异常处理类调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer启动独立线程,用来执行消费事件的业务逻辑理论分析已经完成,接下来开始编码;源码下载本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):名称链接备注项目主页https://github.com/zq2599/blog_demos该项目在GitHub上的主页git仓库地址(https)https://github.com/zq2599/blog_demos.git该项目源码的仓库地址,https协议git仓库地址(ssh)git@github.com:zq2599/blog_demos.git该项目源码的仓库地址,ssh协议这个git项目中有多个文件夹,本次实战的源码在disruptor-tutorials文件夹下,如下图红框所示:

disruptor-tutorials是个父工程,里面有多个module,本篇实战的module是low-level-operate,如下图红框所示:

开发进入编码阶段,今天的任务是挑战以下三个场景:100个事件,单个消费者消费;100个事件,三个消费者,每个都独自消费这个100个事件;100个事件,三个消费者共同消费这个100个事件;咱们先把工程建好,然后编写公共代码,例如事件定义、事件工厂等,最后才是每个场景的开发;在父工程disruptor-tutorials新增名为low-level-operate的module,其build.gradle如下:plugins {    id 'org.springframework.boot'}dependencies {    implementation 'org.projectlombok:lombok'    implementation 'org.springframework.boot:spring-boot-starter'    implementation 'org.springframework.boot:spring-boot-starter-web'    implementation 'com.lmax:disruptor'    testImplementation('org.springframework.boot:spring-boot-starter-test')}然后是springboot启动类:package com.bolingcavalry;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class LowLevelOperateApplication {public static void main(String[] args) {SpringApplication.run(LowLevelOperateApplication.class, args);}}事件类,这是事件的定义:package com.bolingcavalry.service;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;@Data@ToString@NoArgsConstructorpublic class StringEvent {    private String value;}事件工厂,定义如何在内存中创建事件对象:package com.bolingcavalry.service;import com.lmax.disruptor.EventFactory;public class StringEventFactory implements EventFactory{    @Override    public StringEvent newInstance() {        return new StringEvent();    }}事件生产类,定义如何将业务逻辑的事件转为disruptor事件发布到环形队列,用于消费:package com.bolingcavalry.service;import com.lmax.disruptor.RingBuffer;public class StringEventProducer {    // 存储数据的环形队列    private final RingBufferringBuffer;    public StringEventProducer(RingBufferringBuffer) {        this.ringBuffer = ringBuffer;    }    public void onData(String content) {        // ringBuffer是个队列,其next方法返回的是下最后一条记录之后的位置,这是个可用位置        long sequence = ringBuffer.next();        try {            // sequence位置取出的事件是空事件            StringEvent stringEvent = ringBuffer.get(sequence);            // 空事件添加业务信息            stringEvent.setValue(content);        } finally {            // 发布            ringBuffer.publish(sequence);        }    }}事件处理类,收到事件后具体的业务处理逻辑:package com.bolingcavalry.service;import com.lmax.disruptor.EventHandler;import lombok.Setter;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class StringEventHandler implements EventHandler{    public StringEventHandler(Consumer<?> consumer) {        this.consumer = consumer;    }    // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次    private Consumer<?> consumer;    @Override    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {        log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);        // 这里延时100ms,模拟消费事件的逻辑的耗时        Thread.sleep(100);        // 如果外部传入了consumer,就要执行一次accept方法        if (null!=consumer) {            consumer.accept(null);        }    }}定义一个接口,外部通过调用接口的方法来生产消息,再放几个常量在里面后面会用到:package com.bolingcavalry.service;public interface LowLevelOperateService {    /**     * 消费者数量     */    int CONSUMER_NUM = 3;    /**     * 环形缓冲区大小     */    int BUFFER_SIZE = 16;    /**     * 发布一个事件     * @param value     * @return     */    void publish(String value);    /**     * 返回已经处理的任务总数     * @return     */    long eventCount();}以上就是公共代码了,接下来逐个实现之前规划的三个场景;100个事件,单个消费者消费这是最简单的功能了,实现发布消息和单个消费者消费的功能,代码如下,有几处要注意的地方稍后提到:package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;import com.lmax.disruptor.BatchEventProcessor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.SequenceBarrier;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;@Service("oneConsumer")@Slf4jpublic class OneConsumerServiceImpl implements LowLevelOperateService { private RingBufferringBuffer; private StringEventProducer producer; /** * 统计消息总数 */ private final AtomicLong eventCount = new AtomicLong(); private ExecutorService executors; @PostConstruct private void init() { // 准备一个匿名类,传给disruptor的事件处理类, // 这样每次处理事件时,都会将已经处理事件的总数打印出来 Consumer<?> eventCountPrinter = new Consumer() { @Override public void accept(Object o) { long count = eventCount.incrementAndGet(); log.info("receive [{}] event", count); } }; // 创建环形队列实例 ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); // 准备线程池 executors = Executors.newFixedThreadPool(1); //创建SequenceBarrier SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); // 创建事件处理的工作类,里面执行StringEventHandler处理事件 BatchEventProcessorbatchEventProcessor = new BatchEventProcessor<>( ringBuffer, sequenceBarrier, new StringEventHandler(eventCountPrinter)); // 将消费者的sequence传给环形队列 ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); // 在一个独立线程中取事件并消费 executors.submit(batchEventProcessor); // 生产者 producer = new StringEventProducer(ringBuffer); } @Override public void publish(String value) { producer.onData(value); } @Override public long eventCount() { return eventCount.get(); }}上述代码有以下几处需要注意:自己创建环形队列RingBuffer实例自己准备线程池,里面的线程用来获取和消费消息自己动手创建BatchEventProcessor实例,并把事件处理类传入通过ringBuffer创建sequenceBarrier,传给BatchEventProcessor实例使用将BatchEventProcessor的sequence传给ringBuffer,确保ringBuffer的生产和消费不会出现混乱启动线程池,意味着BatchEventProcessor实例在一个独立线程中不断的从ringBuffer中获取事件并消费;为了验证上述代码能否正常工作,我这里写了个单元测试类,如下所示,逻辑很简单,调用OneConsumerServiceImpl.publish方法一百次,产生一百个事件,再检查OneConsumerServiceImpl记录的消费事件总数是不是等于一百:package com.bolingcavalry.service.impl;import com.bolingcavalry.service.LowLevelOperateService;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import static org.junit.Assert.assertEquals;@RunWith(SpringRunner.class)@SpringBootTest@Slf4jpublic class LowLeverOperateServiceImplTest {    @Autowired    @Qualifier("oneConsumer")    LowLevelOperateService oneConsumer;    private static final int EVENT_COUNT = 100;    private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {        for(int i=0;i<eventcount;i++) {="" log.info("publich="" {}",="" i);="" service.publish(string.valueof(i));="" }="" 异步消费,因此需要延时等待="" thread.sleep(10000);="" 消费的事件总数应该等于发布的事件数="" assertequals(expecteventcount,="" service.eventcount());="" @test="" public="" void="" testoneconsumer()="" throws="" interruptedexception="" log.info("start="" testoneconsumerservice");="" testlowleveloperateservice(oneconsumer,="" event_count,="" event_count);=""注意,如果您是直接在IDEA上点击图标来执行单元测试,记得勾选下图红框中选项,否则可能出现编译失败:

执行上述单元测试类,结果如下图所示,消息的生产和消费都符合预期,并且消费逻辑是在独立线程中执行的:

继续挑战下一个场景;100个事件,三个消费者,每个都独自消费这个100个事件这个场景在kafka中也有,就是三个消费者的group不同,这样每一条消息,这两个消费者各自消费一次;因此,100个事件,3个消费者每人都会独立消费这100个事件,一共消费300次;代码如下,有几处要注意的地方稍后提到:package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;import com.lmax.disruptor.BatchEventProcessor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.SequenceBarrier;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;@Service("multiConsumer")@Slf4jpublic class MultiConsumerServiceImpl implements LowLevelOperateService { private RingBufferringBuffer; private StringEventProducer producer; /** * 统计消息总数 */ private final AtomicLong eventCount = new AtomicLong(); /** * 生产一个BatchEventProcessor实例,并且启动独立线程开始获取和消费消息 * @param executorService */ private void addProcessor(ExecutorService executorService) { // 准备一个匿名类,传给disruptor的事件处理类, // 这样每次处理事件时,都会将已经处理事件的总数打印出来 Consumer<?> eventCountPrinter = new Consumer() { @Override public void accept(Object o) { long count = eventCount.incrementAndGet(); log.info("receive [{}] event", count); } }; BatchEventProcessorbatchEventProcessor = new BatchEventProcessor<>( ringBuffer, ringBuffer.newBarrier(), new StringEventHandler(eventCountPrinter)); // 将当前消费者的sequence实例传给ringBuffer ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); // 启动独立线程获取和消费事件 executorService.submit(batchEventProcessor); } @PostConstruct private void init() { ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM); // 创建多个消费者,并在独立线程中获取和消费事件 for (int i=0;i<consumer_num;i++) {="" addprocessor(executorservice);="" }="" 生产者="" producer="new" stringeventproducer(ringbuffer);="" @override="" public="" void="" publish(string="" value)="" producer.ondata(value);="" long="" eventcount()="" return="" eventcount.get();=""上述代码和前面的OneConsumerServiceImpl相比差别不大,主要是创建了多个BatchEventProcessor实例,然后分别在线程池中提交;验证方法依旧是单元测试,在刚才的LowLeverOperateServiceImplTest.java中增加代码即可,注意testLowLevelOperateService的第三个参数是EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM,表示预期的被消费消息数为300:@Autowired    @Qualifier("multiConsumer")    LowLevelOperateService multiConsumer;    @Test    public void testMultiConsumer() throws InterruptedException {        log.info("start testMultiConsumer");        testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);    }执行单元测试,如下图所示,一共消费了300个事件,并且三个消费者在不动线程:

100个事件,三个消费者共同消费这个100个事件本篇的最后一个实战是发布100个事件,然后让三个消费者共同消费100个(例如A消费33个,B消费33个,C消费34个);前面用到的BatchEventProcessor是用来独立消费的,不适合多个消费者共同消费,这种多个消费共同消费的场景需要借助WorkerPool来完成,这个名字还是很形象的:一个池子里面有很多个工作者,把任务放入这个池子,工作者们每人处理一部分,大家合力将任务完成;传入WorkerPool的消费者需要实现WorkHandler接口,于是新增一个实现类:package com.bolingcavalry.service;import com.lmax.disruptor.WorkHandler;import lombok.extern.slf4j.Slf4j;import java.util.function.Consumer;@Slf4jpublic class StringWorkHandler implements WorkHandler{    public StringWorkHandler(Consumer<?> consumer) {        this.consumer = consumer;    }    // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次    private Consumer<?> consumer;    @Override    public void onEvent(StringEvent event) throws Exception {        log.info("work handler event : {}", event);        // 这里延时100ms,模拟消费事件的逻辑的耗时        Thread.sleep(100);        // 如果外部传入了consumer,就要执行一次accept方法        if (null!=consumer) {            consumer.accept(null);        }    }}新增服务类,实现共同消费的逻辑,有几处要注意的地方稍后会提到:package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;import com.lmax.disruptor.*;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicLong;import java.util.function.Consumer;@Service("workerPoolConsumer")@Slf4jpublic class WorkerPoolConsumerServiceImpl implements LowLevelOperateService {    private RingBufferringBuffer;    private StringEventProducer producer;    /**     * 统计消息总数     */    private final AtomicLong eventCount = new AtomicLong();    @PostConstruct    private void init() {        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);        StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM];        // 创建多个StringWorkHandler实例,放入一个数组中        for (int i=0;i < CONSUMER_NUM;i++) {            handlers[i] = new StringWorkHandler(o -> {                long count = eventCount.incrementAndGet();                log.info("receive [{}] event", count);            });        }        // 创建WorkerPool实例,将StringWorkHandler实例的数组传进去,代表共同消费者的数量        WorkerPoolworkerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers);        // 这一句很重要,去掉就会出现重复消费同一个事件的问题        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());        workerPool.start(executorService);        // 生产者        producer = new StringEventProducer(ringBuffer);    }    @Override    public void publish(String value) {        producer.onData(value);    }    @Override    public long eventCount() {        return eventCount.get();    }}上述代码中,要注意的有以下两处:StringWorkHandler数组传入给WorkerPool后,每个StringWorkHandler实例都放入一个新的WorkProcessor实例,WorkProcessor实现了Runnable接口,在执行workerPool.start时,会将WorkProcessor提交到线程池中;和前面的独立消费相比,共同消费最大的特点在于只调用了一次ringBuffer.addGatingSequences方法,也就是说三个消费者共用一个sequence实例;验证方法依旧是单元测试,在刚才的LowLeverOperateServiceImplTest.java中增加代码即可,注意testWorkerPoolConsumer的第三个参数是EVENT_COUNT,表示预期的被消费消息数为100:@Autowired    @Qualifier("workerPoolConsumer")    LowLevelOperateService workerPoolConsumer;        @Test    public void testWorkerPoolConsumer() throws InterruptedException {        log.info("start testWorkerPoolConsumer");        testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT);    }执行单元测试如下图所示,三个消费者一共消费100个事件,且三个消费者在不同线程:

至此,咱们在不用Disruptor类的前提下完成了三种常见场景的消息生产消费,相信您对Disruptor的底层实现也有了深刻认识,今后不论是使用还是优化Disruptor,一定可以更加得心应手;你不孤单,欣宸原创一路相伴Java系列Spring系列Docker系列kubernetes系列数据库+中间件系列DevOps系列欢迎关注公众号:程序员欣宸微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...https://github.com/zq2599/blog_demos文章来源:https://www.cnblogs.com/bolingcavalry/p/15336337.html后台-插件-广告管理-栏目头部广告位版权声明本文仅代表作者观点,不代表本站立场。本文系作者授权发表。本文地址:https://www.jcdi.cn/phpjc/241374.html

(0)

相关推荐