Spark Streaming and Flume integration (pull-based approach): fixing the Flume startup error

Posted skywp


Flume configuration file:

simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

# Describe/configure the source
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = centos
simple-agent.sources.netcat-source.port = 44444

# Describe the sink
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = centos
simple-agent.sinks.spark-sink.port = 41414

# Describe the channel
simple-agent.channels.memory-channel.type = memory
simple-agent.channels.memory-channel.capacity = 1000
simple-agent.channels.memory-channel.transactionCapacity = 100

# Bind the source and the sink to the channel
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel

However, starting Flume with this configuration fails with the following error:

2019-10-16 11:35:14,559 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink
    at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:71)
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:410)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.flume.sink.SparkSink
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:69)
    ... 11 more

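Solution:

The agent's sink is of type org.apache.spark.streaming.flume.sink.SparkSink, which is not part of Flume itself, so spark-streaming-flume-sink_2.11-2.4.3.jar must be copied into Flume's lib directory. Without that jar on Flume's classpath, the sink class cannot be loaded and startup fails with the ClassNotFoundException for org.apache.spark.streaming.flume.sink.SparkSink shown above.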
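To confirm that the jar actually landed where Flume looks for it, one option is to scan the lib directory for the class named in the exception. The following is only a minimal sketch in Python, not part of the original fix: the function name jars_containing and the directory path in the usage note are hypothetical, and it assumes every jar is an ordinary zip archive. The Spark Flume integration guide also lists scala-library and commons-lang3 as jars the sink needs on Flume's classpath; since the function takes any class name, those can be checked the same way.

```python
import zipfile
from pathlib import Path


def jars_containing(lib_dir, class_name):
    """Return the names of the jars directly under lib_dir that contain class_name.

    jars_containing is a hypothetical helper for illustration only.
    """
    # A class a.b.C is stored inside a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are not valid zip archives
            continue
    return hits
```

Calling jars_containing("/path/to/flume/lib", "org.apache.spark.streaming.flume.sink.SparkSink") with the real Flume lib directory in place of the placeholder path should return a list that includes the sink jar once it has been copied. An empty list means Flume would still hit the ClassNotFoundException.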

