Pyspark 朴素贝叶斯批量使用拟合

Posted

技术标签:

【中文标题】Pyspark 朴素贝叶斯批量使用拟合【英文标题】:Pyspark Naive Bayes using fit in batches 【发布时间】:2018-01-08 10:58:17 【问题描述】:

我一直在尝试在大型数据库 (30GB) 上训练朴素贝叶斯分类器。 由于内存限制,我必须将数据库查询拆分为多个批次。

我正在使用如下所示的管道:

categoryIndexer = StringIndexer(inputCol="diff", outputCol="label")
tokenizer = Tokenizer(inputCol="text", outputCol="raw")
remover = StopWordsRemover(inputCol="raw", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features",  numFeatures=100000)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[categoryIndexer, tokenizer, remover, hashingTF, nb])

然后在 for 循环中使用 fit。

for i in range(0,365):
    df = sqlContext.read.jdbc(url=url,table="(SELECT text, diff FROM tweets INNER JOIN djitf ON tweets.created = djitf.day WHERE id > "+ str(i*1000000)+ "AND id <"+ str((i+1)*1000000)+") as table1", properties=properties)
    train_data, test_data = df.randomSplit([0.8, 0.2])
    model = pipeline.fit(train_data)

但是,我的结果表明,每次我在管道上调用 fit 函数时,模型都会被覆盖。如何保留已经拟合的数据,然后添加?

是否有参数或我缺少的东西?例如在 Sklearn 中有 partial_fit 方法

【问题讨论】:

【参考方案1】:

没有缺少参数。 Spark 不支持增量拟合,因此不应该是必需的。 Spark 可以轻松处理大于内存的数据,可能使用磁盘缓存。如果 30GB 的数据对于您的资源来说仍然太多,那么您根本不应该使用 Spark。

如果问题只是在读取使用谓词上:

predicates = [
    "id > 0 AND id < 1".format(i * 1000000, (i + 1) * 1000000)
    for i in range(0, 365)
]

df = sqlContext.read.jdbc(
    url=url,
    table="""(SELECT text, diff 
               FROM tweets INNER 
               JOIN djitf ON tweets.created = djitf.day") as table1""", 
     predicates=predicates,
     properties=properties)

或 JDBC 阅读器的范围:

df = sqlContext.read.jdbc(
    url=url,
    table="""(SELECT cast(id, INTEGER), text, diff 
               FROM tweets INNER 
               JOIN djitf ON tweets.created = djitf.day") as table1""",
    column="id", lowerBound=0, upperBound=366 * 1000000, numPartitions=300)

【讨论】:

以上是关于Pyspark 朴素贝叶斯批量使用拟合的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 中朴素贝叶斯分类器的阈值是啥?

获取特征重要性 PySpark 朴素贝叶斯分类器

朴素贝叶斯分类——大道至简

干货 | 朴素贝叶斯python代码实现

使用高斯朴素贝叶斯的多类分类

如何生成混淆矩阵并找到朴素贝叶斯分类器的错误分类率?