查看: 442|回复: 1

pyspark的使用和操作

[复制链接]
新浪微博达人勋 xenron  实名认证
论坛徽章:
227
matlab徽章
日期:2019-07-11 14:23:31股票徽章
日期:2019-07-11 14:24:09智慧眼徽章
日期:2019-07-11 14:24:43股票徽章
日期:2019-07-11 14:24:48
发表于 2018-12-30 20:44 | 显示全部楼层 |阅读模式

Python SQL Spark 集群 Hive

pyspark里最核心的模块是SparkContext(简称sc),最重要的数据载体是RDD。RDD就像一个NumPy array或者一个Pandas Series,可以视作一个有序的item集合。只不过这些item并不存在driver端的内存里,而是被分割成很多个partitions,每个partition的数据存在集群的executor的内存中。

引入Python中pyspark工作模块
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject".setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)
#任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数(比如主节点的URL)。初始化后,就可以使用SparkContext对象所包含的各种方法来创建和操作RDD和共享变量。Spark shell会自动初始化一个SparkContext(在Scala和Python下可以,但不支持Java)。
#getOrCreate表明可以视情况新建session或利用已有的session

SparkSession是Spark 2.0引入的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。 在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于SQL,使用sqlContext;对于hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点。SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。

初始化RDD的方法
(1)本地内存中已经有一份序列数据(比如python的list),可以通过sc.parallelize去初始化一个RDD。当执行这个操作以后,list中的元素将被自动分块(partitioned),并且把每一块送到集群上的不同机器上。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)

#(a)利用list创建一个RDD;使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame转成Spark RDD。
rdd = sc.parallelize([1,2,3,4,5])
rdd
#Output:ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

#(b)getNumPartitions()方法查看list被分成了几部分
rdd.getNumPartitions()
#Output:4

#(c)glom().collect()查看分区状况
rdd.glom().collect()
#Output:[[1], [2], [3], [4, 5]]

在这个例子中,是一个4-core的CPU笔记本;Spark创建了4个executor,然后把数据分成4个块。colloect()方法很危险,数据量上BT文件读入会爆掉内存……

(2)创建RDD的另一个方法是直接把文本读到RDD。文本的每一行都会被当做一个item,不过需要注意的一点是,Spark一般默认给定的路径是指向HDFS的,如果要从本地读取文件的话,给一个file://开头(windows下是以file:\\开头)的全局路径。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)

#(a)记录当前pyspark工作环境位置
import os
cwd=os.getcwd()  
cwd
#Output:'C:\\Users\\Yu\\0JulyLearn\\5weekHadoopspark'

#(b)要读入的文件的全路径
rdd=sc.textFile("file:\\\\\\" + cwd + "\\names\yob1880.txt")
rdd
#Output:file:\\\C:\Users\Yu\0JulyLearn\5weekhadoopspark\names\yob1880.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

#(c)first()方法取读入的rdd数据第一个item
rdd.first()
#Output:'Mary,F,7065'

甚至可以sc.wholeTextFiles读入整个文件夹的所有文件。但是要特别注意,这种读法,RDD中的每个item实际上是一个形如(文件名,文件所有内容)的元组。读入整个文件夹的所有文件。

import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
sc=SparkContext.getOrCreate(conf)

#记录当前pyspark工作环境位置
import os
cwd=os.getcwd()  
cwd
#Output:'C:\\Users\\Yu\\0JulyLearn\\5weekhadoopspark'

rdd = sc.wholeTextFiles("file:\\\\\\" + cwd + "\\names\yob1880.txt")
rdd
#Outputrg.apache.spark.api.java.JavaPairRDD@12bcc15

rdd.first()

Output:
('file:/C:/Users/Yu/0JulyLearn/5weekhadoopspark/names/yob1880.txt',
'Mary,F,7065\r\nAnna,F,2604\r\nEmma,F,2003\r\nElizabeth,F,1939\r\nMinnie,F,1746')

其余初始化RDD的方法,包括:HDFS上的文件,Hive中的数据库与表,Spark SQL得到的结果。


回复

使用道具 举报

论坛徽章:
6
spark徽章
日期:2019-03-28 14:58:37python徽章
日期:2019-04-11 15:16:08python徽章
日期:2019-04-19 10:51:53python徽章
日期:2019-04-19 10:52:46python徽章
日期:2019-04-19 10:53:27Hadoop研习者初级
日期:2019-05-16 15:54:51
发表于 2018-12-30 22:31 | 显示全部楼层
由于笔者目前用python比较多,所以想安装下pySpark,并且在Anaconda2中调用。
(1)jdk-8u91-windows-x64.exe
(2)spark-1.6.0-bin-hadoop2.6.0.tgz

安装
(1)jdk默认安装
(2)spark-1.6.0-bin-hadoop2.6.0.tgz先进行解压。假设目录为E:\spark-1.6.0-bin-hadoop2.6.0
(3)配置环境变量

SPARK_HOME=E:\spark-1.6.0-bin-hadoop2.6.0

Path添加%SPARK_HOME%\bin和%SPARK_HOME%\python

这时,你可以利用打开cmd,输入pySpark。
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册 新浪微博登陆

本版积分规则

 

GMT+8, 2019-7-16 12:28 , Processed in 0.116559 second(s), 31 queries .

关闭

扫一扫加入
本版微信群