如何创建一个 Spark 数据框以从 np.arrays 列表(由 RDKit 生成)提供给 sparks 随机森林实现?

Posted

技术标签:

【中文标题】如何创建一个 Spark 数据框以从 np.arrays 列表(由 RDKit 生成)提供给 sparks 随机森林实现?【英文标题】:How to create a Spark dataframe to feed to sparks random forest implementation from a list of np.arrays (generated by RDKit)? 【发布时间】:2021-01-15 10:39:30 【问题描述】:

我正在尝试使用 RDKit 生成分子描述符,然后使用 Spark 对它们进行机器学习。我设法生成了描述符,并且找到了the following code for doing Random Forest。该代码从以 svmlight 格式存储的文件中加载数据帧,我可以使用 dump_svmlight_file 创建这样的文件,但写入文件感觉不是很“Sparky”。

我已经走到这一步了:

from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import DataStructs
import numpy as np
from sklearn.datasets import dump_svmlight_file

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
df = spark.read.option("header","true")\
               .option("delimiter", '\t').csv("acd_logd_100.smiles")
mols = df.select("canonical_smiles").rdd.flatMap(lambda x : x)\
         .map(lambda x: Chem.MolFromSmiles(x))\
         .map(lambda x: AllChem.GetMorganFingerprintAsBitVect(x, 2, nBits=1024))\
         .map(lambda x: np.array(x))
spark.createDataFrame(mols)

但显然我不能像这样从我的 np.arrays 的 RDD 创建一个 DataFrame。 (我收到一条关于ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all() 的奇怪错误消息)。

我想我还需要添加 y 值并以某种方式告诉随机森林实现数据框中的 x 是什么,y 是什么,但我还不能从这些数据中创建一个数据框。如何做到这一点?


编辑: 我试图通过pyspark.ml.linalg.Vectors 创建一个基于Creating Spark dataframe from numpy matrix 松散的数据框,但我似乎无法创建一个像这样的向量:

from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import DataStructs
import numpy as np
from sklearn.datasets import dump_svmlight_file

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession

from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
df = spark.read.option("header","true")\
               .option("delimiter", '\t').csv("acd_logd_100.smiles")
mols = df.select("canonical_smiles").rdd.flatMap(lambda x : x)\
         .map(lambda x: Chem.MolFromSmiles(x))\
         .map(lambda x: AllChem.GetMorganFingerprintAsBitVect(x, 2, nBits=1024))\
         .map(lambda x: np.array(x))\
         .map(lambda x: Vectors.sparse(x))
print(mols.take(5))         

mydf = spark.createDataFrame(mols,schema=["features"])

我明白了:

TypeError: only size-1 arrays can be converted to Python scalars

我完全不明白。

【问题讨论】:

我对 spark 了解不多,但如果我用 pandas 尝试这个,我会尝试在创建数据框之前添加行 mols = np.vstack(mols),否则 pandas 会将 numpy 数组存储在一列而不是而不是跨列扩展它们。也许这也对你有用。 啊,我猜你还需要将一个 numpy 数组转换为可以用 spark 读取的格式,如果这没有帮助,抱歉。也许this 的回答也会有所帮助。 【参考方案1】:

所以如果你在这里找到了自己的方式,我想我会分享我最终得到的结果。最后我选择了密集向量,因为它更容易。我想出的从 RDKit 向量出发的唯一方法是首先创建一个 numpy.array,然后从中创建一个 Spark Vectors.dense。我也意识到我需要在整个转换过程中拖拽 y 值,显然,一旦 x 值被整理出来,你就不能将该列添加到最后的 ataframe,因此复杂的 touple。

from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import DataStructs
import numpy as np
from sklearn.datasets import dump_svmlight_file

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession

from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
df = spark.read.option("header","true")\
               .option("delimiter", '\t').csv("acd_logd_100.smiles")

print(df.select("canonical_smiles", "acd_logd").rdd)

data = df.select("canonical_smiles", "acd_logd").rdd.map( lambda row: (row.canonical_smiles, float(row.acd_logd)) )\
         .map( lambda x: (Chem.MolFromSmiles(x[0]), x[1]) )\
         .map( lambda x: (AllChem.GetMorganFingerprintAsBitVect(x[0], 2, nBits=1024), x[1]) )\
         .map( lambda x: (np.array(x[0]),x[1]) )\
         .map( lambda x: (Vectors.dense(x[0].tolist()),x[1]) )\
         .map( lambda x: (x[0],x[1]))\
         .toDF(["features", "label"] )

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only

spark.stop()

【讨论】:

以上是关于如何创建一个 Spark 数据框以从 np.arrays 列表(由 RDKit 生成)提供给 sparks 随机森林实现?的主要内容,如果未能解决你的问题,请参考以下文章

循环遍历列表以从 SQL 查询创建多个数据帧

Cassandra&Spark:我可以将项目添加到行以从行列表创建数据框

Python 3 函数循环遍历 pandas 数据框以更改模式

在循环中创建多个循环的数据框以进行半正弦地理定位

映射两个数据框以创建一个包含多个键值的字典 - Pandas

Spark sql连接两个没有主键的数据帧