如何使用 Apache Spark Dataframes (Python) 执行 Switch 语句

Posted

技术标签:

【中文标题】如何使用 Apache Spark Dataframes (Python) 执行 Switch 语句【英文标题】:How to perform a Switch statement with Apache Spark Dataframes (Python) 【发布时间】:2016-04-19 21:15:17 【问题描述】:

我正在尝试对我的数据执行操作,如果某个值与某个条件匹配,则该值将被映射到预先确定的值列表,否则将映射到一个贯穿值。

这将是等效的 SQL:

CASE
            WHEN user_agent LIKE \'%CanvasAPI%\' THEN \'api\'
            WHEN user_agent LIKE \'%candroid%\' THEN \'mobile_app_android\'
            WHEN user_agent LIKE \'%iCanvas%\' THEN \'mobile_app_ios\'
            WHEN user_agent LIKE \'%CanvasKit%\' THEN \'mobile_app_ios\'
            WHEN user_agent LIKE \'%Windows NT%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%MacBook%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%iPhone%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%iPod Touch%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%iPad%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%iOS%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%CrOS%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%Android%\' THEN \'mobile\'
            WHEN user_agent LIKE \'%Linux%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%Mac OS%\' THEN \'desktop\'
            WHEN user_agent LIKE \'%Macintosh%\' THEN \'desktop\'
            ELSE \'other_unknown\'
            END AS user_agent_type

我对 Spark 还很陌生,所以我第一次尝试这个程序时使用了一个查找字典并逐行调整 RDD 中的值,如下所示:

USER_AGENT_VALS = 
    'CanvasAPI': 'api',
    'candroid': 'mobile_app_android',
    'iCanvas': 'mobile_app_ios',
    'CanvasKit': 'mobile_app_ios',
    'Windows NT': 'desktop',
    'MacBook': 'desktop',
    'iPhone': 'mobile',
    'iPod Touch': 'mobile',
    'iPad': 'mobile',
    'iOS': 'mobile',
    'CrOS': 'desktop',
    'Android': 'mobile',
    'Linux': 'desktop',
    'Mac OS': 'desktop',
    'Macintosh': 'desktop'


def parse_requests(line: list,
                   id_data: dict,
                   user_vals: dict = USER_AGENT_VALS):
    """
    Expects an input list which maps to the following indexes:
        0: user_id
        1: context(course)_id
        2: request_month
        3: user_agent_type

    :param line: A list of values.
    :return: A list
    """
    found = False
    for key, value in user_vals.items():
        if key in line[3]:
            found = True
            line[3] = value
    if not found:
        line[3] = 'other_unknown'
    # Retrieves the session_id count from the id_data dictionary using
    # the user_id as the key.
    session_count = id_data[str(line[0])]
    line.append(session_count)
    line.extend(config3.ETL_LIST)
    return [str(item) for item in line]

我当前的代码在dataframe 中有数据,我不确定如何最有效地执行上述操作。我知道它们是不可变的,所以它需要作为一个新的数据框返回,但我的问题是如何最好地做到这一点。这是我的代码:

from boto3 import client
import psycopg2 as ppg2
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import current_date, date_format, lit, StringType

EMR_CLIENT = client('emr')
conf = SparkConf().setAppName('Canvas Requests Logs')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
# for dependencies
# sc.addPyFile()

USER_AGENT_VALS = 
    'CanvasAPI': 'api',
    'candroid': 'mobile_app_android',
    'iCanvas': 'mobile_app_ios',
    'CanvasKit': 'mobile_app_ios',
    'Windows NT': 'desktop',
    'MacBook': 'desktop',
    'iPhone': 'mobile',
    'iPod Touch': 'mobile',
    'iPad': 'mobile',
    'iOS': 'mobile',
    'CrOS': 'desktop',
    'Android': 'mobile',
    'Linux': 'desktop',
    'Mac OS': 'desktop',
    'Macintosh': 'desktop'


if __name__ == '__main__':
    df = sql_context.read.parquet(
        r'/Users/mharris/PycharmProjects/etl3/pyspark/Datasets/'
        r'usage_data.gz.parquet')

    course_data = df.filter(df['context_type'] == 'Course')
    request_data = df.select(
        df['user_id'],
        df['context_id'].alias('course_id'),
        date_format(df['request_timestamp'], 'MM').alias('request_month'),
        df['user_agent']
    )

    sesh_id_data = df.groupBy('user_id').count()

    joined_data = request_data.join(
        sesh_id_data,
        on=request_data['user_id'] == sesh_id_data['user_id']
    ).drop(sesh_id_data['user_id'])

    all_fields = joined_data.withColumn(
        'etl_requests_usage', lit('DEV')
    ).withColumn(
        'etl_datetime_local', current_date()
    ).withColumn(
        'etl_transformation_name', lit('agg_canvas_logs_user_agent_types')
    ).withColumn(
        'etl_pdi_version', lit(r'Apache Spark')
    ).withColumn(
        'etl_pdi_build_version', lit(r'1.6.1')
    ).withColumn(
        'etl_pdi_hostname', lit(r'N/A')
    ).withColumn(
        'etl_pdi_ipaddress', lit(r'N/A')
    ).withColumn(
        'etl_checksum_md5', lit(r'N/A')
    )

作为一个 PS,有没有比我做的更好的添加列的方法?

【问题讨论】:

【参考方案1】:

如果你愿意,你甚至可以直接使用 SQL 表达式:

expr = """
    CASE
        WHEN user_agent LIKE \'%Android%\' THEN \'mobile\'
        WHEN user_agent LIKE \'%Linux%\' THEN \'desktop\'
        ELSE \'other_unknown\'
    END AS user_agent_type"""

df = sc.parallelize([
    (1, "Android"), (2, "Linux"), (3, "Foo")
]).toDF(["id", "user_agent"])

df.selectExpr("*", expr).show()
## +---+----------+---------------+
## | id|user_agent|user_agent_type|
## +---+----------+---------------+
## |  1|   Android|         mobile|
## |  2|     Linux|        desktop|
## |  3|       Foo|  other_unknown|
## +---+----------+---------------+

否则你可以用whenlikeotherwise的组合替换它:

from pyspark.sql.functions import col, when
from functools import reduce

c = col("user_agent")
vs = [("Android", "mobile"), ("Linux", "desktop")]
expr = reduce(
    lambda acc, kv: when(c.like(kv[0]), kv[1]).otherwise(acc), 
    vs, 
    "other_unknown"
).alias("user_agent_type")

df.select("*", expr).show()

## +---+----------+---------------+
## | id|user_agent|user_agent_type|
## +---+----------+---------------+
## |  1|   Android|         mobile|
## |  2|     Linux|        desktop|
## |  3|       Foo|  other_unknown|
## +---+----------+---------------+

您还可以在单​​个select 中添加多个列:

exprs = [c.alias(a) for (a, c) in [
  ('etl_requests_usage', lit('DEV')), 
  ('etl_datetime_local', current_date())]]

df.select("*", *exprs)

【讨论】:

印象深刻,我忘了我可以直接使用SQL。我不确定 Spark SQL 与我习惯使用的 PostGRESql 方言有多相似。 HiveQL 不是 ANSI SQL,但它足够接近。每当您使用不是 Postgres 特定扩展的东西时,它应该可以正常工作。我不会过度使用,但有时它比编写表达式要简洁得多。 reduce 声明中的类似内容来自哪里?我在pysparkfunctools 中都找不到文档。 @flybonzai Column.like 这个答案太棒了。如果将其添加到 API 中会很酷。

以上是关于如何使用 Apache Spark Dataframes (Python) 执行 Switch 语句的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 TestContainers 创建 apache spark 独立集群以进行集成测试?

Apache Spark 如何在内存中工作?

如何在Spark提交中使用s3a和Apache spark 2.2(hadoop 2.8)?

Spark SQL读写方法

如何使用 apache spark 访问从 impala 创建的 apache kudu 表

如何在 Apache Spark 中向 Kryo Serializer 注册类?