目前,我编写的机器学习任务是手动运行的。我会下载所需的输入文件,进行学习和预测,然后输出一个.csv文件,最后将该文件复制到数据库中。
然而,由于这将投入生产环境,我需要自动化整个过程。所需的输入文件将每月(将来可能会更频繁)由供应商发送到S3存储桶中。
现在我计划使用Luigi来解决这个问题。理想的流程如下:
- 每周(或每天,或每小时,根据我认为的最佳时间间隔)我的程序需要监控S3存储桶中的新文件
- 当文件到达时,我的机器学习管道将被触发,并生成一些pandas数据框。
- 之后,我需要我的程序将这些结果写入不同的数据库
问题是,我不知道如何使用Luigi来实现自动化:
- 文件监控
- 任务调度(例如,每月一次)
- 以可复现的方式部署
今天,这是我心中的管道框架:
import luigifrom mylib import ml_algorithmfrom mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_doneclass Extract(luigi.Task): date = luigi.DateParameter() s3_path = luigi.Parameter() filename = luigi.Parameter() def requires(self): pass def output(self, filename): luigi.hdfs.HdfsTarget(self.date.strftime('data/%Y_%m_%d' + self.filename) def run(self): data = read_s3(s3_path + '/' + file) with self.output.open('w') as hdfs_file: write_hdfs(hdfs_file, data)class Transform(luigi.Task): date = luigi.DateParameter() s3_path = luigi.Parameter() filename = luigi.Parameter() def requires(self): return Extract(self.date, self.s3_path, self.filename) def output(self, filename): luigi.hdfs.HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_' + filename) def run(self): with self.input().open('r') as inputfile: data = read_hdfs(inputfile) result = ml_algorithm(data) with self.output().open('w') as outputfile: write_hdfs(outputfile, result) mark_as_done(filename)class Load(luigi.Task): date = luigi.DateParameter() s3_path = luigi.Parameter() def requires(self): return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)] def output(self): # Fake DB target, just for illustrative purpose luigi.hdfs.DBTarget('...') def run(self): for input in self.input(): with input.open('r') as inputfile: result = read_hdfs(inputfile) # again, just for didatic purposes db = self.output().connection write_db(db, result)
然后我会将其添加到crontab中,并简单地包装到Docker容器中。
问题:
- 这是人们通常使用的正确模式吗?有没有更好的方法来做这件事?
- 如果我有
Transform1
(依赖于输入数据)和Transform2
(依赖于Transform1
的结果),并且希望将这两个结果保存到不同的数据库中,如何使用Luigi管道实现这一点(在监控文件的上下文中)? - 除了cron,人们还使用其他什么来做这件事?
- 如何正确地容器化这个过程?
回答:
你的模式看起来基本上是正确的。我建议使用cron作业来调用一个脚本,该脚本触发Load
任务管道。看起来这个Load
任务已经验证了S3存储桶中新文件的存在,但你需要更改输出以使其也具有条件性,这可以是一个状态文件或其他东西,如果没有要处理的内容。你也可以在一个更高层次的WrapperTask
(没有输出)中这样做,该任务仅在有新文件时才需要Load
任务。然后你可以使用这个WrapperTask
来要求两个不同的Load任务,这两个任务分别需要你的Transform1
和Transform2
。
添加容器…我的cron实际上调用的是一个脚本,该脚本从git中拉取最新的代码,必要时构建一个新的容器,然后调用docker run。我还有另一个始终运行的容器,运行luigid
。每日docker run在容器中执行一个使用CMD
的shell脚本,该脚本调用luigi任务并传递当天所需的参数。