spark 批量入 ES

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 批量入 ES相关的知识,希望对你有一定的参考价值。

1、需要添加elasticsearch-hadoop-version.jar, version对应集群的版本,

2、代码


# encoding=UTF-8
# -*-coding:UTF-8-*-
import json
from pyspark.sql import SparkSession

# 设置doc_id
def addId(data):
    return (data['id'], json.dumps(data))


if __name__ == '__main__':
    spark = SparkSession.builder.appName('test') \\
        .enableHiveSupport().getOrCreate()


    d = {'id': '1',
         'code': '1',
         'location': {
             'lat': '32.5409',
             'lon': '123.000'
         },
         'name': '天津路',
         'cityName': '天津',

         }
    d2 = {'id': '2',
          'code': '2',
          'location': {
              'lat': '32.5409',
              'lon': '123.000'
          },
          'name': '关山路',
          'cityName': '河南',
          }

    rdd = spark.sparkContext.parallelize((d, d2))\\
        .map(addId)

    #  spark.sql("select * from tableName").map()


    es_write_conf = {
        "es.nodes": "es.node.hosts",
        "es.port": "es.node.port",
        "es.resource": 'test',
        "es.input.json": "yes",
        "es.mapping.id": "id"
    }




    rdd.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)

3.、运行

spark-submit --jars elasticsearch-hadoop-7.9.3.jar rdd2es.py 

关注我的个人公众号【宝哥大数据】,更多干货

在这里插入图片描述

以上是关于spark 批量入 ES的主要内容,如果未能解决你的问题,请参考以下文章

记一次调优过程—Spark读取OBS文件入ES

spark->es快速导入数据

在这个 spark 代码片段中 ordering.by 是啥意思?

ES7-Es8 js代码片段

python+spark程序代码片段

关于Redis批量写入的介绍