怎样给Spark传递函数

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了怎样给Spark传递函数相关的知识,希望对你有一定的参考价值。

参考技术A Spark的算子很大程度上是上通过向集群上的驱动程序传递函数来实现的,编写Spark应用的关键就是使用算子(或者称为转换),给Spark传递函数来实现。常用的向Spark传递函数的方式有两种(来自于Spark官方文档,Spark编程指南):
第一种:匿名函数,处理的代码比较少的时候,可以采用匿名函数,直接写在算子里面:

?

1

myrdd.map(x => x+ 1)

第二种:全局单例对象中的静态方法:先定义object对象MyFunctions,以及静态方法:funcOne,然后传递MyFunctions.funcOne给RDD算子。

?

1
2
3
4
5
6
7
8

object MyFunctions

def funcOne(s: String): String = ...



myRdd.map(MyFunctions.funcOne)

在业务员开发中,需要把RDD的引用传递给某一个类的实例的某个方法,传递给RDD的函数,为类实例的实例方法:

?

1
2
3
4
5
6
7

class MyClass

def funcOne(s: String): String = ...

def doStuff(rdd: RDD[String]): RDD[String] = rdd.map(funcOne



在这个例子中,我们定义了一个类MyClass,类的实例方法doStuff中传入了一个RDD,RDD
算子中调用了类的另外一个实例方法funcOne,在我么New 一个MyClass
的实例并调用doStuff的方法的时候,需要讲整个实例对象发给集群,所以类MyClass必须可以序列化,需要extends
Serializable。

相似的,访问方法外部的对象变量也会引用整个对象,需要把整个对象发送到集群:

?

1
2
3
4
5
6

class MyClass

val field = "Hello"

def doStuff(rdd: RDD[String]): RDD[String] = rdd.map(x => field
+ x) <span style="font-size:9pt;line-height:1.5;"></span>

?

1



为了避免整个对象都发送给集群,可以定义一个局部变量来保存外部对象field的引用,这种情况尤其在一些大对象里,可以避免整个对象发送到集群,提高效率。

?

1
2
3
4
5
6
7

def doStuff(rdd: RDD[String]): RDD[String] =

val field_ = this.field

rdd.map(x => field_ + x)



Spark应用最终是要在集群中运行的,许多问题在单一的本地环境中无法暴露出来,有时候经常会遇到本地运行结果和集群运行结果不一致的问题,这就要求开
发的时候多使用函数式编程风格,尽量使的写的函数都为纯函数。纯函数的好处是:无状态,线程安全,不需要线程同步,应用程序或者运行环境
(Runtime)可以对纯函数的运算结果进行缓存,运算加快速度。

那么什么是纯函数了?

纯函数(Pure Function)是这样一种函数——输入输出数据流全是显式(Explicit)的。显式(Explicit)
的意思是,函数与外界交换数据只有一个唯一渠道——参数和返回值;函数从函数外部接受的所有输入信息都通过参数传递到该函数内部;函数输出到函数外部的所
有信息都通过返回值传递到该函数外部。如果一个函数通过隐式(Implicit)方式,从外界获取数据,或者向外部输出数据,那么,该函数就不是纯函数,
叫作非纯函数(Impure Function)。隐式(Implicit)的意思是,函数通过参数和返回值以外的渠道,和外界进行数据交换。比如,读取全局变量,修改全局变量,都叫作以隐式的方式和外界进行数据交换;比如,利用I/O API(输入输出系统函数库)读取配置文件,或者输出到文件,打印到屏幕,都叫做隐式的方式和外界进行数据交换。

在计算过程中涉及到对象的交互时,尽量选用无状态的对象,比如对于一个bean,成员变量都为val的,在需要数据交互的地方new 一个新的。

关于(commutative and associative)交换律和结合律。在传递给reudce,reduceByKey,以及其他的一些merge,聚合的操作中的函数必须要满足交换律和结合律,交换律和结合律就是我们数学上学过的:

a + b = b + a,a + b + c = a + (b + c)

定义的函数func(a,b)和f(b,a)应该得到相同的结果,f(f(a,b),c)和f(a,f(b,c))应该得到相同的结果。

最后说一下广播变量和累加器的使用。在程序中不要定义一个全局的变量,如果需要在多个节点共享一个数据,可以采用广播变量的方法。如果需要一些全局的聚合计算,可以使用累加器。

如何将函数指针传递给成员函数c++?

【中文标题】如何将函数指针传递给成员函数c++?【英文标题】:How to pass function pointer to member function c++? 【发布时间】:2017-10-29 13:31:01 【问题描述】:

所以这就像我关于该主题的第二个问题,但这次我想知道如果我传递一个 int 类型的函数会怎样。就像这里我想将 fun1 的输出存储在主函数中的变量“int my”中.我应该如何为此编写包装函数?

#include<iostream>
using namespace std;

class student

public:
    int fun1(int m) 
     
        return 2*m;
    

    int wrapper(int (student::*fun)(int k))
    
        (this->*fun)(int k)
    
;

int main()
   
    student s;
    int l=5;
    int my=s.wrapper(&student::fun1(l));
    cout << m << endl;
    return 0;

【问题讨论】:

你想用这个包装函数实现什么是有问题的。 完全错误。 (this-&gt;*fun)(int k) 不是有效的 C++,@user0042。 @user0042 出现错误 :( @codie 考虑阅读错误信息。它通常会告诉您出了什么问题。 (this-&gt;*fun)(int k) 不是有效的 C++。这是一个函数调用,与任何其他函数调用没有什么不同,只是该函数是通过方法指针调用的。函数调用有值作为参数。例如,要将值 2 传递给采用 int 参数的函数,您只需指定值 2。而不是 int 2 【参考方案1】:

包装器调用者需要两个参数: 一个用于调用的函数,一个用于 调用它的参数

int wrapper(int (student::*fun)(int), int k)

    return (this->*fun)(k);

这样称呼:

int my = s.wrapper(&student::fun1, 1);
cout << my << endl;

看这里http://coliru.stacked-crooked.com/a/cd93094c38bfa591

【讨论】:

【参考方案2】:

当标准库中提供了mem_fn() 时,为什么还要创建自己的包装器:

int my = mem_fn(&student::fun1)(&s,l);
cout<<my<<endl;

Online demo

【讨论】:

以上是关于怎样给Spark传递函数的主要内容,如果未能解决你的问题,请参考以下文章

spark程序里如果给rdd.map传递一个会返回null的函数,最后rdd里面是会少一个元素还是有为null的元素啊

在Delphi中如何使用SQL自定义函数,参数怎样传递给自定义函数?

[react] 怎样将事件传递给子组件?

.js 文件怎样接受html 传递的参数,html 怎样传递参数给.js文件

Spark SQL 传递变量 - Synapse(Spark 池)

怎样向父页面script传递变量