如何根据条件更改考拉数据框中的值

Posted

技术标签:

【中文标题】如何根据条件更改考拉数据框中的值【英文标题】:How change the value in a koalas dataframe based in a condition 【发布时间】:2020-11-27 13:35:16 【问题描述】:

我正在使用考拉,我想根据条件更改列的值。

在熊猫中我可以这样做:

import pandas as pd
df_test = pd.DataFrame(
    'a': [1,2,3]
    ,'b': ['one','two','three'])

df_test2 =   pd.DataFrame(
    'c': [2,1,3]
    ,'d': ['one','two','three'])


df_test.loc[df_test.a.isin(df_test2['c']),'b'] = 'four'

df_test.head()

    a   b
0   1   four
1   2   four
2   3   four


我正在尝试在考拉中使用相同的方法,但出现此错误:

---------------------------------------------------------------------------
PandasNotImplementedError                 Traceback (most recent call last)
<ipython-input-15-814219258adb> in <module>
      5 new_loans['write_offs'] = 0
      6 
----> 7 new_loans.loc[(new_loans['ID'].isin(userinput_write_offs['id'])),'write_offs'] = 1
      8 new_loans.loc[new_loans['write_offs']==1,'is_active'] = 0
      9 new_loans = new_loans.sort_values(by = ['ZOHOID','Disb Date'])

/usr/local/lib/python3.7/dist-packages/databricks/koalas/base.py in isin(self, values)
    894             )
    895 
--> 896         return self._with_new_scol(self.spark.column.isin(list(values)))
    897 
    898     def isnull(self) -> Union["Series", "Index"]:

/usr/local/lib/python3.7/dist-packages/databricks/koalas/series.py in __iter__(self)
   5871 
   5872     def __iter__(self):
-> 5873         return MissingPandasLikeSeries.__iter__(self)
   5874 
   5875     if sys.version_info >= (3, 7):

/usr/local/lib/python3.7/dist-packages/databricks/koalas/missing/__init__.py in unsupported_function(*args, **kwargs)
     21     def unsupported_function(*args, **kwargs):
     22         raise PandasNotImplementedError(
---> 23             class_name=class_name, method_name=method_name, reason=reason
     24         )
     25 

PandasNotImplementedError: The method `pd.Series.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

如何在考拉做同样的操作?

更新

以下这个问题:Assign Koalas Column from Numpy Result我已经完成了:

df_test.loc[df_test.a.isin(df_test2['c'].to_list()),'b'] = 'four'

但现在我有这个错误:

---------------------------------------------------------------------------
PythonException                           Traceback (most recent call last)
/usr/local/lib/python3.7/dist-packages/IPython/core/formatters.py in __call__(self, obj)
    700                 type_pprinters=self.type_printers,
    701                 deferred_pprinters=self.deferred_printers)
--> 702             printer.pretty(obj)
    703             printer.flush()
    704             return stream.getvalue()

/usr/local/lib/python3.7/dist-packages/IPython/lib/pretty.py in pretty(self, obj)
    392                         if cls is not object \
    393                                 and callable(cls.__dict__.get('__repr__')):
--> 394                             return _repr_pprint(obj, self, cycle)
    395 
    396             return _default_pprint(obj, self, cycle)

/usr/local/lib/python3.7/dist-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    698     """A pprint that just redirects to the normal repr function."""
    699     # Find newlines and replace them with p.break_()
--> 700     output = repr(obj)
    701     lines = output.splitlines()
    702     with p.group():

/usr/local/lib/python3.7/dist-packages/databricks/koalas/frame.py in __repr__(self)
  10614             return self._to_internal_pandas().to_string()
  10615 
> 10616         pdf = self._get_or_create_repr_pandas_cache(max_display_count)
  10617         pdf_length = len(pdf)
  10618         pdf = pdf.iloc[:max_display_count]

/usr/local/lib/python3.7/dist-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
  10606     def _get_or_create_repr_pandas_cache(self, n):
  10607         if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
> 10608             self._repr_pandas_cache = n: self.head(n + 1)._to_internal_pandas()
  10609         return self._repr_pandas_cache[n]
  10610 

/usr/local/lib/python3.7/dist-packages/databricks/koalas/frame.py in _to_internal_pandas(self)
  10602         This method is for internal use only.
  10603         """
> 10604         return self._internal.to_pandas_frame
  10605 
  10606     def _get_or_create_repr_pandas_cache(self, n):

/usr/local/lib/python3.7/dist-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
    514     def wrapped_lazy_property(self):
    515         if not hasattr(self, attr_name):
--> 516             setattr(self, attr_name, fn(self))
    517         return getattr(self, attr_name)
    518 

/usr/local/lib/python3.7/dist-packages/databricks/koalas/internal.py in to_pandas_frame(self)
    807         """ Return as pandas DataFrame. """
    808         sdf = self.to_internal_spark_frame
--> 809         pdf = sdf.toPandas()
    810         if len(pdf) == 0 and len(sdf.schema) > 0:
    811             pdf = pdf.astype(

/usr/local/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
    136 
    137         # Below is toPandas without Arrow optimization.
--> 138         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
    139         column_counter = Counter(self.columns)
    140 

/usr/local/spark/python/pyspark/sql/dataframe.py in collect(self)
    594         """
    595         with SCCallSiteSync(self._sc) as css:
--> 596             sock_info = self._jdf.collectToPython()
    597         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    598 

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/usr/local/spark/python/pyspark/sql/utils.py in raise_from(e)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 589, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 254, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 1110, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'pandas'

为什么要尝试使用 pandas?

【问题讨论】:

不确定是否有帮助,但尝试添加loc:new_loans.loc[(new_loans.loc['ID'].isin(userinput_write_offs.loc['id'])),'write_offs'] = 1 【参考方案1】:

Koalas 包向用户公开了高级别的 Pandas Like API,但在后台实现是使用 PySpark API 完成的

我观察到,在您粘贴的堆栈跟踪日志中,正在使用toPandas() 方法从sdf spark Dataframe 创建一个pandas dataframe,并将其分配给pdf

toPandas()函数的实现中,正在导入pandasnumpy

检查line numbers 809138

/usr/local/lib/python3.7/dist-packages/databricks/koalas/internal.py in to_pandas_frame(self)
    807         """ Return as pandas DataFrame. """
    808         sdf = self.to_internal_spark_frame
--> 809         pdf = sdf.toPandas()
    810         if len(pdf) == 0 and len(sdf.schema) > 0:
    811             pdf = pdf.astype(

/usr/local/spark/python/pyspark/sql/pandas/conversion.py in toPandas(self)
    136 
    137         # Below is toPandas without Arrow optimization.
--> 138         pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
    139         column_counter = Counter(self.columns)
    140 

/usr/local/spark/python/pyspark/sql/dataframe.py in collect(self)
    594         """
    595         with SCCallSiteSync(self._sc) as css:
--> 596             sock_info = self._jdf.collectToPython()
    597         return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
    598 

您可以在以下链接查看toPandas() 函数的实现: https://github.com/apache/spark/blob/master/python/pyspark/sql/pandas/conversion.py

【讨论】:

以上是关于如何根据条件更改考拉数据框中的值的主要内容,如果未能解决你的问题,请参考以下文章

如何将逗号添加到数据框中的现有值中?熊猫

根据熊猫数据框中的条件获取最大值和最小值

如何根据条件将数据框中的一列切成多个系列

如何根据条件选择R数据框中的连续行?

使用“或”在熊猫数据框中选择值时如何编写条件[重复]

如何在 python 的另一列中的字符串值中从数据框中的一列中搜索字符串?