在 pyspark 中同时而不是按顺序运行 for 循环

Posted

技术标签:

【中文标题】在 pyspark 中同时而不是按顺序运行 for 循环【英文标题】:Run a for loop concurrently and not sequentially in pyspark 【发布时间】:2020-06-04 12:08:05 【问题描述】:

下面是我在 Databricks 集群上运行的 for 循环 执行:

datalake_spark_dataframe_downsampled = pd.DataFrame( 
                           'IMEI' : ['001', '001', '001', '001', '001', '002', '002'],
                            'OuterSensorConnected':[0, 0, 0, 1, 0, 0, 0], 
                            'OuterHumidity':[31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826],
                            'EnergyConsumption': [70, 70, 70, 70, 70, 70, 70],
                            'DaysDeploymentDate': [0, 0, 1, 1, 1, 1, 1],
                            'label': [0, 0, 1, 1, 0, 0, ]
                           )
datalake_spark_dataframe_downsampled = spark.createDataFrame(datalake_spark_dataframe_downsampled )

# printSchema of the datalake_spark_dataframe_downsampled (spark df):

"root
 |-- IMEI: string (nullable = true)
 |-- OuterSensorConnected: integer (nullable = false)
 |-- OuterHumidity: float (nullable = true)
 |-- EnergyConsumption: float (nullable = true)
 |-- DaysDeploymentDate: integer (nullable = true)
 |-- label: integer (nullable = false)"

device_ids=datalake_spark_dataframe_downsampled.select(sql_function.collect_set('IMEI').alias('unique_IMEIS')).collect()[0]['unique_IMEIS']

print(device_ids) #["001", "002", ..."030"] 30 ids

for i in device_ids:

  #filtered_dataset=datalake_spark_dataframe_downsampled.where(datalake_spark_dataframe_downsampled.IMEI.isin([i])) 
  #The above operation is executed inside the function training_models_operation_testing()

  try:
      training_models_operation_testing(i, datalake_spark_dataframe_downsampled, drop_columns_not_used_in_training, training_split_ratio_value, testing_split_ratio_value, mlflow_folder, cross_validation_rounds_value, features_column_name, optimization_metric_value, pretrained_models_T_minus_one, folder_name_T_minus_one, timestamp_snap, instrumentation_key_value, canditate_asset_ids, executor, device_ids)

  except Exception as e:
      custom_logging_function("ERROR", instrumentation_key_value, "ERROR EXCEPTION: 0".format(e))

为了解决这个问题,我附上了一个示例数据,以大致了解我的数据是怎样的......并想象存在更多的行和 ID。我刚刚创建了一些只是为了演示

如您所见,这是在使用 pyspark 运行的 Databricks 集群中的 for 循环内的一个简单函数调用。

简而言之,我首先创建一个包含在我的数据集中存在的唯一 ID(IMEI 列)的列表。这等于 30。因此,我使用 for 循环运行 30 次迭代。在每次迭代中,我都执行以下步骤:

过滤与 30 个资产 ID 中的每一个匹配的 datalake_spark_dataframe_downsampled (spark df) 行。例如,假设在初始 df 的 40,000 行中,只有 140 行对应于第一个设备 ID。 根据这 140 行 (filtered_dataset),该函数会执行 preprocessingtrain-test-split 并训练两个 Spark ML 算法,仅用于过滤数据集的行。

附加的代码 sn-p 运行成功。虽然 for 循环 是按顺序执行的,但一次迭代一次。该函数为第一个 id 调用,只有在完成后才转到下一个 id。但是,我想要转换上述 for 循环,使 30 次迭代将在 pyspark 中同时运行,而不是 一个接一个。我如何在 pyspark 中实现这一点?

我愿意进行讨论和想法测试,因为我知道我所要求的可能不是那么简单,无法在 Spark 环境中执行。

我当前的日志输出(这是我在下面打印的内容)

迭代 1 开始执行... - 执行 id 001 的函数 执行完毕...

迭代 2 开始执行... - 执行 id 002 的函数 执行完毕...

我想要的日志输出(这是我在下面打印的内容)

开始执行... - 执行 id 001 的函数 - 执行 id 002 的函数 - 执行 id 003 的函数 - 执行 id 004 的函数 。 . . . - 执行 id 030 的函数 执行完毕...

同时(同时)一次

[更新]基于cmets(线程模块)上的答案:

【问题讨论】:

这个可以,需要实现多线程 @RohitNimmala 你能否提供一个在线指南或多线程的火花实现? ...我猜你的意思是像这个例子docs.python.org/3/library/concurrent.futures.html 在答案部分提供了一个示例,因为它不适合 cmets 部分,希望对您有所帮助。 【参考方案1】:

“for循环”是线性执行/顺序执行,可以认为是单线程执行。

如果你想同时运行你的代码,你需要创建多个线程/进程来执行你的代码。

下面是实现多线程的例子。我没有测试代码,但应该可以工作:)

#importing threading library

import threading

# Creating a list of threads
thread_list = []

#looping all objects, creating a thread for each element in the loop, and append them to thread_list
for items in device_ids:
    thread = threading.Thread(target=training_models_operation_testing,args=(items, datalake_spark_dataframe_downsampled, drop_columns_not_used_in_training,
                                                   training_split_ratio_value, testing_split_ratio_value, mlflow_folder,
                                                   cross_validation_rounds_value, features_column_name,
                                                   optimization_metric_value, pretrained_models_T_minus_one,
                                                   folder_name_T_minus_one, timestamp_snap, instrumentation_key_value,
                                                   canditate_asset_ids, executor, device_ids,))
    thread_list.append(thread)

#Start multi threaded exucution
for thread in thread_list:
    thread.start()

#Wait for all threads to finish
for thread in thread_list:
    thread.join()

print("Finished executing all threads")

【讨论】:

这应该没问题,除了并发性之外,它与for循环非常相似。 这无济于事,因为线程是 python 不是基于纯并发,因为它实现了 GIL。你应该坚持并发未来或多处理这个任务,因为线程是顺序 BTS。 @ShubhamJain 好的,我会测试两种方式。一旦我的 Databricks 集群启动。因此,根据您的意见,如果您可以根据我的问题也使用 concurrent.futures 或 multiprocessing 发布示例,我应该看起来像 docs.python.org/3/library/concurrent.futures.html....It 会帮助我。无论如何,如果我发现一些无法管理的东西,我会弄脏我的手,我会带着结果回来。 @ShubhamJain 请检查我的问题的屏幕截图更新。线程模块运行良好......我已经用 concurrent.futures 测试了相反的执行,但我无法像 Rohit 的方法那样实现并行性......所以除非你提供一个有效的答案,其中 Concurrent.Futures 优于线程答案Rohit 的,对我来说,Rohit 给出了一个有效的答案。 @NikSp 可以分批(队列),分批运行,参考以下链接:pymotw.com/2/Queue

以上是关于在 pyspark 中同时而不是按顺序运行 for 循环的主要内容,如果未能解决你的问题,请参考以下文章

如何按顺序而不是同时运行gatling场景?

golang for循环取值为啥不按顺序输出?

重用pyspark缓存并在for循环中不持久

在 for 循环中使用 udf 在 Pyspark 中创建多个列

强制异步任务按顺序运行

HTML5 File API 按顺序包含多个文件,而不是一次全部