
Python 数据科学
速查表
PySpark - RDD 基础
DataCamp
Learn Python f
or Data Science Interactively
PySpark 是 Spark 的 Python API,允许 Python 调用
Spark 编程模型。
>>> from pyspark import SparkContext
>>> sc = SparkContext(master = 'local[2]')
加载数据
>>> rdd = sc.parallelize([('a',7),('a',2),('b',2)])
>>> rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
>>> rdd3 = sc.parallelize(range(100))
>>> rdd4 = sc.parallelize([("a",["x","y","z"]),
("b",["p", "r"])])
Spark
>>> rdd.map(lambda x: x+(x[1],x[0]))
.collect()
[('a',7,7,'a'),('a',2,2,'a'),('b',2,2,'b')]
>>> rdd5 = rdd.atMap(lambda x: x+(x[1],x[0]))
>>> rdd5.collect()
['a',7,7,'a','a',2,2,'a','b',2,2,'b']
>>> rdd4.atMapValues(lambda x: x)
.collect()
对每个 RDD 元素执行函数
对每个 RDD 元素执行函数,并
拉平结果
不改变键,对 rdd4 的每个键值对
执行 flatMap 函数
[('a','x'),('a','y'),('a','z'),('b','p'),('b','r')]
应用函数
>>> sc.stop()
>>> sc.version
>>> sc.pythonVer
>>> sc.master
>>> str(sc.sparkHome)
>>> str(sc.sparkUser())
>>> sc.appName
>>> sc.applicationId
>>> sc.defaultParallelism
>>> sc.defaultMinPartitions
获取 SparkContext 版本
获取 Python 版本
要连接的 Master URL
Spark 在工作节点的安装路径
获取 SparkContext 的 Spark 用户名
返回应用名称
获取应用程序ID
返回默认并行级别
RDD默认最小分区数
>>> from pyspark import SparkConf, SparkContext
>>> conf = (SparkConf()
.setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
>>> sc = SparkContext(conf = conf)
配置
SparkContext
终止 SparkContext
使用 Shell
$ ./bin/spark-shell --master local[2]
$ ./bin/pyspark --master local[4] --py-les code.py
核查 SparkContext
用 --master 参数设定 Context 连接到哪个 Master 服务器,通过传递逗号分隔
列表至 --py-files 添加 Python.zip、.egg 或 .py文件到 Runtime 路径。
执行程序
$ ./bin/spark-submit examples/src/main/python/pi.py
PySpark Shell 已经为 SparkContext 创建了名为 sc 的变量。
>>> rdd.saveAsTextFile("rdd.txt")
>>> rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",
'org.apache.hadoop.mapred.TextOutputFormat'
)
>>> rdd3.max()
99
>>> rdd3.min()
0
>>> rdd3.mean()
49.5
>>> rdd3.stdev()
28.866070047722118
>>> rdd3.variance()
833.25
>>> rdd3.histogram(3)
([0,33,66,99],[33,33,34])
>>> rdd3.stats()
RDD 元素的最大值
RDD 元素的最小值
RDD 元素的平均值
RDD 元素的标准差
计算 RDD 元素的方差
分箱(Bin)生成直方图
综合统计
包括:计数、平均值、标准差、最大值和最小值
保存
提取 RDD 信息
>>> rdd.getNumPartitions()
>>> rdd.count()
3
>>> rdd.countByKey()
defaultdict(<type 'int'>,{'a':2,'b':1})
>>> rdd.countByValue()
defaultdict(<type 'int'>,{('b',2):1,('a',2):1,('a',7):1})
>>> rdd.collectAsMap()
{'a': 2,'b': 2}
>>> rdd3.sum()
4950
>>> sc.parallelize([]).isEmpty()
列出分区数
计算 RDD 实例数量
按键计算 RDD 实例数量
按值计算 RDD 实例数量
以字典形式返回键值
汇总 RDD 元素
检查 RDD 是否为空
True
汇总
基础信息
>>> rdd.repartition(4)
>>> rdd.coalesce(1)
新建一个含4个分区的 RDD
将 RDD 中的分区数缩减为1个
重分区
并行集合
>>> textFile = sc.textFile("/my/directory/*.txt")
>>> textFile2 = sc.wholeTextFiles("/my/directory/")
排序
>>> rdd2.sortBy(lambda x: x[1])
.collect()
[('d',1),('b',1),('a',2)]
>>> rdd2.sortByKey()
按给定函数排序 RDD
按键排序 RDD的键值对
.collect()
[('a',2),('b',1),('d',1)]
数学运算
>>> rdd.subtract(rdd2)
.collect()
[('b',2),('a',7)]
>>> rdd2.subtractByKey(rdd)
.collect()
[('d', 1)]
>>> rdd.cartesian(rdd2).collect()
返回在 rdd2 里没有匹配键的 rdd
键值对
返回 rdd2 里的每个(键,值)
对,rdd中没有匹配的键
返回 rdd 和 rdd2 的笛卡尔积
外部数据
使用 textFile() 函数从HDFS、本地文件或其它支持 Hadoop 的文件系统
里读取文本文件,或使用 wholeTextFiles() 函数读取目录里文本文件。
规约
>>> rdd.reduceByKey(lambda x,y : x+y)
.collect()
[('a',9),('b',2)]
>>> rdd.reduce(lambda a, b: a + b)
合并每个键的 RDD 值
合并 RDD 的值
('a',7,'a',2,'b',2)
分组
>>> rdd3.groupBy(lambda x: x % 2)
.mapValues(list)
.collect()
>>> rdd.groupByKey()
.mapValues(list)
.collect()
[('a',[7,2]),('b',[2])]
>>> seqOp = (lambda x,y: (x[0]+y,x[1]+1))
> > > c o m b O p = (l a m b d a x,y:(x[0]+y[0],x[1]+y[1]))
>>> rdd3.aggregate((0,0),seqOp,combOp)
(4950,100)
>>> rdd.aggregateByKey((0,0),seqop,combop)
.collect()
[('a',(9,2)), ('b',(2,1))]
>>> rdd3.fold(0,add)
4950
>>> rdd.foldByKey(0, add)
.collect()
[('a',9),('b',2)]
>>> rdd3.keyBy(lambda x: x+x)
.collect()
改变数据形状
>>> def g(x): print(x)
>>> rdd.foreach(g)
为所有RDD应用函数
('a', 7)
('b', 2)
('a', 2)
迭代
选择数据
抽样
>>> rdd3.sample(False, 0.15, 81).collect()
[3,4,27,31,40,41,42,43,60,76,79,80,86,97]
>>> rdd.lter(lambda x: "a" in x)
.collect()
[('a',7),('a',2)]
>>> rdd5.distinct().collect()
['a',2,'b',7]
>>> rdd.keys().collect()
['a', 'a', 'b']
初始化 Spark
聚合
原文作者
筛选
>>> rdd.collect()
[('a', 7), ('a', 2), ('b', 2)]
>>> rdd.take(2)
[('a', 7), ('a', 2)]
>>> rdd.first()
返回包含所有 RDD 元素的列表
提取前两个 RDD 元素
提取第一个 RDD 元素
提取前两个 RDD 元素
返回 rdd3 的采样子集
筛选 RDD
返回 RDD 里的唯一值
返回 RDD 键值对里的键
('a', 7)
>>> rdd.top(2)
[('b', 2), ('a', 7)]
获取
返回 RDD 的分组值
按键分组 RDD
汇总每个分区里的 RDD
元素,并输出结果
汇总每个 RDD 的键的值
汇总每个分区里的 RDD
元素,并输出结果
合并每个键的值
通过执行函数,创建
RDD 元素的元组
评论0