Java 8 Stream 教程 (三)
作者:Benjamin
译者:java达人
来源:http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/(点击阅读原文前往)
前面的教程:
并行stream
为增强大数据量下的运行性能,stream可以并行执行。并行stream通过静态方法ForkJoinPool.commonPool()使用ForkJoinPool。底层线程池的大小最多5个线程—这取决于可用物理CPU核的数量:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
在我的机器上, common pool 初始化为3 默认值。通过设置以下JVM参数,可以调整该值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
集合支持方法parallelStream()创建一个并行的元素stream。或者,您可以在给定的stream上调用中间方法parallel(),将顺序stream转换为并行的stream。
为了理解并行stream的并行执行行为,下一个示例将打印当前线程的信息:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]n",
s, Thread.currentThread().getName()));
通过观察调试输出,我们应该能更好地理解哪些线程被用于执行stream操作:
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
正如您所见,并行stream利用了ForkJoinPoolfor执行stream操作的所有可用线程。由于特定线程实际的行为是不确定性的,所以输出在多次运行时可能会有所不同。
让我们通过额外的stream操作来扩展示例,sorted:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]n",
s, Thread.currentThread().getName()));
乍一看,结果可能会很奇怪:
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
sort似乎只在主线程上顺序执行。实际上,在并行stream上,sort底层使用新的Java 8方法Arrays.parallelSort()。如Javadoc中所述,sort顺序还是并行执行,取决于数组的长度:
If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.
回到上一节中reduce的例子。我们已经发现, combiner函数只在并行stream中调用,而不是在顺序stream中调用。我们看看哪些线程用到了:
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));persons .parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
控制台输出显示 accumulator和combiner函数在所有可用线程上并行执行:
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
综上所述,可以得出并行stream在大数据量下会带来不错的性能提升。但是请记住,一些并行stream操作(如reduce和collect)需要额外的计算(合并操作),而这些操作在顺序执行时是不需要的。
此外,我们了解到所有的并行stream操作都共享同一个jvm范围的ForkJoinPool。因此,您要避免慢阻塞stream操作,因为这可能会减慢应用程序中重度依赖于并行stream的部分。
结尾
我的Java 8 stream编程指南在这里完结了。如果您有兴趣了解更多关于Java 8 stream的知识,我向您推荐Stream Javadoc文档。如果您想了解更多关于底层机制的知识,可以阅读Martin fowler关于Collection Pipelines的文章。
如果您对JavaScript感兴趣,可以看看Stream.js —Java 8 Streams API的一个JavaScript实现。您还可以阅读我的Java 8 Tutorial 和 Java 8 Nashorn Tutorial.
希望本教程对您有所帮助,您喜欢阅读。本教程示例的完整源代码托管在GitHub上。你可以免费fork,或者通过Twitter向我发送你的反馈。
编程愉快!
相关链接:
Java 8 Tutorial 和 Java 8 Nashorn Tutorial
http://winterbe.com/posts/2014/03/16/java-8-tutorial/
http://winterbe.com/posts/2014/04/05/java8-nashorn-tutorial/
github源码:
https://github.com/login?return_to=https%3A%2F%2Fgithub.com%2Fwinterbe%2Fjava8-tutorial%2Ffork
- 再学习之Spring(依赖注入).
- 使用序列的问题ORA-02287(r5笔记第19天)
- Java多线程详解2
- Java多线程详解3
- SpringMVC处理multipart请求.
- 一条简单的sql语句运行15天的原因分析(r5笔记第17天)
- 巧用flashback database实现灵活的数据回滚(r5笔记第16天)
- Spring Cache For Redis.
- css重写checkbox样式
- 通过shell脚本同时监控多个数据库负载(r5笔记第14天)
- Java 定时器 Timer 的使用.
- 通过shell脚本来统计段大小(r5笔记第14天)
- Linux下配置MySQL主从复制(r5笔记第13天)
- Final 关键字
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- 设计模式 | 模版方法
- Python 函数3000字使用总结
- 3D摇杆控制器一种简单实现!Cocos Creator 3D!
- 数据结构 | TencentOS-tiny中队列、环形队列、优先级队列的实现及使用
- RTOS内功修炼记(六)—— 任务间通信为什么不用全局变量?
- 程序员必备基础:加签验签
- 【Rust日报】2020-07-16 j4rs,一个在 Rust 中调用 Java 代码的 Crate
- Vue.js 3 正式进入 RC 阶段
- FeignClient注解及参数问题---SpringCloud微服务
- Rust FFI 编程 - Rust导出共享库01
- 【每周一库】 see - An HTTP server for hosting static files
- 【Rust日报】2020-07-17 无船同志新博客:Shipping Const Generics in 2020
- SpringCloud--Config Server配置中心学习总结
- 【Rust日报】2020-07-18 提升ARM64 Linux平台支持到Tier-1
- SpringCloud+MyBatis(oracle)逆向工程自动生成代码