Spark SQL怎么创建编程创建DataFrame

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark SQL怎么创建编程创建DataFrame相关的知识,希望对你有一定的参考价值。

参考技术A Spark SQL怎么创建编程创建DataFrame
Spark SQL 中所有相关功能的入口点是 SQLContext 类或者它的子类, 创建一个 SQLContext 的所有需要仅仅是一个 SparkContext。

Spark sql怎么使用Kafka Avro序列化器

创建Kafka生产者实例:您需要使用org.apache.kafka.clients.producer.KafkaProducer类创建一个Kafka生产者实例。在创建生产者时需要指定使用Kafka Avro序列化器。

import org.apache.kafka.clients.producer.ProducerRecord

import org.apache.kafka.clients.producer.KafkaProducer

import io.confluent.kafka.serializers.KafkaAvroSerializer

import java.util.Properties

 

val props = new Properties()

props.put("bootstrap.servers", "localhost:9092")

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")

props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")

props.put("schema.registry.url", "http://localhost:8081")

 

val producer = new KafkaProducer[String, GenericRecord](props)

 

准备要序列化的数据:您需要将要序列化的数据准备好。例如,您可以使用Spark SQL创建DataFrame:

 

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.types._

import org.apache.spark.sql.functions._

 

val spark = SparkSession.builder().appName("KafkaAvroSerializer").master("local[*]").getOrCreate()

 

val schema = StructType(Seq(

  StructField("name", StringType, true),

  StructField("age", IntegerType, true)

))

 

val data = Seq(

  ("Alice", 30),

  ("Bob", 40),

  ("Charlie", 50)

)

 

val df = spark.createDataFrame(data).toDF(schema)

序列化数据并将其发送到Kafka:您可以使用KafkaAvroSerializer序列化数据并将其发送到Kafka主题中。在下面的代码中,我们使用foreach函数遍历DataFrame中的每一行,并将其序列化为Avro格式后发送到Kafka主题中。

 

val topic = "my_topic"

 

df.foreach(row =>

  val key = row.getAs[String]("name")

  val value = new GenericData.Record(valueSchema)

  value.put("name", row.getAs[String]("name"))

  value.put("age", row.getAs[Int]("age"))

  

  val record = new ProducerRecord[String, GenericRecord](topic, key, value)

  producer.send(record)

)

 

 

以上是关于Spark SQL怎么创建编程创建DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL - 隐式创建模式和以编程方式创建模式之间的确切区别

Spark SQL and DataFrame Guide(1.4.1)——之DataFrames

1.Spark SQL基础—Spark SQL概述Spark SQL核心编程—DataFrameDataSet

14.3 Spark-SQL基于PostgreSQL数据分析编程实例

14.4 Spark-SQL基于Cassandra数据分析编程实例

Spark sql怎么使用Kafka Avro序列化器