spark pipeline原理学习和记录

• DataFrame:ML API使用Sark SQL中的DataFrme作为机器学习数据集,可容纳各种类型的数据,如DataFrame可能是存储文本的不同列,特征向量,真正的标签或者预测。      
• 转换器:Transformer是一种算法,可以将一个DataFrame转换成另一个DataFrame。如机器学习模型是一个转换器,可以将特征向量的DataFrame转换成预测结果的DataFrame。
• 预测器:一个预测是一个算法,可以基于DataFrame产出一个转换器。如机器学习算法是一种预测,训练DataFrame并产生一个模型。      
• 管道/工作流:管道链接多个转换器和预测器生成一个机器学习工作流。     
• 参数:所有的转换器和预测器共享一个通用的API指定参数。
MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. This section covers the key concepts introduced by the Pipelines API, where the pipeline concept is mostly inspired by the scikit-learn project.

• DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
• Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
• Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
• Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
• Parameter: All Transformers and Estimators now share a common API for specifying parameters.


机器学习可以处理多种类型的数据,比如矢量/文本/图像和结构化数据,这里DataFrame API源于Spark SQL,主要用来处理各种类型的数据。
Machine learning can be applied to a wide variety of data types, such as vectors, text, images, and structured data. This API adopts the DataFrame from Spark SQL in order to support a variety of data types.
DataFrame supports many basic and structured types; see the Spark SQL datatype reference for a list of supported types. In addition to the types listed in the Spark SQL guide, DataFrame can use ML Vector types.
A DataFrame can be created either implicitly or explicitly from a regular RDD. See the code examples below and the Spark SQL programming guide for examples.
Columns in a DataFrame are named. The code examples below use names such as “text,” “features,” and “label.”
Pipeline components




(1) 一个特征转换对于DataFrame的操作可能为,读取一个列,将他映射成为一个新的列,然后输出一个新的DataFrame。
(2) 一个模型学习对于DataFrame的操作可能为,读取包含一组特征的列,然后预测结果,最后将添加预测结果的新数据输出为一个DataFrame。


A Transformer is an abstraction that includes feature transformers and learned models. Technically, a Transformer,implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns. For example:
• A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
• A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.



An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data. Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer. For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.

Properties of pipeline components


Transformer.transform()s and are both stateless. In the future, stateful algorithms may be supported via alternative concepts.
Each instance of a Transformer or Estimator has a unique ID, which is useful in specifying parameters (discussed below).


(1) 文档分词
(2) 每个词转成向量
(3) 使用向量和标签学习一个预测模型
In machine learning, it is common to run a sequence of algorithms to process and learn from data. E.g., a simple text document processing workflow might include several stages:
• Split each document’s text into words.
• Convert each document’s words into a numerical feature vector.
• Learn a prediction model using the feature vectors and labels.
MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order. We will use this simple workflow as a running example in this section.

How it works


A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. For Transformer stages, the transform() method is called on the DataFrame. For Estimator stages, the fit() method is called to produce a Transformer(which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the DataFrame.
We illustrate this for the simple text document workflow. The figure below is for the training time usage of a Pipeline.

上图中,第一行表示一个Pipleline,包含三个阶段。其中Tokenizer和HashingTF是转换,第三个LogiticRegression逻辑回归是预测。下面一行表示数据流向,(1)对于Raw text文本数据和标签生成DataFrame,然后调用Pipeline的fit接口;(2)调用Tokenizer的transform接口将文本进行分词,并将分词添加到DataFrame;(3)pipleline调用LogiticRegression.fit产出一个LogiticRegressionModel。


Above, the top row represents a Pipeline with three stages. The first two (Tokenizer and HashingTF) are Transformers (blue), and the third (LogisticRegression) is an Estimator (red). The bottom row represents data flowing through the pipeline, where cylinders indicate DataFrames. The method is called on the original DataFrame, which has raw text documents and labels. The Tokenizer.transform() method splits the raw text documents into words, adding a new column with words to the DataFrame. The HashingTF.transform() method converts the words column into feature vectors, adding a new column with those vectors to the DataFrame. Now, since LogisticRegression is an Estimator, the Pipeline first calls to produce a LogisticRegressionModel. If the Pipeline had more stages, it would call the LogisticRegressionModel’s transform() method on the DataFrame before passing the DataFrame to the next stage.
A Pipeline is an Estimator. Thus, after a Pipeline’s fit() method runs, it produces a PipelineModel, which is a Transformer. This PipelineModel is used at test time; the figure below illustrates this usage.

In the figure above, the PipelineModel has the same number of stages as the original Pipeline, but all Estimators in the original Pipeline have become Transformers. When the PipelineModel’s transform() method is called on a test dataset, the data are passed through the fitted pipeline in order. Each stage’s transform() method updates the dataset and passes it to the next stage.
Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.



DAG Pipelines: A Pipeline’s stages are specified as an ordered array. The examples given here are all for linear Pipelines, i.e., Pipelines in which each stage uses data produced by the previous stage. It is possible to create non-linear Pipelines as long as the data flow graph forms a Directed Acyclic Graph (DAG). This graph is currently specified implicitly based on the input and output column names of each stage (generally specified as parameters). If the Pipeline forms a DAG, then the stages must be specified in topological order.
Runtime checking: Since Pipelines can operate on DataFrames with varied types, they cannot use compile-time type checking. Pipelines and PipelineModels instead do runtime checking before actually running the Pipeline. This type checking is done using the DataFrame schema, a description of the data types of columns in the DataFrame.
Unique Pipeline stages: A Pipeline’s stages should be unique instances. E.g., the same instance myHashingTF should not be inserted into the Pipeline twice since Pipeline stages must have unique IDs. However, different instances myHashingTF1 and myHashingTF2 (both of type HashingTF) can be put into the same Pipeline since different instances will be created with different IDs.


(1) 为实例设置参数。
(2) 将参数封装到ParamMap,然后解析ParamMap为fit和transform传递参数。ParamMap的参数将覆盖原始的参数。

MLlib Estimators and Transformers use a uniform API for specifying parameters.
A Param is a named parameter with self-contained documentation. A ParamMap is a set of (parameter, value) pairs.
There are two main ways to pass parameters to an algorithm:
1. Set parameters for an instance. E.g., if lr is an instance of LogisticRegression, one could call lr.setMaxIter(10) to make use at most 10 iterations. This API resembles the API used in spark.mllib package.
2. Pass a ParamMap to fit() or transform(). Any parameters in the ParamMap will override parameters previously specified via setter methods.
Parameters belong to specific instances of Estimators and Transformers. For example, if we have two LogisticRegressioninstances lr1 and lr2, then we can build a ParamMap with both maxIter parameters specified: ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20). This is useful if there are two algorithms with the maxIter parameter in a Pipeline.

Saving and Loading Pipelines

Often times it is worth it to save a model or a pipeline to disk for later use. In Spark 1.6, a model import/export functionality was added to the Pipeline API. Most basic transformers are supported as well as some of the more basic ML models. Please refer to the algorithm’s API documentation to see if saving and loading is supported.

