aws创建kafka(msk)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了aws创建kafka(msk)相关的知识,希望对你有一定的参考价值。

参考技术A 在创建 Amazon MSK 集群之前,您需要拥有 VPC 并在该 VPC 内设置子网。在美国西部(加利福尼亚北部)区域中,您需要两个不同可用区中的两个子网。在提供 Amazon MSK 的所有其他区域中,您可以指定两个或三个子网。您的子网必须位于不同的可用区中。创建集群时,Amazon MSK 在您指定的子网之间平均分配代理节点。有关如何为 MSK 集群设置 VPC 和子网的示例,请参阅 第 1 步:为 MSK 集群创建 VPC 和 第 2 步:启用高可用性和容错功能 .

创建 Amazon MSK 集群时,您可以指定您希望拥有的代理类型。亚马逊 MSK 支持以下经纪商类型:

M5 代理具有比 T3 代理更高的基准吞吐量性能,建议用于生产工作负载。M5 代理还可具有比 T3 代理更多的每代理分区。如果您正在运行较大的生产级工作负载或需要更多的分区,请使用 M5 代理。要了解有关 M5 实例类型的更多信息,请参阅 Amazon EC2 M5 实例 。
T3 代理可以使用 CPU 积分来临时提高性能。如果您正在测试中小型流式处理工作负载,或者您的低吞吐量流式处理工作负载会临时出现吞吐量高峰,则可以使用 T3 代理进行低成本开发。建议您运行概念验证测试来确定 T3 代理是否足以应对生产或关键工作负载。要了解有关 T3 实例类型的更多信息,请参阅 Amazon EC2 T3 实例 。
有关如何选择代理类型的更多信息,请参阅 将集群设置为正确大小 .

有关自定义 MSK 配置以及如何创建这些配置的信息,请参阅 Amazon MSK 配置 。

AWS Glue集成Kafka/MSK

简单总结一下AWS Glue如何集成Kafka/MSK吧,因为当前(Glue 2.0)Glue对Kafka的集成和使用在我看来是比较“扭曲”的,并不能像原生Spark那样以spark.readStream的方式接入Kafka,所以暂时只做一下简单的归纳,后续根据新的进展再做补充。

在Glue中接入Kafka的消息需要先创建Glue Connection,只有通过Connection,Glue才能访问到外部Kakfa/MSK集群,因为在配置Connection的过程中需要提供VPC信息,Glue会透明地给后台节点创建ENI(虚拟网卡)联通到指定的VPC上,注意这一操作是通明的,不可控的。

有了Connection之后,还得建一张数据表(就是一张普通的Hive表),并在表的属性配置(TBLPROPERTIES)中设定所使用的Connection。以下是一个示例:

CREATE EXTERNAL TABLE `default.user`(
  `id` bigint COMMENT 'from deserializer',
  `name` string COMMENT 'from deserializer', 
  `age` int COMMENT 'from deserializer', 
  `updatedtime` bigint COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'paths'='') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'user'
TBLPROPERTIES (
  'classification'='json', 
  'connectionName'='MSK')

在这个建表语句中,需要注意的是最后一行:'connectionName'='MSK' ,这就是将connection配置给数据表的地方,这里的MSK是我创建的一个Connection的名字,稍后会给出这个Connection的具体信息。但这里要关注的重点在于:**Glue将对Kafka的接入“畸形”地映射成为了对一张Hive表的读取,这是很不好的。**这意味着,在Glue中, 像下面这种用标准Spark API读取Kafka消息的代码是跑不通的:

val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
      .option("subscribe", "user")
      .option("startingOffsets", "earliest")
      .load()

然后,看一下前面提到的名为MSK的Connection配置吧:

在这里插入图片描述

从这个Connection中可以看到,关键性的网络连接信息都是在Connection中配置的,Glue会基于这些信息在Glue作业节点上创建ENI(虚拟网卡),以便作业节点可以连接到Kafka的Broker。这就是为什么在Glue中我们不能脱离开Connection直接使用option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") 的形式连接Kafka的原因,因为Connection包含了网络相关的配置,而读取数据表生成Dataframe的过程会触发Connection的初始化,也就是打通网络,所以导致了Glue集成Kafka变成了一种类似"读表"的操作。

作为一份参考,以下是一份自动生成的读取MSK的参考脚本:

package com.github.glueboot.core.job

import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import java.util.Calendar
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    // @type: DataSource
    // @args: [stream_type = kafka, stream_batch_time = "100 seconds", database = "default", additionalOptions = {"startingOffsets": "earliest", "inferSchema": "false"}, stream_checkpoint_location = "s3://<my-bucket>/user/checkpoint/", table_name = "user"]
    // @return: datasource0
    // @inputs: []
    val data_frame_datasource0 = glueContext.getCatalogSource(database = "default", tableName = "user", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{"startingOffsets": "earliest", "inferSchema": "false"}""")).getDataFrame()
    glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => {
      if (dataFrame.count() > 0) {
        val datasource0 = DynamicFrame(dataFrame, glueContext)
        // @type: ApplyMapping
        // @args: [mapping = [("id", "long", "id", "long"), ("name", "string", "name", "string"), ("age", "int", "age", "int"), ("updatedtime", "long", "updatedtime", "long")], transformation_ctx = "applymapping0"]
        // @return: applymapping0
        // @inputs: [frame = datasource0]
        val applymapping0 = datasource0.applyMapping(mappings = Seq(("id", "long", "id", "long"), ("name", "string", "name", "string"), ("age", "int", "age", "int"), ("updatedtime", "long", "updatedtime", "long")), caseSensitive = false, transformationContext = "applymapping0")
        // @type: DataSink
        // @args: [stream_batch_time = "100 seconds", stream_checkpoint_location = "s3://<my-bucket>/user/checkpoint/", connection_type = "s3", path = "s3://<my-bucket>/user", format = "csv", transformation_ctx = "datasink1"]
        // @return: datasink1
        // @inputs: [frame = applymapping0]
        val year: Int = Calendar.getInstance().get(Calendar.YEAR)
        val month: Int = Calendar.getInstance().get(Calendar.MONTH) + 1
        val day: Int = Calendar.getInstance().get(Calendar.DATE)
        val hour: Int = Calendar.getInstance().get(Calendar.HOUR_OF_DAY)
        val minute: Int = Calendar.getInstance().get(Calendar.MINUTE)
        val path_datasink1 = "s3://<my-bucket>/user" + "/ingest_year=" + "%04d".format(year) + "/ingest_month=" + "%02d".format(month) + "/ingest_day=" + "%02d".format(day) + "/ingest_hour=" + "%02d".format(hour) + "/"
        val datasink1 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(s"""{"path": "$path_datasink1"}"""), transformationContext = "datasink1", format = "csv").writeDynamicFrame(applymapping0)
      }
    }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://<my-bucket>/user/checkpoint/"}"""))
    Job.commit()
  }
}

推荐:博主历时三年倾注大量心血创作的《大数据平台架构与原型实现:数据中台建设实战》一书已由知名IT图书品牌电子工业出版社博文视点出版发行,真诚推荐给每一位读者!点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,扫码进入京东手机购书页面!
在这里插入图片描述

以上是关于aws创建kafka(msk)的主要内容,如果未能解决你的问题,请参考以下文章

AWS Glue集成Kafka/MSK

Amazon Kinesis 与 AWS Manage Service Kafka (MSK) -(从本地连接)

Kafka 连接设置以使用 AWS MSK 从 Aurora 发送记录

Kafka Connect 与 Amazon MSK

将 Lambda 连接到 AWS MSK 中的 kafka 主题的最经济有效的方法是啥?

AWS Kafka (MSK) - 如何生成 Keystore 和 truststore 并在我的 Spring Cloud Stream 应用程序中使用它们?