Airflow plugins not getting picked up correctly
Posted: 2018-09-27 18:13:12

Question: We are using Apache Airflow 1.9.0. I wrote a Snowflake hook plugin and have placed the hook in the $AIRFLOW_HOME/plugins directory.
$AIRFLOW_HOME
+--plugins
   +--snowflake_hook2.py
snowflake_hook2.py
# This is the base class for a plugin
from airflow.plugins_manager import AirflowPlugin
# This is necessary to expose the plugin in the Web interface
from flask import Blueprint
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink
# This is the base hook for connecting to a database
from airflow.hooks.dbapi_hook import DbApiHook
# This is the snowflake provided Connector
import snowflake.connector
# This is the default python logging package
import logging
class SnowflakeHook2(DbApiHook):
    """
    Airflow Hook to communicate with Snowflake
    This is implemented as a Plugin
    """
    def __init__(self, connname_in='snowflake_default', db_in='default', wh_in='default', schema_in='default'):
        logging.info('# Connecting to {0}'.format(connname_in))
        self.conn_name_attr = 'snowflake_conn_id'
        self.connname = connname_in
        self.superconn = super().get_connection(self.connname)  # gets the values from Airflow
        # SNIP - Connection stuff that works
        self.cur = self.conn.cursor()

    def query(self, q, params=None):
        """From jmoney's db_wrapper; returns a full list of rows (tuples)"""
        if params is None:  # no params, so no substitution
            self.cur.execute(q)
        else:  # make the parameter substitution
            self.cur.execute(q, params)
        self.results = self.cur.fetchall()
        self.rowcount = self.cur.rowcount
        self.columnnames = [colspec[0] for colspec in self.cur.description]
        return self.results

    # SNIP - Other class functions


class SnowflakePluginClass(AirflowPlugin):
    name = "SnowflakePluginModule"
    hooks = [SnowflakeHook2]
    operators = []
So I went ahead and added some print statements to Airflow's plugins_manager to try to get a better handle on what is going on. After restarting the webserver and running airflow list_dags, these lines show the new module name (and there are no errors):
SnowflakePluginModule [<class '__home__ubuntu__airflow__plugins_snowflake_hook2.SnowflakeHook2'>]
hook_module - airflow.hooks.snowflakepluginmodule
INTEGRATING airflow.hooks.snowflakepluginmodule
snowflakepluginmodule <module 'airflow.hooks.snowflakepluginmodule'>
Since this matches what the documentation says, I should be able to use it in my DAG:
from airflow import DAG
from airflow.hooks.snowflakepluginmodule import SnowflakeHook2
from airflow.operators.python_operator import PythonOperator
But the webserver throws this error:
Broken DAG: [/home/ubuntu/airflow/dags/test_sf2.py] No module named 'airflow.hooks.snowflakepluginmodule'
So the question is: what am I doing wrong? Or have I found a bug?
Answer 1: You need to import it as follows:
from airflow import DAG
from airflow.hooks import SnowflakeHook2
from airflow.operators.python_operator import PythonOperator
or
from airflow import DAG
from airflow.hooks.SnowflakePluginModule import SnowflakeHook2
from airflow.operators.python_operator import PythonOperator
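If the plugin does get picked up, a minimal sketch of a DAG that exercises the hook through a PythonOperator might look like the following. The DAG id, connection id, and query are placeholders, and the import path assumes the second form above resolves on your install:

import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.SnowflakePluginModule import SnowflakeHook2
from airflow.operators.python_operator import PythonOperator


def run_snowflake_query():
    # Placeholder connection id and query; adjust for your environment
    hook = SnowflakeHook2(connname_in='snowflake_default')
    rows = hook.query('SELECT CURRENT_TIMESTAMP()')
    logging.info('Fetched %s row(s)', len(rows))
    return rows


dag = DAG(
    dag_id='snowflake_hook_test',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None,
)

query_task = PythonOperator(
    task_id='run_snowflake_query',
    python_callable=run_snowflake_query,
    dag=dag,
)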
Answer 2: I don't think Airflow automatically walks the folders under the plugins directory and runs everything beneath them. The way I got it to work was to have an __init__.py under the plugins directory that contains each plugin class. Take a look at the Astronomer plugins on GitHub; they provide some really good examples of how to set up plugins.
In particular, look at how they set up the mysql plugin:
https://github.com/airflow-plugins/mysql_plugin
Also, someone added a Snowflake hook in one of the most recent releases that you may want to take advantage of:
https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/snowflake_hook.py
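A minimal sketch of the layout described above, loosely following the mysql_plugin structure (the package and file names here are illustrative, not the ones from the question):

$AIRFLOW_HOME/plugins
+--snowflake_plugin
   +--__init__.py            (registers the plugin)
   +--hooks
      +--__init__.py
      +--snowflake_hook2.py  (contains only the SnowflakeHook2 class)

where snowflake_plugin/__init__.py handles the registration (the plugins folder is normally on Airflow's sys.path, so the package import below should resolve):

from airflow.plugins_manager import AirflowPlugin

from snowflake_plugin.hooks.snowflake_hook2 import SnowflakeHook2


class SnowflakePluginClass(AirflowPlugin):
    name = "SnowflakePluginModule"
    hooks = [SnowflakeHook2]
    operators = []

With that layout the hook can then be imported in a DAG using one of the forms shown in the first answer.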