使用Aerospike的 Spark 连接器(Spark Connect)
Posted AEROSPIKE
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Aerospike的 Spark 连接器(Spark Connect)相关的知识,希望对你有一定的参考价值。
前言
Aerospike是一个高度可扩展的键值数据库,可提供同类产品中最佳的性能。它在实时业务环境中通常部署管理TB到PB数据量。
Aerospike通常与其他可扩展的分布式软件(例如,用于系统耦合的Kafka或用于分析的Spark)一起运行。Aerospike提供的 Aerospike Connect 附件使集成变得很容易。
本文通过使用 aerospike-ansible 讨论了Aerospike Spark Connect在实际中的工作方式,并提供一个全面且易于复制的端到端示例。
一、数据库集群设置
首先看一下Ansible for Aerospike,它解释了如何使用 aerospike-ansible。
在此示例中,我在vars/cluster-config.yml
中将 cluster_instance_type
设置为 c5d.18xlarge。
按照说明进行操作,直到并包括一键式设置,最后我们会运行到
ansible-playbook aws-setup-plus-aerospike-install.yml
ansible-playbook aerospike-java-client-setup.yml
1
2
这会产生一个3个节点的群集,以及一个安装了相关软件的客户端实例。
Spark 集群设置
这是通过
ansible-playbook spark-cluster-setup.yml
1
对于此示例,在运行之前,我在 vars / cluster-config.yml
中将 spark_instance_type
设置为 c5d.4xlarge。
该腳本创建了一个3节点的给定实例类型的Spark集群,在其中已安装并运行了Spark,它还安装了Aerospike Spark Connect。
请注意,您需要设置 enterprise:true
,并通过 vars/cluster-config.yml
中的 feature_key:/your/path/to/key
提供有效的Aerospike功能密钥的路径。因此,您必须是Aerospike的授权客户,或者必须正在Aerospike企业版试用期。
在过程即将结束时,您会看到
TASK [Spark master IP & master internal url] ************************************************************************************************************************************************************************
ok: [localhost] => {
"msg": "Spark master is 3.88.237.103. Spark master internal url is spark://10.0.2.122:7077."
}
1
2
3
4
记下Spark master内部网址,稍后需要。
加载数据
我们的示例利用了来自10亿纽约出租车司机库的2000万条记录,这些记录以压缩形式提供,网址为 https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz。我们使用安装在上面设置的客户端计算机上的Aerospike loader加载程序把数据加载到Aerospike。
source ./scripts/cluster-ip-address-list.sh
1
样品输出
Adds cluster ips to this array- AERO_CLUSTER_IPS
Use as ${ AERO_CLUSTER_IPS[index]}
There are 3 entries
##########################################################
cluster IP addresses : Public : 3.87.14.39, Private : 10.0.2.58
cluster IP addresses : Public : 3.89.113.231, Private : 10.0.0.234
cluster IP addresses : Public : 23.20.193.64, Private : 10.0.1.95
1
2
3
4
5
6
7
8
9
aerospike loader 加载器需要一个配置文件才能将数据加载到Aerospike中。这会将 csv 列位置映射到命名和类型化的bin。样本条目看起来像
{
"name": "pkup_datetime",
"value": {
"column_position": 3,
"type": "timestamp",
"encoding": "yyyy-MM-dd hh:mm:ss",
"dst_type": "integer"
}
}
1
2
3
4
5
6
7
8
9
在 repos/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json
的仓库中提供了此功能。我们将此上传到客户端实例。
source ./scripts/client-ip-address-list.sh
scp -i .aws.pem ./recipes/aerospike-spark-demo/nyc-taxi-data-aero-loader-config.json ec2-user@${AERO_CLIENT_IPS[0]}:~
1
2
接下来,将数据放入客户端计算机。有多种方法可以执行此操作,但是您需要进行规划,因为未压缩时数据集为7.6GB。我使用了以下命令,但是具体情况取决于您的闪存和文件系统的具体情况
./scripts/client-quick-ssh.sh # to log in, followed by
sudo mkfs.ext4 /dev/nvme1n1
sudo mkdir /data
sudo mount -t ext4 /dev/nvme1n1 /data
sudo chmod 777 /data
wget -P /data https://aerospike-ken-tune.s3.amazonaws.com/nyc-taxi-data/trips_xaa.csv.gz
gunzip /data/trips_xaa.csv.gz
1
2
3
4
5
6
7
8
9
最后,我们使用上传的配置文件加载数据。
cd ~/aerospike-loader
./run_loader -h 10.0.0.234 -p 3000 -n test -c ~/nyc-taxi-data-aero-loader-config.json /data/trips_xaa.csv
1
2
使用Spark
登录到Spark的一个节点,通过aerospike-ansible中的一个工具脚本
./scripts/spark-quick-ssh.sh
1
使用我们在运行Spark集群安装手册时看到的Spark主URL启动Spark Shell。
/spark/bin/spark-shell --master spark://10.0.2.122:7077
1
导入相关库
import org.apache.spark.sql.{ SQLContext, SparkSession, SaveMode}
import org.apache.spark.SparkConf
import java.util.Date
import java.text.SimpleDateFormat
1
2
3
4
提供Aerospike配置。请注意,我们在这里使用了之前的集群ip:
spark.conf.set("aerospike.seedhost", "10.0.0.234")
spark.conf.set("aerospike.namespace", "test")
1
2
定义一个视图,以及我们将要使用的功能
val sqlContext = spark.sqlContext
sqlContext.udf.register("getYearFromSeconds", (seconds: Long) => (new SimpleDateFormat("yyyy")).format(1000 * seconds))
val taxi = sqlContext.read.format("com.aerospike.spark.sql").option("aerospike.set", "nyc-taxi-data").load
taxi.createOrReplaceTempView("taxi")
1
2
3
4
最后,运行我们的查询语句
// Journeys grouped by cab type
val result = sqlContext.sql("SELECT cab_type,count(*) count FROM taxi GROUP BY cab_type")
result.show()
+--------+--------+
|cab_type| count|
+--------+--------+
| green|20000000|
+--------+--------+
// Average fare based on different passenger count
val result = sqlContext.sql("SELECT passenger_cnt, round(avg(total_amount),2) avg_amount FROM taxi GROUP BY passenger_cnt ORDER BY passenger_cnt")
result.show()
+-------------+----------+
|passenger_cnt|avg_amount|
+-------------+----------+
| 0| 10.86|
| 1| 14.63|
| 2| 15.75|
| 3| 15.87|
| 4| 15.85|
| 5| 14.76|
| 6| 15.42|
| 7| 23.74|
| 8| 19.52|
| 9| 34.9|
+-------------+----------+
// No of journeys for different numbers of passengers
val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year, count(*) count FROM taxi GROUP BY passenger_cnt, getYearFromSeconds(pkup_datetime) order by passenger_cnt");
result.show()
+-------------+---------+--------+
|passenger_cnt|trip_year| count|
+-------------+---------+--------+
| 0| 2014| 4106|
| 1| 2014|16557518|
| 2| 2014| 1473578|
| 3| 2014| 507862|
| 4| 2014| 160714|
| 5| 2014| 939276|
| 6| 2014| 355846|
| 7| 2014| 492|
| 8| 2014| 494|
| 9| 2014| 114|
+-------------+---------+--------+
// Number of trips for each passenger count/distance combination
// Ordered by trip count, descending
val result = sqlContext.sql("SELECT passenger_cnt,getYearFromSeconds(pkup_datetime) trip_year,round(trip_distance) distance,count(*) trips FROM taxi GROUP BY passenger_cnt,getYearFromSeconds(pkup_datetime),round(trip_distance) ORDER BY trip_year,trips desc")
result.show()
+-------------+---------+--------+-------+
|passenger_cnt|trip_year|distance| trips|
+-------------+---------+--------+-------+
| 1| 2014| 1.0|5321230|
| 1| 2014| 2.0|3500458|
| 1| 2014| 3.0|2166462|
| 1| 2014| 4.0|1418494|
| 1| 2014| 5.0| 918460|
| 1| 2014| 0.0| 868210|
| 1| 2014| 6.0| 653646|
| 1| 2014| 7.0| 488416|
| 2| 2014| 1.0| 433746|
| 1| 2014| 8.0| 345728|
| 2| 2014| 2.0| 305578|
| 5| 2014| 1.0| 302120|
| 1| 2014| 9.0| 226278|
| 5| 2014| 2.0| 199968|
| 2| 2014| 3.0| 199522|
| 1| 2014| 10.0| 163928|
| 3| 2014| 1.0| 145580|
| 2| 2014| 4.0| 137152|
| 5| 2014| 3.0| 122714|
| 1| 2014| 11.0| 117570|
+-------------+---------+--------+-------+
only showing top 20 rows
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
结论
这篇文章向您展示了可以很快的启动并运行一个大型数据集。该示例处理了二千万行数据,并很容易扩展到整个数据集。我们还可以看到您可以快速启动并运行 Aerospike-ansible
工具。
以上是关于使用Aerospike的 Spark 连接器(Spark Connect)的主要内容,如果未能解决你的问题,请参考以下文章
无法安装 aerospike,在“node-gyp 重建”步骤中使 aerospike 失败
AeroSpike踩坑手记1:Architecture of a Real Time Operational DBMS论文导读