spark编写简单程序 最近在学习pyspark,有入门指南吗?

[更新]
·
·
分类:互联网
2505 阅读

spark编写简单程序

最近在学习pyspark,有入门指南吗?

最近在学习pyspark,有入门指南吗?

Spark提供了一个Python_Shell,即pyspark,从而可以以交互的方式使用Python编写Spark程序。
有关Spark的基本架构介绍参考;
有关Pyspark的环境配置参考。
pyspark里最核心的模块是SparkContext(简称sc),最重要的数据载体是RDD。RDD就像一个NumPy array或者一个Pandas Series,可以视作一个有序的item集合。只不过这些item并不存在driver端的内存里,而是被分割成很多个partitions,每个partition的数据存在集群的executor的内存中。
引入Python中pyspark工作模块
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
confSparkConf().setAppName(miniProject).setMaster(local[*])
(conf)
#任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数(比如主节点的URL)。初始化后,就可以使用SparkContext对象所包含的各种方法来创建和操作RDD和共享变量。Spark shell会自动初始化一个SparkContext(在Scala和Python下可以,但不支持Java)。
#getOrCreate表明可以视情况新建session或利用已有的session
1
2
3
4
5
6
7
SparkSession是Spark 2.0引入的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。 在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点。SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。
初始化RDD的方法
(1)本地内存中已经有一份序列数据(比如python的list),可以通过去初始化一个RDD。当执行这个操作以后,list中的元素将被自动分块(partitioned),并且把每一块送到集群上的不同机器上。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
confSparkConf().setAppName(miniProject).setMaster(local[*])
(conf)
#(a)利用list创建一个RDD使用可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame转成Spark RDD。
rdd ([1,2,3,4,5])
rdd
#Output:ParallelCollectionRDD[0] at parallelize at
#(b)getNumPartitions()方法查看list被分成了几部分
()
#Output:4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#(c)glom().collect()查看分区状况
().collect()
#Output:[[1], [2], [3], [4, 5]]
1
2
3
在这个例子中,是一个4-core的CPU笔记本Spark创建了4个executor,然后把数据分成4个块。colloect()方法很危险,数据量上BT文件读入会爆掉内存……
(2)创建RDD的另一个方法是直接把文本读到RDD。文本的每一行都会被当做一个item,不过需要注意的一点是,Spark一般默认给定的路径是指向HDFS的,如果要从本地读取文件的话,给一个file://开头(windows下是以file:开头)的全局路径。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
confSparkConf().setAppName(miniProject).setMaster(local[*])
(conf)
#(a)记录当前pyspark工作环境位置
import os
()
cwd
#Output:C:UsersYu0JulyLearn5weekhadoopspark
#(b)要读入的文件的全路径
rddsc.textFile(file: cwd
amesyob1880.txt)
rdd
#Output:file:C:UsersYu0JulyLearn5weekhadoopspark
amesyob1880.txt MapPartitionsRDD[3] at textFile at
#(c)first()方法取读入的rdd数据第一个item
()
#Output:Mary,F,7065
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
甚至可以sc.wholeTextFiles读入整个文件夹的所有文件。但是要特别注意,这种读法,RDD中的每个item实际上是一个形如(文件名,文件所有内容)的元组。读入整个文件夹的所有文件。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
confSparkConf().setAppName(miniProject).setMaster(local[*])
(conf)
#记录当前pyspark工作环境位置
import os
()
cwd
#Output:C:UsersYu0JulyLearn5weekhadoopspark
rdd sc.wholeTextFiles(file: cwd
amesyob1880.txt)
rdd
#@12bcc15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
()
Output:
(file:/C:/Users/Yu/0JulyLearn/5weekhadoopspark/names/yob1880.txt,
1
2
3
4
5
其余初始化RDD的方法,包括:HDFS上的文件,Hive中的数据库与表,Spark SQL得到的结果。这里暂时不做介绍。
RDD Transformation
(1)RDDs可以进行一系列的变换得到新的RDD,有点类似列表推导式的操作,先给出一些RDD上最常用到的transformation:
map() 对RDD的每一个item都执行同一个操作
flatMap() 对RDD中的item执行同一个操作以后得到一个list,然后以平铺的方式把这些list里所有的结果组成新的list
filter() 筛选出来满足条件的item
distinct() 对RDD中的item去重
sample() 从RDD中的item中采样一部分出来,有放回或者无放回
sortBy() 对RDD中的item进行排序
1
2
3
4
5
6
如果想看操作后的结果,可以用一个叫做collect()的action把所有的item转成一个Python list。数据量大时,collect()很危险……
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
confSparkConf().setAppName(miniProject).setMaster(local[*])
(conf)
numbersRDD (range(1,10 1))
print(())
#map()对RDD的每一个item都执行同一个操作
squaresRDD (lambda x: x**2) # Square every number
print(())
#filter()筛选出来满足条件的item
filteredRDD (lambda x: x % 2 0) # Only the evens
print(())
#Output:
#[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
#[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
#[2, 4, 6, 8, 10]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
confSparkConf().setAppName(miniProject).setMaster(local[*])
(conf)
#flatMap() 对RDD中的item执行同一个操作以后得到一个list,然后以平铺的方式把这些list里所有的结果组成新的list
([Hello world,My name is Patrick])
wordsRDDsentencesRDD.flatMap(lambda sentence: sentence.split( ))
print(())
print(())
#Output:
#[Hello, world, My, name, is, Patrick]
#6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
对比一下:
这里如果使用map的结果是[[‘Hello’, ‘world’], [‘My’, ‘name’, ‘is’, ‘Patrick’]],
使用flatmap的结果是全部展开[‘Hello’, ‘world’, ‘My’, ‘name’, ‘is’, ‘Patrick’]。
flatmap即对应Python里的如下操作:
l [Hello world, My name is Patrick]
ll []
for sentence in l:
ll ll sentence.split( ) # 号作用,two list拼接
ll
1
2
3
4
5
(2)最开始列出的各个Transformation,可以一个接一个地串联使用,比如:
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
confSparkConf().setAppName(miniProject).setMaster(local[*])
(conf)
def doubleIfOdd(x):
if x % 2 1:
return 2 * x
else:
return x
numbersRDD (range(1,10 1))
resultRDD (numbersRDD
.map(doubleIfOdd) #map,filter,distinct()
.filter(lambda x: x gt 6)
.distinct()) #distinct()对RDD中的item去重
()
#Output:[8, 10, 18, 14]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(3)当遇到更复杂的结构,比如被称作“pair RDDs”的以元组形式组织的k-v对(key, value),Spark中针对这种item结构的数据,定义了一些transform和action:
reduceByKey(): 对所有有着相同key的items执行reduce操作
groupByKey(): 返回类似(key, listOfValues)元组的RDD,后面的value List 是同一个key下面的
sortByKey(): 按照key排序
countByKey(): 按照key去对item个数进行统计
collectAsMap(): 和collect有些类似,但是返回的是k-v的字典
1
2
3
4
5
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
confSparkConf().setAppName(miniProject).setMaster(local[*])
(conf)
([Hello hello

运行Spark程序有哪几种模式?

Spark可以和Yarn整合,将Application提交到Yarn上运行,和StandAlone提交模式一样,Yarn也有两种提交任务的方式。
1、yarn-client提交任务方式
配置 在client节点配置中添加Hadoop_HOME的配置目录即可提交yarn 任务,具体步骤如下:
注意client只需要有Spark的安装包即可提交任务,不需要其他配置(比如slaves)!!!
提交命令 ./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
./spark-submit --master yarn-lient --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
执行原理图解
执行流程
客户端提交一个Application,在客户端启动一个Driver进程。Driver进程会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。AM启动后,会向RS请求一批container资源,用于启动会找到一批NM返回给AM,用于启动Executor。AM会向NM发送命令启动Executor。Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。总结1、Yarn-client模式同样是适用于测试,因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加.
2、 ApplicationMaster的作用:
为当前的Application申请资源
给NodeManager发送消息启动Executor。
注意:ApplicationMaster有launchExecutor和申请资源的功能,并没有作业调度的功能。
2、yarn-cluster提交任务方式
提交命令 ./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
结果在yarn的日志里面:
执行原理
执行流程客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)。AM启动,AM发送请求到RS,请求一批container用于启动Executor。RS返回一批NM节点给AM。AM连接到NM,发送请求到NM启动Executor。Executor反向注册到AM所在的节点的Driver。Driver发送task到Executor。总结1、Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。
的作用:
为当前的Application申请资源
给nodemanager发送消息 启动Excutor。
任务调度。(这里和client模式的区别是AM具有调度能力,因为其就是Driver端,包含Driver进程)
3、 停止集群任务命令:yarn application -kill applicationID
自我最后总结:stand-alone模式中Master发送对应的命令启动Worker上的executor进程,而yarn模式中的applimaster也是负责启动worker中的Driver进程,可见都是master负责发送消息,然后再对应的节点上启动executor进程。菲官方证实,仅供理解!!!