从文件/rdd 读取数据时出现 Unicode 错误

Posted

技术标签:

【中文标题】从文件/rdd 读取数据时出现 Unicode 错误【英文标题】:Unicode error while reading data from file/rdd 【发布时间】:2019-12-20 13:15:41 【问题描述】:

从文本文件中获取数据后,我正在尝试创建具有正确架构的数据框。在 RDD 中,所有数据类型都是字符串,但是其中一种字段数据类型是整数,我想确保将其创建为整数。所以我创建了 Structtype 并创建了数据框。但它会引发如下错误。

错误信息:

----------------------------------- ---------------------------- Py4JJavaError Traceback(最近调用 最后)在() ----> 1 df.show()

/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc 在显示(自我,n,截断,垂直) 第376章 377 如果 isinstance(truncate, bool) 并截断: --> 378 打印(self._jdf.showString(n,20,垂直)) 379 其他: 380 打印(self._jdf.showString(n,int(截断),垂直))

/Applications/anaconda2/lib/python2.7/site-packages/py4j/java_gateway.pyc 在 call(self, *args) 1284 回答 = self.gateway_client.send_command(command) 1285 return_value = get_return_value( -> 1286 answer, self.gateway_client, self.target_id, self.name) 1287 1288 for temp_args in temp_args:

/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/utils.pyc 装饰中(*a,**kw) 61 def deco(*a, **kw): 62 尝试: ---> 63 返回 f(*a, **kw) 64 除了 py4j.protocol.Py4JJavaError 作为 e: 65 秒 = e.java_exception.toString()

/Applications/anaconda2/lib/python2.7/site-packages/py4j/protocol.pyc 在 get_return_value(answer, gateway_client, target_id, name) 第326章 327 “调用 012 时出错。\n”。 --> 328 格式(target_id, ".", name), value) 329 其他: 330引发Py4JError(

Py4JJavaError:调用 o64.showString 时出错。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 3.0 中的任务 0 失败 1 次,最近一次失败:丢失任务 0.0 在 3.0 阶段(TID 5、本地主机、执行程序驱动程序): org.apache.spark.api.python.PythonException:回溯(最近 最后调用):文件 "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", 第 377 行,主要 进程()文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py”, 第 372 行,处理中 serializer.dump_stream(func(split_index, iterator), outfile) 文件 "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", 第 393 行,在 dump_stream 中 vs = list(itertools.islice(迭代器,批处理))文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py”, 第 99 行,在包装器中 返回 f(*args, **kwargs) 文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/session.py”, 第 730 行,准备中 verify_func(obj)文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1389 行,验证中 验证值(obj)文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1370 行,在 verify_struct 中 验证程序(v)文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1389 行,验证中 验证值(obj)文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1315 行,在 verify_integer 中 verify_acceptable_types(obj) 文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1278 行,在 verify_acceptable_types 中 % (dataType, obj, type(obj)))) TypeError: field id: IntegerType can't accept object u'1' in type

在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452) 在 org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588) 在 org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571) 在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 在 scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 在 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(未知 来源)在 org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 在 org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:121) 在 org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)

驱动程序堆栈跟踪:在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926) 在 scala.Option.foreach(Option.scala:257) 在 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) 在 org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365) 在 org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 在 org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383) 在 org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544) 在 org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544) 在 org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364) 在 org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) 在 org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) 在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) 在 org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363) 在 org.apache.spark.sql.Dataset.head(Dataset.scala:2544) 在 org.apache.spark.sql.Dataset.take(Dataset.scala:27​​58) 在 org.apache.spark.sql.Dataset.getRows(Dataset.scala:254) 在 org.apache.spark.sql.Dataset.showString(Dataset.scala:291) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:748) 原因: org.apache.spark.api.python.PythonException:回溯(最近 最后调用):文件 "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", 第 377 行,主要 进程()文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py”, 第 372 行,处理中 serializer.dump_stream(func(split_index, iterator), outfile) 文件 "/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", 第 393 行,在 dump_stream 中 vs = list(itertools.islice(迭代器,批处理))文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py”, 第 99 行,在包装器中 返回 f(*args, **kwargs) 文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/session.py”, 第 730 行,准备中 verify_func(obj)文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1389 行,验证中 验证值(obj)文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1370 行,在 verify_struct 中 验证程序(v)文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1389 行,验证中 验证值(obj)文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1315 行,在 verify_integer 中 verify_acceptable_types(obj) 文件“/Users/nagaraju.n/spark-2.4.3-bin-hadoop2.7/python/pyspark/sql/types.py”, 第 1278 行,在 verify_acceptable_types 中 % (dataType, obj, type(obj)))) TypeError: field id: IntegerType can't accept object u'1' in type

在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452) 在 org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588) 在 org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571) 在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 在 scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 在 scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(未知 来源)在 org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 在 org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 在 org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:121) 在 org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 更多

#!/usr/bin/env python

编码:utf-8

在[11]:

import os import sys from pyspark import SparkContext from pyspark.sql 从 pyspark.sql.types 导入 SparkSession * spark=SparkSession.builder.getOrCreate() sc = SparkContext.getOrCreate()

在[12]:

从文件中读取数据并创建 rdd rdd=sc.textFile('/Users/nagaraju.n/Downloads/sample_data.txt')

在[13]:

类型(rdd)

在[14]:

rdd_data=rdd.map(lambda p: p.split(","))

在[15]:

rdd_data.collect()

在[16]:

打印(rdd_data)

在[17]:

orig_header=rdd_data.first()

在[18]:

类型(orig_header)

在[19]:

rdd_withoutheader=rdd_data.filter(lambda p:p != orig_header)

在[20]:

rdd_withoutheader.collect()

在[21]:

创建架构头 = StructType([StructField("id", IntegerType(), True),StructField("first_name", StringType(),

True),StructField("last_name", StringType(), True),StructField("email", StringType(), True),StructField("电话", StringType(), True),StructField("city", StringType(), True),StructField("国家", StringType(), True)])

在[22]:

标题

在[23]:

df=spark.createDataFrame(rdd_withoutheader,header)

在[24]:

df.show()

【问题讨论】:

可能重复:***.com/questions/33129918/… 【参考方案1】:

/// 部分代码:

header = StructType([StructField("stockticker", StringType(), True),StructField("tradedate", IntegerType(), True),StructField("openprice", FloatType(), True),StructField(" highprice", FloatType(), True),StructField("lowprice", FloatType(), True),StructField("closeprice", FloatType(), True),StructField("volume", IntegerType(), True)])

df=spark.createDataFrame(rdd_data,header)

///

我的回答:

Schema 最常用于避免全表扫描来推断类型,并且不执行任何类型转换。因此,上述方法最适用于 Json/avro/parquet 输入文件,而不适用于文本文件。对于文本文件,以下是最好的方法:

方法一,根据你的代码,将rdd转成dataframe,定义schema如下:

rdd=sc.textFile('/Users/nagaraju.n/Downloads/sample_data.txt')

df_noType=data.map(lambda p: p.split(",")).toDF(["id", "first_name", "last_name", "email", "phone", "city", "国家”])

现在您可以通过以下任何一种方式进行类型转换:

方式1:

df_typecast=df_noType.select(df_noType.id.cast('int'), df_noType.first_name, df_noType.last_name, df_noType.email, df_noType.phone, df_noType.city, df_noType.country)

注意:在上面的行中,不需要将其他字段类型转换为字符串,因为它们是默认字符串

注意:如果有小数,则可以使用 df_noType.id.cast('float')

(或)

方式2:

从 pyspark.sql.types 导入 *

df_typecast=df_noType.select(df_noType.id.cast(IntegerType()), df_noType.first_name.cast(StringType()), df_noType.last_name.cast(StringType()), df_noType.email.cast(StringType( )), df_noType.phone.cast(StringType()), df_noType.city.cast(StringType()), df_noType.country.cast(StringType()))

方法2:我通常总是使用这个我觉得最好和容易的方法

rdd=sc.textFile('/Users/nagaraju.n/Downloads/sample_data.txt')

从 pyspark.sql 导入行

df=rdd.map(lambda p: Row(id= int(p.split(",")[0]), first_name= p.split(",")[1], last_name= p.split (",")[2], 电子邮件= p.split(",")[3], 电话= p.split(",")[4], 城市= p.split(",")[5] , country=p.split(",")[6])).toDF()

df.printSchema()

注意:如果有小数,则可以使用 float(p.split(",")[0])

【讨论】:

感谢 Sri,当我尝试 Method2 时出现以下错误: ValueError: invalid literal for int() with base 10: 'id' 您好,在应用上述功能之前,您必须从作为标题的数据中删除第一行。希望你得到。您的数据第一行是“id、first_name 等”。因为 id 是字符串,所以它的给定值错误。

以上是关于从文件/rdd 读取数据时出现 Unicode 错误的主要内容,如果未能解决你的问题,请参考以下文章

将 pandas 数据框转换为 PySpark RDD 时出现问题?

从 RDD 中的元组中解包项目时出现 Spark 错误

C#中StreamReader读取中文时出现乱码问题总结

从 .xls 文件读取数据时出现 Python 错误

C#读取Excle文件时报错,怎么处理?

使用 pandas Python (pandas.io.parsers.TextFileReader) 从文件中读取数据时出现问题