用 luigi 任务替换表加载功能

Posted

技术标签:

【中文标题】用 luigi 任务替换表加载功能【英文标题】:Replacing a table load function with a luigi task 【发布时间】:2017-08-06 14:42:22 【问题描述】:

我有一个 python 函数,可以将数据从其他 2 个表加载到 sql server 表中。

def load_table(date1,date2):
    strDate1 = date1.strftime('%m/%d/%Y')
    strDate2 = date2.strftime('%m/%d/%Y')
    stmt = "insert into Agent_Queue (ID)  select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID  from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
    cnx1= connection string
    self.curs=cnx1.cursor()
    self.curs.execute(stmt)
    self.curs.commit()

我正在尝试将此函数转换为 luigi 任务

按照文档尝试了以下方法:

class Datetask(luigi.Task):
    def output(self):
        return luigi.DateParameter()

class loading(luigi.Task):
    def requires(self):
        return 'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))

    def run(self):
       date1 = dict['date1']

    date2 = dict['date2']
    strDate1 = date1.strftime('%m/%d/%Y')
    strDate2 = date2.strftime('%m/%d/%Y')
    stmt = "insert into Agent_Queue (ID)  select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID  from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
    curs=cnx1.cursor()
    curs.execute(stmt)
    curs.commit()
    curs.close()

当我尝试运行它时,我得到了错误:

    python -m luigi --module load_task loading  --local-scheduler
    DEBUG: Checking if loading() is complete
/usr/local/lib/python2.7/dist-packages/luigi/worker.py:305: UserWarning: Task loading() without outputs has no custom complete() method
  is_complete = task.complete()
WARNING: Will not run loading() or any dependencies due to error in deps() method:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 697, in _add
    deps = task.deps()
  File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 572, in deps
    return flatten(self._requires())
  File "/usr/local/lib/python2.7/dist-packages/luigi/task.py", line 544, in _requires
    return flatten(self.requires())  # base impl
  File "load_task.py", line 19, in requires
    return 'date1': DateTask(dt.date(2016,10,30)), 'date2': DateTask(dt.date(2016,11,29))
NameError: global name 'DateTask' is not defined

我正在定义 DateTask,所以这个错误让我很困惑。

另外,是否所有任务都需要全部 3 个requires()runoutput

另外,是否有必要始终将输出写入文件? 在使用 luIgi 时是全新的,因此将不胜感激任何输入

【问题讨论】:

您的任务类定义中有错字:应该是DateTask 而不是Datetask(t 必须是大写!) 除此之外,DateTask 的方法输出必须返回luigi.Target 的子类而不是参数。 ohh..typo 是愚蠢的..我确实读过有关使用 luigi.Target 将其写入本地文本文件的信息。没有其他方法可以访问 date1 和 date2 吗? 【参考方案1】:

我认为这样的做法会更好:

class LoadTask(luigi.Task):
    date1 = luigi.DateParameter()
    date2 = luigi.DateParameter()        

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget("0-1.txt".format(self.date1, self.date2))

    def run(self):
        strDate1 = self.date1.strftime('%m/%d/%Y')
        strDate2 = self.date2.strftime('%m/%d/%Y')
        stmt = "insert into Agent_Queue (ID)  select distinct Send_Location_ID from Pretty_Txns where Send_Date >= '%s' and Send_Date <= '%s' and Send_Location_ID is not null union select distinct Pay_Location_ID  from Pretty_Txns where Pay_Date >= '%s' and Pay_Date <= '%s' and Pay_Location_ID is not null" % (strDate1,strDate2,strDate1,strDate2)
        curs=cnx1.cursor()
        curs.execute(stmt)
        curs.commit()
        curs.close()
        with self.output().open('w') as out_file:
            print >> out_file, strDate1, strDate2

调用:

luigi --module load_task LoadTask --date1 2017-01-01 --date2 2017-01-02 --local-scheduler

另外,是否所有任务都需要全部 3 个 requires(),run,output?

是的。虽然有一些任务类型,比如luigi.WrapperTask,不需要output(),如果你是链中的第一个任务,你可以从requires()返回None等等。

另外,是否有必要始终将输出写入文件?

没有。例如,SQL Alchemy contrib 模块定义了一个 Target 子类,您可以将其用作数据库中的目标。 http://luigi.readthedocs.io/en/stable/api/luigi.contrib.sqla.html

【讨论】:

以上是关于用 luigi 任务替换表加载功能的主要内容,如果未能解决你的问题,请参考以下文章

[原]排错实战——使用process explorer替换任务管理器

wps怎么用替换功能

用jQuery替换内容会导致IE8中的内存泄漏吗?

vba用函数替换字符串

当任务依赖关系过期时,luigi 可以重新运行任务吗?

在运行()中产生任务时Luigi中的TaskClassAmbigiousException