spark 批量入 ES
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 批量入 ES相关的知识,希望对你有一定的参考价值。
1、需要添加elasticsearch-hadoop-version.jar
, version对应集群的版本,
2、代码
# encoding=UTF-8
# -*-coding:UTF-8-*-
import json
from pyspark.sql import SparkSession
# 设置doc_id
def addId(data):
return (data['id'], json.dumps(data))
if __name__ == '__main__':
spark = SparkSession.builder.appName('test') \\
.enableHiveSupport().getOrCreate()
d = {'id': '1',
'code': '1',
'location': {
'lat': '32.5409',
'lon': '123.000'
},
'name': '天津路',
'cityName': '天津',
}
d2 = {'id': '2',
'code': '2',
'location': {
'lat': '32.5409',
'lon': '123.000'
},
'name': '关山路',
'cityName': '河南',
}
rdd = spark.sparkContext.parallelize((d, d2))\\
.map(addId)
# spark.sql("select * from tableName").map()
es_write_conf = {
"es.nodes": "es.node.hosts",
"es.port": "es.node.port",
"es.resource": 'test',
"es.input.json": "yes",
"es.mapping.id": "id"
}
rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
3.、运行
spark-submit --jars elasticsearch-hadoop-7.9.3.jar rdd2es.py
关注我的个人公众号【宝哥大数据】,更多干货
以上是关于spark 批量入 ES的主要内容,如果未能解决你的问题,请参考以下文章