CompletionService实践
在之前的ExecutorCompletionService中,我们了解了其实现的原理。通过FutureTask对提交任务的封装和代理然后通过call方法进行回调并将返回值存储在FutureTask中的Object中,当获取值的时候采用阻塞队列方式进行存储。其中的阻塞队列是对FutureTask的引用,也就是说提交的任务是有序的。获取到的值也是有顺序的。
在工作中其实在某些业务场景下需要使用ComletionService进行并行处理。因为其返回值得有序性但是对外调用的并行性相对于单线程模式的for循环,其优点不言而喻。借此机会在此做一个小demo,以供日后工作借鉴。这里要注意的是CompletionService是一个接口,其背后的大Boss是ExecutorCompletionService。
public class RealDoSomeThing implements Callable<String> {
private String tool;
/***
* 您可以通过构造方法,在任务初始化的时候传入最后调用call方法的所需要的工具类或者spring类
*/
public RealDoSomeThing(String tool){
this.tool=tool;
}
/***
* 这里实现我们的业务,上边指定的String类型,表示返回值得类型
* @return
* @throws Exception
*/
@Override
public String call() throws Exception {
return this.tool+"---"+"真正的执行方法";
}
}
public class ComplationServiceTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService= Executors.newFixedThreadPool(3);
CompletionService<String> completionService=new ExecutorCompletionService(executorService);
for (int i=0;i<10;i++){
completionService.submit(new RealDoSomeThing("工具"+i));
}
for (int i=0;i<10;i++){
String result=completionService.take().get();
System.out.println(result);
}
executorService.shutdown();
}
}
但是通过实践,我们发现所谓的返回结果的顺序性并没有体现出来。那么问题出到哪里了?通过之前的分析确实是把task任务进行缓存到了CompletionQueue队列中了。而获取值的时候就是走的ComletionQueue队列啊,可是实际并不是这样的。why?
事出反常必有妖,咋再看看源码。
咋看看咱们的宝贝ExecutorCompletionService是怎么操作的。
这下明白了吧,这玩意是在返回结果之后才加入的completionQueue队列。那么我们如何让它保持有序?如果按照我们当时的想法来说,它本就是有序的,但是它给安排到了最后才入队列。那么咋就让他在创建在没有变成线程之前就入队列。也就是我们自定义队列,然后submit之后就加入到队列中,因为java是引用。所以只要记录了引用,你跑的再远也能找得到。而submit方法本身是有返回值的。那咋就直接将返回值入队列就OK了。就是这么完美。
改造之后的具有顺序的CompletionService:
public class ComplationServiceTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
LinkedBlockingQueue<java.util.concurrent.Future> blockingDeque= new LinkedBlockingQueue<>();
ExecutorService executorService= Executors.newFixedThreadPool(3);
CompletionService<String> completionService=new ExecutorCompletionService(executorService);
for (int i=0;i<10;i++){
java.util.concurrent.Future future=completionService.submit(new RealDoSomeThing("工具"+i));
blockingDeque.add(future);
}
for (int i=0;i<10;i++){
String result= (String) blockingDeque.take().get();
System.out.println(result);
}
executorService.shutdown();
}
}
通过自定义的任务队列缓存,然后从自定义任务队列中获取返回值,绕过ComletionService提供的任务队列就实现了上述需求。在此真的对阅读源码感到一份实实在在的力量感。所以还是要阅读源码,知其然还要知其所以然。加油
- kylin集群Nginx负载均衡
- Java 8 Stream 教程 (一)
- Python文档精要研读系列(1):map函数
- SparkML模型选择(超参数调整)与调优
- visual studio 2012 的制作ActiveX、打包和发布
- 用java提交一个Spark应用程序
- 一步步教你利用Github开源项目实现网络爬虫:以抓取证券日报新闻为例
- 用linqPad帮助你快速学习LINQ
- Java 8 Stream 教程 (二)
- CountVectorizer
- Caliburn.Micro学习笔记(五)----协同IResult
- 一个Pythoner的自我修养系列(一)
- 众里寻她千百度,蓦然回首,那bug却在灯火阑珊处
- Github|Python开源项目漫游指南(一)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 使用R语言计算遗传力
- 专属于六倍体小麦的Bioconductor注释包
- 轻松上传超过100M的文件至GitHub
- 转录组分析 | 使用DESeq2进行基因差异表达分析
- 生信基础 | 使用BLAST进行序列比对
- 批量提取基因上下游指定范围内的SNP标记
- 一文掌握Plink文件格式转换
- R语言绘图 | 绘制QQ图和曼哈顿图
- GWAS | 使用GEMMA进行全基因组关联分析
- 使用eggnog-mapper进行功能注释
- 离线环境下使用Conda安装软件
- 利用cutree划分pheatmap聚类结果
- 使用Mfuzz进行转录组表达模式聚类分析
- 使用PopLDdecay快速进行连锁不平衡分析
- 报错坑 | LDSC安装报错怎么解决?