How to pass data from Kafka to Spark Streaming?
Posted: 2016-04-23 14:40:50

[Question] I'm trying to pass data from Kafka to Spark Streaming.
Here is what I have done so far:

- Installed both Kafka and Spark
- Started ZooKeeper with the default properties configuration
- Started the Kafka server with the default properties configuration
- Started a Kafka producer
- Started a Kafka consumer
- Sent messages from the producer to the consumer; that works fine (the stock quickstart commands for these broker-side steps are sketched after this list)
- Wrote kafka-spark.py to receive messages from Kafka into Spark
- Tried running ./bin/spark-submit examples/src/main/python/kafka-spark.py, and got an error
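For reference, the standard Kafka quickstart commands for those broker-side steps look roughly like this; the topic name 'spark-kafka' is an assumption taken from the code below, and the paths are relative to the Kafka installation directory:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic spark-kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic spark-kafka --from-beginning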
kafka-spark.py:

from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    #sc = SparkContext(appName="KafkaSpark")
    sc = SparkContext(conf=conf)
    stream = StreamingContext(sc, 1)  # 1-second batch interval
    map1 = {'spark-kafka': 1}  # topic -> number of consumer threads
    # signature: createStream(ssc, zkQuorum, groupId, topics)
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)  # tried with localhost:2181 too
    print("kafkastream=", kafkaStream)
    sc.stop()
Full log, including the error from running kafka-spark.py:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/18 13:05:33 INFO SparkContext: Running Spark version 1.6.0
16/01/18 13:05:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/18 13:05:33 INFO SecurityManager: Changing view acls to: username
16/01/18 13:05:33 INFO SecurityManager: Changing modify acls to: username
16/01/18 13:05:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(username); users with modify permissions: Set(username)
16/01/18 13:05:33 INFO Utils: Successfully started service 'sparkDriver' on port 54446.
16/01/18 13:05:34 INFO Slf4jLogger: Slf4jLogger started
16/01/18 13:05:34 INFO Remoting: Starting remoting
16/01/18 13:05:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@127.0.0.1:50386]
16/01/18 13:05:34 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 50386.
16/01/18 13:05:34 INFO SparkEnv: Registering MapOutputTracker
16/01/18 13:05:34 INFO SparkEnv: Registering BlockManagerMaster
16/01/18 13:05:34 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-f5490271-cdb7-467d-a915-4f5ccab57f0e
16/01/18 13:05:34 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/01/18 13:05:34 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/18 13:05:34 INFO Server: jetty-8.y.z-SNAPSHOT
16/01/18 13:05:34 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/01/18 13:05:34 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/18 13:05:34 INFO SparkUI: Started SparkUI at http://127.0.0.1:4040
Java HotSpot(TM) Server VM warning: You have loaded library /tmp/libnetty-transport-native-epoll561240765619860252.so which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
16/01/18 13:05:34 INFO Utils: Copying ~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py to /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/userFiles-e93fc252-0ba1-42b7-b4fa-2e46f3a0601e/kafka-spark.py
16/01/18 13:05:34 INFO SparkContext: Added file file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py at file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py with timestamp 1453118734892
16/01/18 13:05:35 INFO Executor: Starting executor ID driver on host localhost
16/01/18 13:05:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58970.
16/01/18 13:05:35 INFO NettyBlockTransferService: Server created on 58970
16/01/18 13:05:35 INFO BlockManagerMaster: Trying to register BlockManager
16/01/18 13:05:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:58970 with 511.1 MB RAM, BlockManagerId(driver, localhost, 58970)
16/01/18 13:05:35 INFO BlockManagerMaster: Registered BlockManager
________________________________________________________________________________________________
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
1. Include the Kafka library and its dependencies with in the
spark-submit command as
$ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.6.0 ...
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.6.0.
Then, include the jar in the spark-submit command as
$ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
________________________________________________________________________________________________
Traceback (most recent call last):
File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py", line 33, in <module>
kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)
File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 80, in createStream
py4j.protocol.Py4JJavaError: An error occurred while calling o22.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
16/01/18 13:05:35 INFO SparkContext: Invoking stop() from shutdown hook
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/metrics/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/stages/stage/kill,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/api,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/static,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/executors/threadDump/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/executors/threadDump,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/executors/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/executors,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/environment/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/environment,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/storage/rdd/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/storage/rdd,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/storage/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/storage,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/stages/pool/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/stages/pool,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/stages/stage/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/stages/stage,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/stages/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/stages,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/jobs/job/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/jobs/job,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/jobs/json,null
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler/jobs,null
16/01/18 13:05:35 INFO SparkUI: Stopped Spark web UI at http://127.0.0.1:4040
16/01/18 13:05:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/01/18 13:05:35 INFO MemoryStore: MemoryStore cleared
16/01/18 13:05:35 INFO BlockManager: BlockManager stopped
16/01/18 13:05:35 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/18 13:05:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/01/18 13:05:35 INFO SparkContext: Successfully stopped SparkContext
16/01/18 13:05:35 INFO ShutdownHookManager: Shutdown hook called
16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f
16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/pyspark-fcd47a97-57ef-46c3-bb16-357632580334
EDIT

When running ./bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar examples/src/main/python/kafka-spark.py, I get the object's hex address instead of the actual strings:
kafkastream= <pyspark.streaming.dstream.TransformedDStream object at 0x7fd6c4dad150>
Any idea what I'm doing wrong? I'm very new to Kafka and Spark, so I could use some help. Thanks!
[Answer 1] You need to submit spark-streaming-kafka-assembly_*.jar with your job:
spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar ./spark-kafka.py
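Equivalently, as the error message in the log above suggests, you can let spark-submit resolve the Kafka package from Maven Central instead of downloading the jar by hand:

spark-submit --packages org.apache.spark:spark-streaming-kafka:1.6.0 ./spark-kafka.py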
[Comments]:

Hi! Thanks for the answer. I tried that, but I got a different error. Check my new edit.

Try changing map1={'spark-kafka':'1'} to map1={'spark-kafka':1}

Thanks, but now I get kafkastream= <pyspark.streaming.dstream.TransformedDStream object at 0x7fc55106d150> instead of the actual messages. What I want is to print the actual messages; print kafkastream seems to print something else.

Take a look at this example: github.com/apache/spark/blob/master/examples/src/main/python/…

[Answer 2] Alternatively, if you also want to specify the resources to allocate at the same time:
spark-submit --deploy-mode cluster --master yarn --num-executors 5 --executor-cores 5 --executor-memory 20g --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar ./spark-kafka.py
If you want to run your code in a Jupyter notebook, this may help:
from __future__ import print_function
import os
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-streaming-kafka-assembly_2.10-1.6.0.jar pyspark-shell'  # note that the "pyspark-shell" part is very important!!
    #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    #sc = SparkContext(appName="KafkaSpark")
    sc = SparkContext(conf=conf)
    stream = StreamingContext(sc, 1)
    map1 = {'spark-kafka': 1}
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)  # tried with localhost:2181 too
    print("kafkastream=", kafkaStream)
    sc.stop()
Note the line introduced in __main__:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-streaming-kafka-assembly_2.10-1.6.0.jar pyspark-shell'

It has to be set before the SparkContext is created, because PySpark reads it when launching the JVM gateway; the trailing "pyspark-shell" part is required.

Source: https://github.com/jupyter/docker-stacks/issues/154
[Comments]:

Downloading the jar to $SPARK_HOME/jars and adding os.environ['PYSPARK_SUBMIT_ARGS'] as described here solved my problem running it in a Jupyter notebook. Thanks!

[Answer 3] To print a DStream, Spark provides the pprint method for Python. So you would use:

kafkaStream.pprint()
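Putting this together with the question's code, a minimal sketch (variable names taken from the question; these lines replace the immediate sc.stop(), since nothing is consumed until the StreamingContext is started):

lines = kafkaStream.map(lambda kv: kv[1])  # createStream yields (key, message) pairs; keep the message
lines.pprint()                             # print the first elements of each batch to stdout
stream.start()                             # start the streaming job
stream.awaitTermination()                  # run until interrupted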