Spark Streaming自定义Receivers
时间:2022-04-29
本文章向大家介绍Spark Streaming自定义Receivers,主要内容包括自定义一个Receiver、An Actor as Receiver、A Sample Spark Application、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
自定义一个Receiver
class SocketTextStreamReceiver(host: String, port: Int(
extends NetworkReceiver[String]
{
protected lazy val blocksGenerator: BlockGenerator =
new BlockGenerator(StorageLevel.MEMORY_ONLY_SER_2)
protected def onStart() = {
blocksGenerator.start()
val socket = new Socket(host, port)
val dataInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
var data: String = dataInputStream.readLine()
while (data != null) {
blocksGenerator += data
data = dataInputStream.readLine()
}
}
protected def onStop() {
blocksGenerator.stop()
}
}
An Actor as Receiver
class SocketTextStreamReceiver (host:String,
port:Int,
bytesToString: ByteString => String) extends Actor with Receiver {
override def preStart = IOManager(context.system).connect(host, port)
def receive = {
case IO.Read(socket, bytes) => pushBlock(bytesToString(bytes))
}
}
A Sample Spark Application
val ssc = new StreamingContext(master, "WordCountCustomStreamSource",
Seconds(batchDuration))
//使用自定义的receiver
val lines = ssc.networkStream[String](new SocketTextStreamReceiver(
"localhost", 8445))
//或者使用这个自定义的actor Receiver
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver") */
提交成功之后,启动Netcat测试一下
$ nc -l localhost 8445 hello world hello hello
下面是合并多个输入流的方法:
val lines = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8445, z => z.utf8String)),"SocketReceiver")
// Another socket stream receiver
val lines2 = ssc.actorStream[String](Props(new SocketTextStreamReceiver(
"localhost",8446, z => z.utf8String)),"SocketReceiver")
val union = lines.union(lines2)
- MySQL Tips【Updating】
- Meltdown、Spectre攻击---CPU乱序执行和预测执行导致的安全问题
- WordPress 4.6远程代码执行漏洞(CVE-2016-10033)复现环境搭建指南
- 相似文档查找算法之 simHash 简介及其 java 实现
- Hadoop 中利用 mapreduce 读写 mysql 数据
- Android O中对TEE加解密算法的新要求
- storm 原理简介及单机版安装指南
- Python Tips, Tricks, and Hacks
- 英特尔放出Linux微代码以修复Meltdown和Spectre漏洞
- python基础(5):深入理解 python 中的赋值、引用、拷贝、作用域
- Linux SSH密码暴力破解技术及攻防实战
- 西部数据NAS设备被曝存在硬编码后门和未授权文件上传高危漏洞
- Hive & Performance 学习笔记
- 任意用户密码重置(一):重置凭证泄漏
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Java单元测试——框架(三)——testNG
- Nginx系列:后端服务应用健康检测
- 那些大厂必问的Handler和Binder,有必要去研究么?
- 欢迎来到 TreeMap 的吐槽大会
- OMG,12 个精致的 Java 字符串操作小技巧,学它
- Nginx系列:安全下载模块
- 5分钟入门GANS:原理解释和keras代码实现
- 使用ML 和 DNN 建模的技巧总结
- 医学图像分割模型U-Net介绍和Kaggle的Top1解决方案源码解析
- 机器学习中的音频特征:理解Mel频谱图
- 兄弟,如何淡定地渡过七夕?
- Spring 源码第 9 篇,深入分析 FactoryBean
- PowerBI 动态数据格式 高级版 以及重要通知
- 气哭老板的顶级密钥存放方案,又做了一件蠢事
- 构建没有数据集的辣辣椒分类器,准确性达到96%