如何将 Cassandra Map 转换为 Pandas Dataframe

Posted

技术标签:

【中文标题】如何将 Cassandra Map 转换为 Pandas Dataframe【英文标题】:how to convert Cassandra Map to Pandas Dataframe 【发布时间】:2017-07-14 04:48:54 【问题描述】:

我想从 map<string, int> 类型的 cassandra 列族中读取数据,并将其转换为 Pandas 数据框。我还想用它来训练python中的模型,正如在虹膜物种分类中提到的here。

如果,我会使用 csv 来训练模型。然后它看起来像这样:

label,  f1, f2, f3, f4, f5
  0  ,  11 , 1, 6 , 1,  2  
  1  ,  5,   5, 1 , 2,  6
  0  ,  12,  9, 3 , 6,  8
  0  ,  9,  3,  8,  1,  0 

Cassandra 列族:

                  FeatureSet                    |   label

'f1': 11, 'f2': 1, 'f3': 6, 'f4': 1, 'f5': 2  |     0
'f1': 5, 'f2':  5, 'f3': 1, 'f4': 2, 'f5': 6  |     1
'f1': 12, 'f2': 9, 'f3': 3, 'f4': 6, 'f5': 8  |     0
'f1': 9, 'f2': 3, 'f3': 8, 'f4': 1, 'f5': 0   |     0

代码:

import pandas as pd
from sklearn2pmml import PMMLPipeline
from sklearn.tree import DecisionTreeClassifier
from cassandra.cluster import Cluster

CASSANDRA_HOST = ['172.16.X.Y','172.16.X1.Y1'] 
CASSANDRA_PORT = 9042
CASSANDRA_DB = "KEYSPACE"
CASSANDRA_TABLE = "COLUMNFAMILY"

cluster = Cluster(contact_points=CASSANDRA_HOST, port=CASSANDRA_PORT)
session = cluster.connect(CASSANDRA_DB)

sql_query = "SELECT * FROM .;".format(CASSANDRA_DB, CASSANDRA_TABLE)

df = pd.DataFrame()

for row in session.execute(sql_query):  
            What should i write here and get X_train, Y_train in pandas dataframe 



iris_pipeline = PMMLPipeline([
    ("classifier", DecisionTreeClassifier())
])
iris_pipeline.fit(X_train, Y_train)

【问题讨论】:

【参考方案1】:

除了 MaxU 答案之外,如果您想将结果视为数据框,您需要做的就是再添加一行:

df = pd.DataFrame(rslt._current_rows)

【讨论】:

【参考方案2】:

我针对同一问题发布了一个有效的解决方案here,以将OrderedMapSerializedKey Cassandra 映射字段作为字典读取到您的数据框中。


编辑:

在之前的解决方案中,我只替换了 Cassandra 数据集的第一(0)行(rows 是元组列表,其中每个元组都是 Cassandra 中的一行)

from cassandra.util import OrderedMapSerializedKey

def pandas_factory(colnames, rows):

    # Convert tuple items of 'rows' into list (elements of tuples cannot be replaced)
    rows = [list(i) for i in rows]

    # Convert only 'OrderedMapSerializedKey' type list elements into dict
    for idx_row, i_row in enumerate(rows):

        for idx_value, i_value in enumerate(i_row):

            if type(i_value) is OrderedMapSerializedKey:

                rows[idx_row][idx_value] = dict(rows[idx_row][idx_value])

    return pd.DataFrame(rows, columns=colnames)

【讨论】:

【参考方案3】:

你可以使用this approach:

import pandas as pd
from cassandra.cluster import Cluster

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

CASSANDRA_HOST = ['172.16.X.Y','172.16.X1.Y1'] 
CASSANDRA_PORT = 9042
CASSANDRA_DB = "KEYSPACE"
CASSANDRA_TABLE = "COLUMNFAMILY"

cluster = Cluster(contact_points=CASSANDRA_HOST, port=CASSANDRA_PORT)
session = cluster.connect(CASSANDRA_DB)

session.row_factory = pandas_factory
session.default_fetch_size = None

query = "SELECT * FROM .;".format(CASSANDRA_DB, CASSANDRA_TABLE)

rslt = session.execute(query, timeout=None)
df = rslt._current_rows

【讨论】:

这个方法我已经试过了。但是print df 在输出中只有 map 的键,即 (f1, f2, f3, f4)。当我打印df.values 时,它会给出[OrderedMapSerializedKey([(u'f1', 11), (u'f2', 1), (u'f3', 6), (u'f4', 1), (u'f5', 2)])]。我无法使用这些值来训练它。它应该返回我的数值以及第一行中的标题。就像我在 csv 中提到的那样

以上是关于如何将 Cassandra Map 转换为 Pandas Dataframe的主要内容,如果未能解决你的问题,请参考以下文章

将关系模式转换为 Cassandra 的建议

将 LocalDateTime 转换为 Cassandra TIMESTAMP 时的编解码器问题

Cassandra 聚合到 Map

Cassandra - 如何使用 map<text, uuid> 在 UDT 上执行 Solr 查询

如何将for循环转换为map函数

如何将 XML 转换为 java.util.Map,反之亦然?