net.jpounz.lz4 使用火花流从 kafka 读取时出现异常

Posted

技术标签:

【中文标题】net.jpounz.lz4 使用火花流从 kafka 读取时出现异常【英文标题】:net.jpounz.lz4 exception when reading from kafka with spark streaming 【发布时间】:2019-03-25 14:13:23 【问题描述】:

我使用 python 使用 spark 2.4.0。并从 kafka_2.11-2.0.0 (二进制非源)读取数据。我正在使用 spark-submit --jars sspark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar script.py 错误报告中出现错误消息,如果有人可以提供帮助,谢谢:)

19/03/25 13:48:53 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
    at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
    at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
    at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:453)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
Exception in thread "stdout writer for python" java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
    at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
    at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
    at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
    at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:453)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

jar文件的pom.xml:spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.4.0</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <artifactId>spark-streaming-kafka-0-8-assembly_2.11</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project External Kafka Assembly</name>
  <url>http://spark.apache.org/</url>

  <properties>
    <sbt.project.name>streaming-kafka-0-8-assembly</sbt.project.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_$scala.binary.version</artifactId>
      <version>$project.version</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_$scala.binary.version</artifactId>
      <version>$project.version</version>
      <scope>provided</scope>
    </dependency>
    <!--
      Demote already included in the Spark assembly.
    -->
    <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.lz4</groupId>
      <artifactId>lz4-java</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <classifier>$avro.mapred.classifier</classifier>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.xerial.snappy</groupId>
      <artifactId>snappy-java</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
  </dependencies>

  <build>
  <outputDirectory>target/scala-$scala.binary.version/classes</outputDirectory>
  <testOutputDirectory>target/scala-$scala.binary.version/test-classes</testOutputDirectory>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <configuration>
        <shadedArtifactAttached>false</shadedArtifactAttached>
        <artifactSet>
          <includes>
            <include>*:*</include>
          </includes>
        </artifactSet>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>reference.conf</resource>
              </transformer>
              <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                <resource>log4j.properties</resource>
              </transformer>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
</project>

【问题讨论】:

【参考方案1】:

我已经解决了这个问题,我正在使用以下命令执行我的代码:

bin/spark-submit --jars external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar Path/kafka_test.py localhost:2181 test

当我用这个命令运行它时它起作用了:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 Path/kafka_test.py localhost:2181 test

【讨论】:

以上是关于net.jpounz.lz4 使用火花流从 kafka 读取时出现异常的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法修改此代码以让火花流从 json 中读取?

使用 json 文件触发流式传输

如何使用流从对象列表中获取地图? [复制]

如何使用 bloc 未来获取流从后端过滤数据?

如何使用命令查看kaf

如何使用流从两个列表或数组乘法中查找元素对