(Django)气流中的 ORM - 有可能吗?

Posted

技术标签:

【中文标题】(Django)气流中的 ORM - 有可能吗?【英文标题】:(Django) ORM in airflow - is it possible? 【发布时间】:2018-05-08 06:57:37 【问题描述】:

如何在 Airflow 任务中使用 Django 模型?

根据 Airflow 官方文档,Airflow 提供了与数据库交互的钩子(如 mysqlHook / PostgresHook / 等),以后可以在 Operators 中使用这些钩子来执行行查询。附上核心代码片段:

从https://airflow.apache.org/_modules/mysql_hook.html复制

class MySqlHook(DbApiHook):
    conn_name_attr = 'mysql_conn_id'
    default_conn_name = 'mysql_default'
    supports_autocommit = True

    def get_conn(self):
        """
        Returns a mysql connection object
        """
        conn = self.get_connection(self.mysql_conn_id)
        conn_config = 
            "user": conn.login,
            "passwd": conn.password or ''
        
        conn_config["host"] = conn.host or 'localhost'
        conn_config["db"] = conn.schema or ''
        conn = MySQLdb.connect(**conn_config)
        return conn

从https://airflow.apache.org/_modules/mysql_operator.html复制

class MySqlOperator(BaseOperator):
    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, *args, **kwargs):
        super(MySqlOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info('Executing: ' + str(self.sql))
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        hook.run(
            self.sql,
            autocommit=self.autocommit,
            parameters=self.parameters)

我们可以看到,Hook 封装了连接配置,而 Operator 提供了执行自定义查询的能力。

问题:

使用不同的 ORM 来获取和处理数据库对象而不是原始 SQL 非常方便,原因如下:

    在简单的情况下,ORM 可能是一个更方便的解决方案,请参阅ORM definitions。 假设已经建立了像 Django 这样的系统,其中定义了模型及其方法。每次这些模型的模式发生变化时,都需要重写气流原始 SQL 查询。 ORM 为使用此类模型提供了统一的界面。

由于某种原因,没有在 Airflow 任务中使用 ORM 的示例,就挂钩和操作符而言。根据Using Django database layer outside of Django? 问题,需要设置到数据库的连接配置,然后直接在ORM 中执行查询,但是在适当的钩子/运算符之外执行此操作会破坏Airflow principles。这就像使用 "python work_with_django_models.py" 命令调用 BashOperator。

最后,我们想要这个:

那么在这种情况下,最好的做法是什么?我们是否共享 Django ORM / 其他 ORM 的任何钩子 / 运算符?为了使以下代码真实(视为伪代码!):

import os
import django
os.environ.setdefault(
    "DJANGO_SETTINGS_MODULE",
    "myapp.settings"
)
django.setup()
from your_app import models

def get_and_modify_models(ds, **kwargs):
    all_objects = models.MyModel.objects.filter(my_str_field = 'abc')
    all_objects[15].my_int_field = 25
    all_objects[15].save()
    return list(all_objects)

django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')

而不是在原始 SQL 中实现此功能。

我认为这是一个非常重要的话题,因为在这种情况下,整个基于 ORM 的框架和流程都无法深入研究 Airflow。

提前致谢!

【问题讨论】:

我可以看到在 Airflow 中访问 Django 模型的便利性。但如果可能,我会主张在 Django 中保留该逻辑,然后通过可能通过 BashOperator 调用的管理命令公开该逻辑。如果需要,它将更容易独立测试该代码并在 Airflow 之外运行它。但是根据您的用例,我可以看到这变得不必要的多毛。还有一个缺点是异常处理不是那么简单。 【参考方案1】:

我同意我们应该继续讨论,因为访问 Django ORM 可以显着降低解决方案的复杂性。

我的方法是 1) 创建一个 DjangoOperator

import os, sys

from airflow.models import BaseOperator


def setup_django_for_airflow():
    # Add Django project root to path
    sys.path.append('./project_root/')

    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

    import django
    django.setup()


class DjangoOperator(BaseOperator):

    def pre_execute(self, *args, **kwargs):
        setup_django_for_airflow()

和 2) 为逻辑 / 运算符扩展 DjangoOperator 可以从访问 ORM 中受益

from .base import DjangoOperator


class DjangoExampleOperator(DjangoOperator):

    def execute(self, context):
        from myApp.models import model
        model.objects.get_or_create()

使用此策略,您可以区分使用原始 SQL / ORM 的运算符。另请注意,对于 Django 运算符,所有 django 模型导入都需要在执行上下文中,如上所示。

【讨论】:

太好了,谢谢你的例子!您对生产中的这种方法有任何反馈吗?以这种运算符的形式在气流任务中使用 ORM 对您有用吗?您还建议覆盖哪些其他方法,例如“pre_execute”? "pre_execute" 是我需要覆盖的所有内容,因为它在执行和初始化 django 之前触发。我会说这对我的任务很有帮助......我必须遍历嵌套对象并在各种表中创建对象。我的意思是,我敢肯定有些人认为所有这些事情都应该通过 PostgresHook 在原始 SQL 中完成,但是在这种情况下,这将是一个非常复杂的语句,在我看来更难维护。 @RyanStack 感谢您的回复。如何设置项目或路径,以便气流可以读取应用设置文件?

以上是关于(Django)气流中的 ORM - 有可能吗?的主要内容,如果未能解决你的问题,请参考以下文章

这里有 Django ORM 中的 in 运算符吗? [复制]

Django ORM - 我可以使用 ID 以外的其他列创建具有外键的新对象吗?

如何在 Django ORM 中映射 PostgreSQL 数组字段

Django框架 之 ORM中介模型

Django:我们可以使用 django ORM 将 dict 直接加载到数据库吗

动态选择 django orm 列