pyspark读取pickle文件内容并存储到hive
时间:2022-07-28
本文章向大家介绍pyspark读取pickle文件内容并存储到hive,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
在平常工作中,难免要和大数据打交道,而有时需要读取本地文件然后存储到Hive中,本文接下来将具体讲解。
过程:
- 使用pickle模块读取.plk文件;
- 将读取到的内容转为RDD;
- 将RDD转为DataFrame之后存储到Hive仓库中;
1、使用pickle保存和读取pickle文件
import pickle
data = ""
path = "xxx.plj"
#保存为pickle
pickle.dump(data,open(path,'wb'))
#读取pickle
data2 = pickle.load(open(path,'rb'))
使用python3读取python2保存的pickle文件时,会报错:
UnicodeDecodeError: 'ascii' codec can't decode byte 0xa0 in position 11: ordinal not in range(128)
解决方法:
data2 = pickle.load(open(path,'rb',encoding='latin1'))
使用python2读取python3保存的pickle文件时,会报错:
unsupported pickle protocol:3
解决方法:
import pickle
path = "xxx.plk"
path2 = 'xxx2.plk'
data = pickle.load(open(path,'rb'))
#保存为python2的pickle
pickle.dump(data,open(path2,'wb'),protocol=2)
#读取pickle
data2 = pickle.load(open(path2,'rb'))
2、读取pickle的内容并转为RDD
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pickle
spark = SparkSession
.builder
.appName("Python Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
with open(picle_path,"rb") as fp:
data = pickle.load(fp)
#这里可根据data的类型进行相应的操作
#假设data是一个一维数组:[1,2,3,4,5],读取数据并转为rdd
pickleRdd = spark.parallelize(data)
3、将rdd转为dataframe并存入到Hive中
#定义列名
column = Row('col')
#转为dataframe
pickleDf =pickleRdd.map(lambda x:column(x))
#存储到Hive中,会新建数据库:hive_database,新建表:hive_table,以覆盖的形式添加,partitionBy用于指定分区字段
pickleDf..write.saveAsTable("hive_database.hvie_table", mode='overwrite', partitionBy=‘’)
补充存入到Hive中的知识:
(1)通过sql的方式
data = [
(1,"3","145"),
(1,"4","146"),
(1,"5","25"),
(1,"6","26"),
(2,"32","32"),
(2,"8","134"),
(2,"8","134"),
(2,"9","137")
]
df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])
# method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字
df.registerTempTable('test_hive')
sqlContext.sql("create table default.write_test select * from test_hive")
或者:
# df 转为临时表/临时视图
df.createOrReplaceTempView("df_tmp_view")
# spark.sql 插入hive
spark.sql(""insert overwrite table
XXXXX # 表名
partition(分区名称=分区值) # 多个分区按照逗号分开
select
XXXXX # 字段名称,跟hive字段顺序对应,不包含分区字段
from df_tmp_view""")
(2)以saveAsTable的形式
# "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
# mode("append")是在原有表的基础上进行添加数据
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
以下是通过rdd创建dataframe的几种方法:
(1)通过键值对
d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)
# [Row(age=1, name='Alice')]
(2)通过rdd
a = [('Alice', 1)]
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
print(output)
output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)
# [Row(_1='Alice', _2=1)]
# [Row(name='Alice', age=1)]
(3)通过rdd和Row
from pyspark.sql import Row
a = [('Alice', 1)]
rdd = sc.parallelize(a)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)
# [Row(name='Alice', age=1)]
(4)通过rdd和StrutType
from pyspark.sql.types import *
a = [('Alice', 1)]
rdd = sc.parallelize(a)
schema = StructType(
[
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)
]
)
output = spark.createDataFrame(rdd, schema).collect()
print(output)
# [Row(name='Alice', age=1)]
(5)基于pandas dataframe创建
df = spark.createDataFrame(rdd, ['name', 'age'])
print(df) # DataFrame[name: string, age: bigint]
print(type(df.toPandas())) # <class 'pandas.core.frame.DataFrame'>
# 传入pandas DataFrame
output = spark.createDataFrame(df.toPandas()).collect()
print(output)
# [Row(name='Alice', age=1)]
参考:
https://blog.csdn.net/sinat_28224453/article/details/84977693
https://blog.csdn.net/weixin_39198406/article/details/104916715
https://blog.csdn.net/u011412768/article/details/93426353
- Linux下FTP环境部署梳理(vsftpd和proftpd)
- Silverlight如何与JS相互调用
- Docker容器学习梳理--私有仓库Registry使用
- 从插件重构看如何提升测试质量与效率
- 巧用WinRAR+Javascript解决activeX的自动安装问题
- 在网页中实现QQ的屏幕截图功能
- Activity之间传递参数
- linux下rsync和tar增量备份梳理
- 重温Delphi之:面向对象
- Android新手之旅(15) Win7下配置遇到的问题
- 重温Delphi之:如何定义一个类
- Android新手之旅(2) 新手问题
- Android新手之旅(2) 新手问题
- Android新手之旅(9) 自定义的折线图
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- gremlin-both()与bothE().bothV()的区别
- C# Time.deltaTime 增量时间
- Docker--docker ps 命令与结果解析
- Junit5系列-Junit5中@DisplayName自定义名称
- Junit5系列-Junit5中Assertions断言类
- 使用libev监视文件夹下文件(夹)属性变动的方案和实现
- C++拾取——stl标准库中集合交集、并集、差集、对称差方法
- netty结合Protostuff传输对象案例,单机压测秒级接收35万个对象
- Kubernates之Deployment滚动升级和回滚
- 简记特定容器list和forward_list算法
- Games101--Assignment2
- 快速学习-如何使用sentinel
- springboot-mybatis-demo遇到的坑
- 快速学习-Sentinel 工作主流程
- 快速学习-Sentinel 流量控制