dremio Operator: a brief introduction


A short summary based on the official description

An Operator is the basic unit of execution. An Operator consumes one or more input data streams and produces output; Operators are single-threaded.
There are different types of Operators, and each type carries its own state, but all of those states are described in terms of a shared set of MasterStates.
There are four main kinds of operators:

  • DualInputOperator — consumes two input streams, e.g. HashJoinOperator, UnionOperator
  • SingleInputOperator — consumes one stream and produces one, e.g. FilterOperator, SortOperator
  • ProducerOperator — a pure producer, e.g. ScanOperator, UnorderedReceiverOperator
  • TerminalOperator — a pure consumer, e.g. SingleSenderOperator, ScreenOperator

dremio's pipeline is built from Operators; Operators are registered at startup through a service-discovery mechanism (OperatorCreatorRegistry).
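To make the taxonomy concrete, here is a toy sketch of the four operator shapes and their shared master state, plus one tiny single-input operator. All names here are invented for illustration; they are not Dremio's actual signatures.

```java
import java.util.Arrays;

// Shared state vocabulary every operator reports (a cut-down MasterState analogue).
interface Op {
    enum MasterState { NEEDS_SETUP, CAN_CONSUME, CAN_PRODUCE, BLOCKED, DONE }
    MasterState state();
}

// Consumes two input streams (e.g. a hash join or union).
interface DualInputOp extends Op { void consumeLeft(int[] batch); void consumeRight(int[] batch); int[] produce(); }

// Consumes one stream and produces one (e.g. a filter or sort).
interface SingleInputOp extends Op { void consume(int[] batch); int[] produce(); }

// Pure producer (e.g. a scan).
interface ProducerOp extends Op { int[] produce(); }

// Pure consumer at the end of the pipeline (e.g. a screen/sender).
interface TerminalOp extends Op { void consume(int[] batch); }

// A toy single-input operator: keeps only even numbers from each batch.
class EvenFilter implements SingleInputOp {
    private int[] pending;  // non-null while we hold output that has not been produced yet

    public void consume(int[] batch) {
        pending = Arrays.stream(batch).filter(x -> x % 2 == 0).toArray();
    }

    public int[] produce() {
        int[] out = pending;
        pending = null;  // back to consuming
        return out;
    }

    public MasterState state() {
        return pending == null ? MasterState.CAN_CONSUME : MasterState.CAN_PRODUCE;
    }
}
```

The point of the sketch is only the shape: the driver inspects `state()` and calls `consume`/`produce` accordingly, which is exactly the contract the MasterState set expresses.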

ProducerOperator interface definition

```java
public interface ProducerOperator extends Producer {

  enum State implements OperatorState<State> {
    NEEDS_SETUP(MasterState.NEEDS_SETUP),
    CAN_PRODUCE(MasterState.CAN_PRODUCE),
    BLOCKED(MasterState.BLOCKED),
    DONE(MasterState.DONE);

    final MasterState master;

    State(MasterState master) {
      this.master = master;
    }

    @Override
    public void is(State expected) {
      assert expected == this : String.format(Operator.ERROR_STRING, expected.name(), this.name());
    }

    @Override
    public MasterState getMasterState() {
      return master;
    }
  }

  ProducerOperator.State getState();

  /**
   * Setups operator. Can only be called once. SqlOperatorImpl must be in NEEDS_SETUP state.
   * @return The VectorAccessible to be used for result return.
   */
  VectorAccessible setup() throws Exception;

  // A Guice-style assisted-injection helper that simplifies creating ProducerOperator
  // instances; see https://www.com/rongfengliang/p/16748323.html
  interface Creator<T extends PhysicalOperator> {
    ProducerOperator create(FragmentExecutionContext fragmentExecContext, OperatorContext context, T config) throws ExecutionSetupException;
  }

  interface ReceiverCreator<T extends PhysicalOperator> {
    ProducerOperator create(BatchStreamProvider streams, OperatorContext context, T config) throws ExecutionSetupException;
  }
}
```

Discovery-based loading (hand-rolled; using Guice would make this more convenient)

```java
private <T> ImmutableMap<Class<?>, T> getImplementors(ScanResult scanResult, Class<T> baseInterface) {
  final Map<Class<?>, T> map = new HashMap<>();

  Set<Class<? extends T>> providerClasses = scanResult.getImplementations(baseInterface);
  for (Class<?> c : providerClasses) {
    Class<?> operatorClass = c;
    boolean interfaceFound = false;
    while (!interfaceFound && !(c.equals(java.lang.Object.class))) {
      final Type[] ifaces = c.getGenericInterfaces(); // never returns null
      for (Type iface : ifaces) {
        if (!(iface instanceof ParameterizedType && ((ParameterizedType) iface).getRawType().equals(baseInterface))) {
          continue;
        }

        final Type[] args = ((ParameterizedType) iface).getActualTypeArguments();
        interfaceFound = true;
        boolean constructorFound = false;
        for (Constructor<?> constructor : operatorClass.getConstructors()) {
          Class<?>[] params = constructor.getParameterTypes();
          if (params.length == 0) {
            try {
              T newInstance = (T) constructor.newInstance();
              Object old = map.put((Class<?>) args[0], newInstance);
              if (old != null) {
                throw UserException.functionError()
                    .message("Duplicate OperatorCreator [%s, %s] found for PhysicalOperator %s",
                        old.getClass().getCanonicalName(), operatorClass.getCanonicalName(),
                        ((Class<?>) args[0]).getCanonicalName())
                    .build(logger);
              }
              constructorFound = true;
            } catch (Exception ex) {
              logger.warn("Failure while creating OperatorCreator. Constructor declaring class {}.",
                  constructor.getDeclaringClass().getName(), ex);
            }
          }
        }
        if (!constructorFound) {
          logger.debug("Skipping registration of OperatorCreator {} as it doesn't have a default constructor",
              operatorClass.getCanonicalName());
        }
      }
      c = c.getSuperclass();
    }
  }
  return ImmutableMap.copyOf(map);
}
```
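The essence of the method above — walk a class's generic interfaces to find which type argument it binds, then instantiate it through its no-arg constructor and key the registry by that type — can be reproduced with plain reflection. The names below (`Creator`, `IntCreator`, `Registry`) are invented for the sketch and have no Dremio dependencies.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

interface Creator<T> {
    String create(T config);
}

// An implementation binding Creator<Integer>; the default constructor is what
// makes it registrable, mirroring the constraint in getImplementors().
class IntCreator implements Creator<Integer> {
    public IntCreator() {}
    public String create(Integer config) { return "int:" + config; }
}

class Registry {
    // Map each implementation's bound type argument to a fresh instance of it.
    static Map<Class<?>, Creator<?>> register(Class<?>... impls) {
        Map<Class<?>, Creator<?>> map = new HashMap<>();
        for (Class<?> c : impls) {
            for (Type iface : c.getGenericInterfaces()) {
                if (iface instanceof ParameterizedType
                        && ((ParameterizedType) iface).getRawType().equals(Creator.class)) {
                    Class<?> key = (Class<?>) ((ParameterizedType) iface).getActualTypeArguments()[0];
                    try {
                        map.put(key, (Creator<?>) c.getDeclaredConstructor().newInstance());
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("no usable default constructor on " + c, e);
                    }
                }
            }
        }
        return map;
    }
}
```

The real version additionally walks superclasses and reports duplicates, but the type-argument trick — `ParameterizedType.getActualTypeArguments()[0]` as the map key — is the same.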

Reference implementation classes


How the pipeline uses Operators

FragmentExecutor.java

```java
pipeline = PipelineCreator.get(
    new FragmentExecutionContext(major.getForeman(), sources, cancelled, major.getContext()),
    buffers,
    opCreator,
    contextCreator,
    functionLookupContextToUse,
    rootOperator,
    tunnelProvider,
    new SharedResourcesContextImpl(sharedResources)
);

pipeline.setup();
```
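The real pipeline construction is far more involved, but the core driving idea — keep pumping an operator while it reports that it can produce, forwarding output to a consumer, and stop at DONE — can be sketched with invented toy names:

```java
import java.util.ArrayList;
import java.util.List;

class TinyPipeline {
    enum State { CAN_PRODUCE, DONE }

    interface Producer { State state(); int produce(); }
    interface Terminal { void consume(int v); }

    // A toy producer: emits the numbers [0, n) and then reports DONE.
    static Producer range(int n) {
        return new Producer() {
            int next = 0;
            public State state() { return next < n ? State.CAN_PRODUCE : State.DONE; }
            public int produce() { return next++; }
        };
    }

    // The driver loop: poll the producer's state and forward each value downstream.
    static void run(Producer p, Terminal t) {
        while (p.state() == State.CAN_PRODUCE) {
            t.consume(p.produce());
        }
    }
}
```

State polling is the key design point: the driver never calls `produce()` blindly; it asks the operator's state first, which is what the MasterState machinery formalizes.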

How the JDBC plugin uses ProducerOperator

```java
public class JdbcBatchCreator implements ProducerOperator.Creator<JdbcSubScan> {
  public JdbcBatchCreator() {}

  @Override
  public ProducerOperator create(FragmentExecutionContext fragmentExecContext, OperatorContext context,
      JdbcSubScan subScan) throws ExecutionSetupException {
    // Each scan carries the storage plugin it uses; the plugin ties the schema
    // fetcher and the record reader together -- a pattern every plugin must provide.
    JdbcStoragePlugin plugin = (JdbcStoragePlugin) fragmentExecContext.getStoragePlugin(subScan.getPluginId());
    // schema fetcher
    JdbcSchemaFetcherImpl schemaFetcher = (JdbcSchemaFetcherImpl) plugin.getFetcher();
    JdbcPluginConfig config = plugin.getConfig();
    // record reader
    JdbcRecordReader innerReader = new JdbcRecordReader(context, schemaFetcher.getSource(), subScan.getSql(),
        config, subScan.getColumns(), fragmentExecContext.cancelled(), subScan.getPluginId().getCapabilities(),
        plugin.getDialect().getDataTypeMapper(config), subScan.getReferencedTables(), subScan.getSkippedColumns());
    CoercionReader reader = new CoercionReader(context, subScan.getColumns(), innerReader, subScan.getFullSchema());
    // wrap everything in a ScanOperator
    return new ScanOperator(subScan, context, RecordReaderIterator.from(reader));
  }
}
```
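The wrapping pattern here — a raw source reader, a coercion wrapper that normalizes its output, and a scan operator that drains whichever reader it is handed — is worth noticing, since it is what makes plugins composable. A generic sketch, with all names invented:

```java
import java.util.ArrayList;
import java.util.List;

interface RowReader {
    String next();  // null means exhausted
}

// Raw source reader over a fixed set of rows.
class SourceReader implements RowReader {
    private final String[] rows;
    private int i = 0;
    SourceReader(String... rows) { this.rows = rows; }
    public String next() { return i < rows.length ? rows[i++] : null; }
}

// Coercion wrapper: normalizes each row before handing it upstream,
// playing the role CoercionReader plays for JDBC results.
class CoercingReader implements RowReader {
    private final RowReader inner;
    CoercingReader(RowReader inner) { this.inner = inner; }
    public String next() {
        String r = inner.next();
        return r == null ? null : r.trim().toLowerCase();
    }
}

// "Scan operator": drains whatever reader chain it is given.
class Scan {
    private final RowReader reader;
    Scan(RowReader reader) { this.reader = reader; }
    List<String> drain() {
        List<String> out = new ArrayList<>();
        for (String r = reader.next(); r != null; r = reader.next()) {
            out.add(r);
        }
        return out;
    }
}
```

Because the scan only depends on the reader interface, a plugin swaps in its own reader stack without touching the operator layer — which is exactly what JdbcBatchCreator does with JdbcRecordReader and CoercionReader.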

A reference call chain

As covered in an earlier post, execution relies on the Operators created here.


MasterState definition

Operator.java

```java
public enum MasterState {
  NEEDS_SETUP,
  CAN_CONSUME,
  CAN_CONSUME_L,
  CAN_CONSUME_R,
  CAN_PRODUCE,
  BLOCKED,
  DONE
}

interface OperatorState<T> {
  MasterState getMasterState();
  void is(T expected);
  String name();
}
```
Each concrete Operator provides its own OperatorState implementation.
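A minimal mirror of that pattern, with invented names (`Master`, `OpState`, `ScanState`) standing in for Dremio's types: the operator's private enum maps each of its states onto the shared master set, and `is()` acts as a precondition check. Note that a Java enum's built-in `name()` already satisfies the interface's `name()` method, which is the same trick the real OperatorState relies on.

```java
// Shared state vocabulary (mirrors the MasterState idea).
enum Master { NEEDS_SETUP, CAN_CONSUME, CAN_CONSUME_L, CAN_CONSUME_R, CAN_PRODUCE, BLOCKED, DONE }

interface OpState<T> {
    Master master();
    void is(T expected);  // precondition check: fail loudly on a wrong state
    String name();
}

// A producer-style operator only needs a subset of the master states.
enum ScanState implements OpState<ScanState> {
    NEEDS_SETUP(Master.NEEDS_SETUP),
    CAN_PRODUCE(Master.CAN_PRODUCE),
    BLOCKED(Master.BLOCKED),
    DONE(Master.DONE);

    private final Master master;

    ScanState(Master master) {
        this.master = master;
    }

    @Override
    public Master master() { return master; }

    @Override
    public void is(ScanState expected) {
        // The real code uses `assert`; an exception keeps the sketch observable.
        if (expected != this) {
            throw new IllegalStateException("expected " + expected.name() + " but was " + name());
        }
    }
}
```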

Summary

The Operator is a fairly important piece of dremio. Understanding its overall flow helps with learning dremio and its internal mechanics: the JDBC plugin's execution, for example, depends on exactly this capability, which is how dremio can query relational databases directly (the mongo plugin works similarly). This is only a brief overview; the details are worth studying in depth.

References

https://github.com/dremio/dremio-oss/blob/d41cb52143b6b0289fc8ed4d970bfcf410a669e8/sabot/kernel/src/main/java/com/dremio/sabot/driver/PipelineCreator.java
https://github.com/dremio/dremio-oss/blob/d41cb52143b6b0289fc8ed4d970bfcf410a669e8/sabot/kernel/src/main/java/com/dremio/sabot/driver/OperatorCreatorRegistry.java
https://github.com/dremio/dremio-oss/blob/d41cb52143b6b0289fc8ed4d970bfcf410a669e8/sabot/kernel/src/main/java/com/dremio/sabot/op/spi/Operator.java
