地铁译:Spark for python developers ---Spark的数据戏法
Posted 半吊子全栈工匠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了地铁译:Spark for python developers ---Spark的数据戏法相关的知识,希望对你有一定的参考价值。
聚焦在 Twitter 上关于Apache Spark的数据, 这些是准备用于机器学习和流式处理应用的数据。 重点是如何通过分布式网络交换代码和数据,获得 串行化, 持久化 , 调度和缓存的实战经验 。 认真使用 Spark SQL, 交互性探索结构化和半结构化数据. Spark SQL 的基础数据结构是 Spark dataframe, Spark dataframe 受到了 Python Pandas dataframe 和R dataframe 的启发. 这是一个强大的数据结构, 有R 或Python 背景的数据科学家非常容易理解并喜欢上它.
主要关注以下几点:
• 连接 Twitter, 收集有关数据, 然后存储到各种格式中如 JSON ,CSV 以及 MongoDB这样的数据存储
• 使用Blaze and Odo分析数据, 一个Blaze的副产品库, 能够在各种源和目标之间建立连接并传输数据
• 引入 Spark dataframes 作为各个 Spark 模块交换数据的基础,同时使用 Spark SQL交互性探索数据
回顾数据密集型应用的架构
首先审视数据密集型应用架构,集中关注集成层以及获取、提炼和数据持久化迭代循环的基本运行. 这一循环命名为 5C. 5C 代表了connect, collect,correct, compose和consume. 这是集成层运行的基本过程以便于保证从Twitter 获取数据的质量和数量. 我们也将深入持久化层,建立如 MongoDB这样的数据存储方便后面数据的处理.
通过Blaze探索数据, 这是数据操控的一个Python 库, 通过Spark SQL使用 Spark dataframe, 完成交互性数据发现,感受一下三种 dataframe flavors 的细微差别。
下图指出了本章的重点, 集中在集成层和持久化层:
数据的序列化和反序列化
由于在通过API获取数据是的限制,我们需要数据存储. 数据在分布式集群上处理,我们需要一致的方式来保存状态以便于将来的提取使用。现在定义序列化, 持久化, 调度和缓存.
序列化一个Python对象是将它转换一个字节流. 该Python 对象在程序挂掉的时候能够通过反序列化提取.序列化后的 Python 对象在网络上传输或者存在持久化存储中. 反序列化是其逆运算将字节流转化为初始的 Python 对象所以程序能够从保存的状态中提取。 Python中最流行的序列化库是Pickle. 事实上,PySpark命令将pickled 的数据传输到多个工作节点.
持久化 将程序的状态数据保存到硬盘或内存,因而在离开或重启时继续使用。把一个Python 对象从内存保存到文件或数据库,在以后加载的时候拥有相同的状态。
调度 是在多核或者分布式系统中在网络TCP连接上发送 Python 代码和数据.
缓存 是将一个Python 对象转化为内存中的字符串可以作为字典中的关键字. Spark 支持将数据放入集群范围的内存缓冲区. 这在数据重复访问时非常有用,例如查询一个小引用的数据集或者象 Google PageRank那样的迭代算法.
缓存是Spark中非常关键的一个概念,允许我们将RDDs 存入内存或者溢出到硬盘 . 缓存策略的选择依赖于数据的线性程度或者RDD转换的DAG ,这样可以最小化 shuffle 或跨网络数据交换.Spark 为了获得更好的性能,需要注意数据shuffling. 一个好的分区策略和 RDD 缓存, 避免不必要的操作, 可以导致Spark更好的性能.
获取和存储数据
在深入如MongoDB这样的数据库存储之前,先看一下广泛使用的文件存储 : CSV和JSON文件存储. 这两种格式被广泛使用的主要原因: 可读性好, 简单, 轻度关联, 并容易使用.
在CSV中持久化数据
CSV 是轻量级可读易用的格式. 拥有已分隔的文本列和内在表格制式。Python提供了强健的csv库能将 cvs文件序列化为一个Python的字典. 为了我们的程序方便, 写了一个 python类来管理CSV格式中 数据的存储,和从CSV中读取数据. 看一下 IO_csv 类的代码. init 部分 实例化了文件路径,文件名和文件后缀(本例中, .csv):
class IO_csv(object):
def __init__(self, filepath, filename, filesuffix='csv'):
self.filepath = filepath # /path/to/file without the /'
at the end
self.filename = filename # FILE_NAME
self.filesuffix = filesuffix
该类的存储方法使用了tuple 和 csv 文件的头字段作为scheme来持久化数据。如果csv文件存在,则追加数据,否则创建:
def save(self, data, NTname, fields):
# NTname = Name of the NamedTuple
# fields = header of CSV - list of the fields name
NTuple = namedtuple(NTname, fields)
if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.
filename, self.filesuffix)):
# Append existing file
with open('{0}/{1}.{2}'.format(self.filepath, self.
filename, self.filesuffix), 'ab') as f:
writer = csv.writer(f)
# writer.writerow(fields) # fields = header of CSV
writer.writerows([row for row in map(NTuple._make,
data)])
# list comprehension using map on the NamedTuple._
make() iterable and the data file to be saved
# Notice writer.writerows and not writer.writerow
(i.e. list of multiple rows sent to csv file
else:
# Create new file
with open('{0}/{1}.{2}'.format(self.filepath, self.
filename, self.filesuffix), 'wb') as f:
writer = csv.writer(f)
writer.writerow(fields) # fields = header of CSV -
list of the fields name
writer.writerows([row for row in map(NTuple._make,
data)])
# list comprehension using map on the NamedTuple._make() iterable and the data file to be saved
# Notice writer.writerows and not writer.writerow
(i.e. list of multiple rows sent to csv file
该类的加载方法使用了tuple 和 csv 文件的头字段使用一致的schema来提取数据。 加载方法使用生成器来提高内存的有效性,使用yield 返回:
def load(self, NTname, fields):
# NTname = Name of the NamedTuple
# fields = header of CSV - list of the fields name
NTuple = namedtuple(NTname, fields)
with open('{0}/{1}.{2}'.format(self.filepath,self.filename,self.filesuffix),'rU') as f:
reader = csv.reader(f)
for row in map(NTuple._make, reader):
# Using map on the NamedTuple._make() iterable and the reader file to be loaded
yield row
我们使用tuple解析tweet保存到csv或者从csv中提取数据:
fields01 = ['id', 'created_at', 'user_id', 'user_name', 'tweet_text','url']
Tweet01 = namedtuple('Tweet01',fields01)
def parse_tweet(data):
"""
Parse a ``tweet`` from the given response data.
"""
return Tweet01(
id=data.get('id', None),
created_at=data.get('created_at', None),
user_id=data.get('user_id', None),
user_name=data.get('user_name', None),
tweet_text=data.get('tweet_text', None),
url=data.get('url')
)
在 JSON中持久化
JSON 是互联网应用中使用最广泛的数据格式之一. 所有我们使用的API,Twitter, GitHub, 和Meetup, 都通过JSON格式发送数据. JSON 格式比 XML格式要轻,可读性好,在JSON 中内嵌模式. 对于CSV 格式, 所有记录遵从相同的表结构,而JSON 的结构能够变化,是半结构化的,一条JSON 记录能够映射成Python中的字典。 看一下 IO_json类的代码. init 部分例化了文件路径,文件名和文件后缀(本例中,.json):
class IO_json(object):
def __init__(self, filepath, filename, filesuffix='json'):
self.filepath = filepath # /path/to/file without the /'
at the end
self.filename = filename # FILE_NAME
self.filesuffix = filesuffix
# self.file_io = os.path.join(dir_name, .'.join((base_
filename, filename_suffix)))
该类的save方法使用utf-8编码来保证数据读写的兼容性。 如果JSON存在, 则追加数据否则创建:
def save(self, data):
if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.
filename, self.filesuffix)):
# Append existing file
with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'a', encoding='utf-8') as f:
f.write(unicode(json.dumps(data, ensure_ascii=
False))) # In python 3, there is no "unicode" function
# f.write(json.dumps(data, ensure_ascii= False)) #
create a \\" escape char for " in the saved file
else:
# Create new file
with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'w', encoding='utf-8') as f:
f.write(unicode(json.dumps(data, ensure_ascii=
False)))
# f.write(json.dumps(data, ensure_ascii= False))
这个类的load 方法返回了读取的文件 , 获取json数据需要调用 json.loads函数:
def load(self):
with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), encoding='utf-8') as f:
return f.read()
搭建MongoDB
鉴于存储所收集信息的重要性,搭建MongoDB 作为我们的文档存储数据库 . 所有采集的信息是 JSON 格式, MongoDB 以 BSON (short for Binary JSON)格式信息, 因此是一个自然的选择.
现在完成下列步骤:
• 安装MongoDB 服务器和客户端
• 运行MongoDB server
• 运行 Mongo client
• 安装PyMongo driver
• 创建 Python Mongo client
安装MongoDB服务器和客户端
执行如下步骤安装 MongoDB 包:
1. 使用包管理工具导入公钥(in our
case, Ubuntu’s apt),命令如下:
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv
7F0CEB10
- 创建 MongoDB 的文件列表,命令如下. :
echo "deb http://repo.mongodb.org/apt/ubuntu "$("lsb_release
-sc)"/ mongodb-org/3.0 multiverse" | sudo tee /etc/apt/sources.
list.d/mongodb-org-3.0.list
3.升级本地包的数据库:
sudo apt-get update
4.安装MongoDB 的最新稳定版:
sudo apt-get install -y mongodb-org
运行MongoDB服务器
启动MongoDB server:
1. 启动MongoDB server, 命令如下:
sudo service mongodb start
- 检查mongod 是否正常启动:
an@an-VB:/usr/bin$ ps -ef | grep mongo
mongodb
967 1 4 07:03 ? 00:02:02 /usr/bin/mongod
--config /etc/mongod.conf
an
3143 3085 0 07:45 pts/3 00:00:00 grep --color=auto
mongo
In
在本例中,mongodb 运行在967进程.
3. The mongod server 监听默认端口27017 可以在配置文件中修改.
4. 检查/var/log/mongod/mongod.log 日志文件的内容:
an@an-VB:/var/lib/mongodb$ ls -lru
total 81936
drwxr-xr-x 2 mongodb nogroup 4096 Apr 25 11:19 _tmp
-rw-r--r-- 1 mongodb nogroup 69 Apr 25 11:19 storage.bson
-rwxr-xr-x 1 mongodb nogroup 5 Apr 25 11:19 mongod.lock
-rw------- 1 mongodb nogroup 16777216 Apr 25 11:19 local.ns
-rw------- 1 mongodb nogroup 67108864 Apr 25 11:19 local.0
drwxr-xr-x 2 mongodb nogroup 4096 Apr 25 11:19 journal
5. 停止mongodb 的服务器, 命令如下:
sudo service mongodb stop
运行Mongo客户端
在控制台运行Mongo client 很简单,命令如下:
an@an-VB:/usr/bin$ mongo
MongoDB shell version: 3.0.2
connecting to: test
Server has startup warnings:
2015-05-30T07:03:49.387+0200 I CONTROL [initandlisten]
2015-05-30T07:03:49.388+0200 I CONTROL [initandlisten]
在mongo client console 提示下, 查看数据库的命令如下:
> show dbs
local 0.078GB
test 0.078GB
选择 test数据库:
> use test
switched to db test
在数据库中显示 collections:
> show collections
restaurants
system.indexes
我们查看 restaurant collection 中的纪录:
> db.restaurants.find()
{ "_id" : ObjectId("553b70055e82e7b824ae0e6f"), "address : { "building
: "1007", "coord" : [ -73.856077, 40.848447 ], "street : "Morris Park
Ave", "zipcode : "10462 }, "borough : "Bronx", "cuisine : "Bakery",
"grades : [ { "grade : "A", "score" : 2, "date" : ISODate("2014-
03-03T00:00:00Z") }, { "date" : ISODate("2013-09-11T00:00:00Z"),
"grade : "A", "score" : 6 }, { "score" : 10, "date" : ISODate("2013-
01-24T00:00:00Z"), "grade : "A }, { "date" : ISODate("2011-11-
23T00:00:00Z"), "grade : "A", "score" : 9 }, { "date" : ISODate("2011-
03-10T00:00:00Z"), "grade : "B", "score" : 14 } ], "name : "Morris
Park Bake Shop", "restaurant_id : "30075445" }
安装PyMongo driver
在anaconda 中安装mongodb的Python驱动也很简单:
conda install pymongo
创建 MongoDB的Python client
我们创建一个 IO_mongo 类用来收集数据 存储采集的数据 提取保存的数据. 为了创建mongo client, 需要import pymongo. 连接本地端口 27017命令如下:
from pymongo import MongoClient as MCli
class IO_mongo(object):
conn={'host':'localhost', 'ip':'27017'}
我们的类初始化了客户端连接, 数据库 (本例中, twtr_db),和被访问连接的collection (本例中, twtr_coll):
def __init__(self, db='twtr_db', coll='twtr_coll', **conn ):
# Connects to the MongoDB server
self.client = MCli(**conn)
self.db = self.client[db]
self.coll = self.db[coll]
save方法插入新的纪录:
def save(self, data):
#Insert to collection in db
return self.coll.insert(data)
load 方法根据规则提取数据. 在数据量大的情况下 返回游标:
def load(self, return_cursor=False, criteria=None, projection=None):
if criteria is None:
criteria = {}
if projection is None:
cursor = self.coll.find(criteria)
else:
cursor = self.coll.find(criteria, projection)
# Return a cursor for large amounts of data
if return_cursor:
return cursor
else:
return [ item for item in cursor ]
从Twitter汲取数据
每个社交网络都有自己的限制和挑战, 一个主要的障碍就是强加的流量限制. 在长连接或重复执行时要有暂停, 必须要避免重复数据.我们重新设计了连接程序来关注流量限制。
TwitterAPI 类根据查询条件来搜索和采集,我们已经添加了如下操作:
•日志能力,使用 Python logging 库在程序失败时纪录错误和警告
• 使用MongoDB 的持久化能力,象使用 IO_json 操作JSON 文件那样操作 IO_mongo 类
• API 流量限制和错误管理能力 , 保证我们弹性调用 Twitter 而不会被认为是恶意攻击
步骤如下:
1. 通过证书初始化Twitter API 的实例:
class TwitterAPI(object):
"""
TwitterAPI class allows the Connection to Twitter via OAuth
once you have registered with Twitter and receive the
necessary credentials
"""
def __init__(self):
consumer_key = 'get_your_credentials'
consumer_secret = get your_credentials'
access_token = 'get_your_credentials'
access_secret = 'get your_credentials'
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.access_token = access_token
self.access_secret = access_secret
self.retries = 3
self.auth = twitter.oauth.OAuth(access_token, access_
secret, consumer_key, consumer_secret)
self.api = twitter.Twitter(auth=self.auth)
2 设置日志等级,初始化 logger:
° logger.debug(debug message)
° logger.info(info message)
° logger.warn(warn message)
° logger.error(error message)
° logger.critical(critical message)
3设置日志路径和内容格式:
# logger initialisation
appName = 'twt150530'
self.logger = logging.getLogger(appName)
#self.logger.setLevel(logging.DEBUG)
# create console handler and set level to debug
logPath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/examples/AN_Spark/data'
fileName = appName
fileHandler = logging.FileHandler("{0}/{1}.log".
format(logPath, fileName))
formatter = logging.Formatter('%(asctime)s - %(name)s -
%(levelname)s - %(message)s')
fileHandler.setFormatter(formatter)
self.logger.addHandler(fileHandler)
self.logger.setLevel(logging.DEBUG)
4.初始化JSON文件的持久化指令:
# Save to JSON file initialisation
jsonFpath = '/home/an/spark/spark-1.3.0-bin-hadoop2.4/
examples/AN_Spark/data'
jsonFname = 'twtr15053001'
self.jsonSaver = IO_json(jsonFpath, jsonFname)
5.初始化 MongoDB database 和 collection :
# Save to MongoDB Intitialisation
self.mongoSaver = IO_mongo(db='twtr01_db', coll='twtr01_
coll')
6.searchTwitter 方法 根据指定的查询条件搜索:
def searchTwitter(self, q, max_res=10,**kwargs):
search_results = self.api.search.tweets(q=q, count=10,
**kwargs)
statuses = search_results['statuses']
max_results = min(1000, max_res)
for _ in range(10):
try:
next_results = search_results['search_metadata']
['next_results']
# self.logger.info('info' in searchTwitter - next_
results:%s'% next_results[1:])
except KeyError as e:
self.logger.error('error' in searchTwitter: %s',%(e))
break
# next_results = urlparse.parse_qsl(next_results[1:])
# python 2.7
next_results = urllib.parse.parse_qsl(next_results[1:])
# self.logger.info('info' in searchTwitter - next_
results[max_id]:', next_results[0:])
kwargs = dict(next_results)
# self.logger.info('info' in searchTwitter - next_
results[max_id]:%s'% kwargs['max_id'])
search_results = self.api.search.tweets(**kwargs)
statuses += search_results['statuses']
self.saveTweets(search_results['statuses'])
if len(statuses) > max_results:
self.logger.info('info' in searchTwitter - got %i tweets - max: %i' %(len(statuses), max_results))
break
return statuses
7.saveTweets 方法将所选的tweets 保存为JSON 存入MongoDB:
def saveTweets(self, statuses):
# Saving to JSON File
self.jsonSaver.save(statuses)
# Saving to MongoDB
for s in statuses:
self.mongoSaver.save(s)
8.parseTweets 方法从Twitter API 提供的大量信息中提取关键的 tweet 信息:
def parseTweets(self, statuses):
return [ (status['id'],
status['created_at'],
status['user']['id'],
status['user']['name']
status['text''text'],
url['expanded_url'])
for status in statuses
for url in status['entities']['urls']
]
9.getTweets 方法调用searchTwitter,保证API 调用的稳定性并重点关注速率限制。代码如下:
def getTweets(self, q, max_res=10):
"""
Make a Twitter API call whilst managing rate limit and errors.
"""
def handleError(e, wait_period=2, sleep_when_rate_
limited=True):
if wait_period > 3600: # Seconds
self.logger.error('Too many retries in getTweets:
%s', %(e))
raise e
if e.e.code == 401:
self.logger.error('error 401 * Not Authorised * in
getTweets: %s', %(e))
return None
elif e.e.code == 404:
self.logger.error('error 404 * Not Found * in
getTweets: %s', %(e))
return None
elif e.e.code == 429:
self.logger.error('error 429 * API Rate Limit
Exceeded * in getTweets: %s', %(e))
if sleep_when_rate_limited:
self.logger.error('error 429 * Retrying in 15
minutes * in getTweets: %s', %(e))
sys.stderr.flush()
time.sleep(60*15 + 5)
self.logger.info('error 429 * Retrying now *
in getTweets: %s', %(e))
return 2
else:
raise e # Caller must handle the rate limiting issue
elif e.e.code in (500, 502, 503, 504):
self.logger.info('Encountered %i Error. Retrying
in %i seconds' % (e.e.code, wait_period))
time.sleep(wait_period)
wait_period *= 1.5
return wait_period
else:
self.logger.error('Exit - aborting - %s', %(e))
raise e
10.根据指定的参数查询调用searchTwitter API . 如果遇到了任何错误, 由handleError 方法处理:
while True:
try:
self.searchTwitter( q, max_res=10)
except twitter.api.TwitterHTTPError as e:
error_count = 0
wait_period = handleError(e, wait_period)
if wait_period is None:
return
使用Blaze探索数据
Blaze是个由Continuum.io,开发的 Python库 ,利用了 Python Numpy arrays 和 Pandas dataframe. Blaze 扩展到多核计算, 而Pandas 和 Numpy 是单核的.
Blaze 为各种后端提供了统一适配的一致性用户接口. Blaze 精心安排了:
• Data: 不同数据存储的无缝交换如 CSV, JSON, HDF5,
HDFS, 和 Bcolz 文件
• Computation: 对不同的后端采用同样的查询方式如 Spark, MongoDB, Pandas, or SQL Alchemy.
• Symbolic expressions: 在一定范围内使用了与Pandas类似的语法来抽象表达 join, group-by, filter,
selection, 和注入,参考R语言实现了
split-apply-combine 方法.
Blaze 表达式 和Spark RDD 数据转换一致,采用延迟计算.
深入 Blaze首先要引入所需的库: numpy, pandas, blaze 和 odo. Odo 是 Blaze的一个派生品保证了各种数据后端的数据移植,命令如下:
import numpy as np
import pandas as pd
from blaze import Data, by, join, merge
from odo import odo
BokehJS successfully loaded.
读取存储在CSV文件中解析过的tweets 生成Pandas Dataframe:
twts_csv:
twts_pd_df = pd.DataFrame(twts_csv_read, columns=Tweet01._fields)
twts_pd_df.head()
Out[65]:
id created_at user_id user_name tweet_text url
1 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
2 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
3 98808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
http://www.webex.com/ciscospark/
4 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
http://sparkjava.com/
运行Tweets Panda Dataframe 的 describe() 函数 获得数据集中的信心:
twts_pd_df.describe()
Out[66]:
id created_at user_id user_name tweet_text url
count 19 19 19 19 19 19
unique 7 7 6 6 6 7
top 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
http://bit.ly/1Hfd0Xm
freq 6 6 9 9 6 6
简单的调用Data() 函数将Pandas dataframe 转化为一个 Blaze dataframe:
#
# Blaze dataframe
#
twts_bz_df = Data(twts_pd_df)
通过传递schema 函数提取一个 Blaze dataframe 的schema 表达:
twts_bz_df.schema
Out[73]:
dshape("""{
id: ?string,
created_at: ?string,
user_id: ?string,
user_name: ?string,
tweet_text: ?string,
url: ?string
}""")
.dshape 函数给出一条记录和schema:
twts_bz_df.dshape
Out[74]:
dshape("""19 * {
id: ?string,
created_at: ?string,
user_id: ?string,
user_name: ?string,
tweet_text: ?string,
url: ?string
}""")
打印Blaze dataframe 的内容:
twts_bz_df.data
Out[75]:
id created_at user_id user_name tweet_text url
1 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
2 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
...
18 598782970082807808 2015-05-14 09:32:39 1377652806
embeddedcomputer.nl RT @BigDataTechCon: Moving Rating
Prediction w... http://buff.ly/1QBpk8J
19 598777933730160640 2015-05-14 09:12:38 294862170 Ellen
Friedman I'm still on Euro time. If you are too check o...
http://bit.ly/1Hfd0Xm
提取 tweet_text 字段,获得唯一的值:
twts_bz_df.tweet_text.distinct()
Out[76]:
tweet_text
0 RT @pacoid: Great recap of @StrataConf EU in L...
1 RT @alvaroagea: Simply @ApacheSpark http://t.c...
2 RT @PrabhaGana: What exactly is @ApacheSpark a...
3 RT @Ellen_Friedman: I'm still on Euro time. If...
4 RT @BigDataTechCon: Moving Rating Prediction w...
5 I'm still on Euro time. If you are too check o...
从dataframe 中提取了多个字段 [‘id’, ‘user_name’,’tweet_text’] 并计算唯一的记录:
twts_bz_df[['id', 'user_name','tweet_text']].distinct()
Out[78]:
id user_name tweet_text
0 598831111406510082 raulsaeztapia RT @pacoid: Great recap of @
StrataConf EU in L...
1 598808944719593472 raulsaeztapia RT @alvaroagea: Simply @
ApacheSpark http://t.c...
2 598796205091500032 John Humphreys RT @PrabhaGana: What exactly
is @ApacheSpark a...
3 598788561127735296 Leonardo D'Ambrosi RT @Ellen_Friedman: I'm
still on Euro time. If...
4 598785545557438464 Alexey Kosenkov RT @Ellen_Friedman: I'm
still on Euro time. If...
5 598782970082807808 embeddedcomputer.nl RT @BigDataTechCon:
Moving Rating Prediction w...
6 598777933730160640 Ellen Friedman I'm still on Euro time. If
you are too check o...
使用 Odo传输数据
Odo 是Blaze的一个衍生项目. 用于数据交换,保证了各种不同格式数据间的移植 (CSV, JSON, HDFS, and more) 并且跨越不同的数据库 (SQL 数据库, MongoDB, 等等) ,用法简单,Odo(source, target) 为了 传输到一个数据库,需要指定URL地址. 例如, MongoDB , 用法如下:
mongodb://username:password@hostname:port/database_name::collection_name
使用Odo 运行一些例子,这里通过读取CSV文件并创建一个 Blaze dataframe来展示Odo的用法:
filepath = csvFpath
filename = csvFname
filesuffix = csvSuffix
twts_odo_df = Data('{0}/{1}.{2}'.format(filepath, filename,
filesuffix))
计算 dataframe中的记录个数:
twts_odo_df.count()
Out[81]:
19
显示dataframe中最初的5条记录:
twts_odo_df.head(5)
Out[82]:
id created_at user_id user_name tweet_text url
0 598831111406510082 2015-05-14 12:43:57 14755521
raulsaeztapia RT @pacoid: Great recap of @StrataConf EU in L...
http://www.mango-solutions.com/wp/2015/05/the-...
2 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c... http://www.webex.com/ciscospark/
3 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
http://sparkjava.com/
4 598808944719593472 2015-05-14 11:15:52 14755521
raulsaeztapia RT @alvaroagea: Simply @ApacheSpark http://t.c...
https://www.sparkfun.com/
从dataframe 中获得 dshape 的信息 , 这里得到记录的个数和 schema:
twts_odo_df.dshape
Out[83]:
dshape("var * {
id: int64,
created_at: ?datetime,
user_id: int64,
user_name: ?string,
tweet_text: ?string,
url: ?string
}""")
将处理过的 Blaze dataframe 存入 JSON:
odo(twts_odo_distinct_df, '{0}/{1}.{2}'.format(jsonFpath, jsonFname,
jsonSuffix))
Out[92]:
<odo.backends.json.JSONLines at 0x7f77f0abfc50>
转换JSON 文件为 CSV 文件:
odo('{0}/{1}.{2}'.format(jsonFpath, jsonFname, jsonSuffix), '{0}/{1}.{2}'.format(csvFpath, csvFname, csvSuffix))
Out[94]:
<odo.backends.csv.CSV at 0x7f77f0abfe10>
使用Spark SQL探索数据
Spark SQL 是建立在Spark 核心之上的关系型查询引擎. Spark SQL 使用的查询优化叫 Catalyst.
关系型查询使用 SQL 或HiveQL 表达,在 JSON, CSV, 和各种数据库中查询. Spark SQL 为 RDD 函数式编程之上的Spark dataframes 提供了完整的声明式表达.
理解 Spark dataframe
从 @bigdata 而来的一个tweet 意味着 Spark SQL和 dataframes都可
以上是关于地铁译:Spark for python developers ---Spark的数据戏法的主要内容,如果未能解决你的问题,请参考以下文章
地铁译:Spark for python developers ---Spark与数据的机器学习
地铁译:Spark for python developers ---构建Spark批处理和流处理应用前的数据准备