Pythonistaのためのdigdag py> operator開発ガイド
Table of Contents
この記事は、Arm Treasure Data Advent Calendar 2019の24日目です。
今年の夏に新しくTreasure Dataで使えるようになったPython Custom Scriptingですが、 開発する際にどういう点を気をつければ良いのかという質問をいただくことが多いので、 今日はTreasure Workflowとdigdagのpy> operatorを使った開発の際に気をつけることを書いて行こうと思います。
なお、Treasure WorkflowとありますがOSSのdigdagでも活かせる内容があると思います。
なお、既にPython Custom Scriptingが試せる環境にある方は、このGoogle Colab notebook https://bit.ly/191212_cs で一通り試すことが出来ます。
workflowをpushするまえにローカル環境で開発とテストをする方法
基本戦略:Pythonのタスクを合理的な粒度にまとめる
Treasure Workflowやdigdagのサーバーモードでは、タスク間の中間ファイルを置くためのストレージがないため、一つ一つのタスクの粒度が大きくなりがちです。 特に、Treasure Workflowでは一つ一つのtaskでコンテナが起動するため、そのオーバーヘッドを無視できません。 一方で、一つの巨大なタスクは、開発時のデバッグを難しくさせます。 そこで、開発時の戦略としては一つ一つは最低限の塊に分けた関数を用意します。そして、必要に応じてそれらを束ねる関数を用意するのが良いでしょう。
Treasure Workflow向けのworkflowをローカル環境で開発するにはいくつかの選択肢があります。
- Treasure Dataの公式Dockerイメージを使う
- Dockerイメージと同じPythonの仮想環境を使う
1. Treasure Dataの公式Dockerイメージを使う
基本的にはTreasure Dataが提供している公式イメージを利用すると、本番環境と同じイメージが使うことができるので良いでしょう。
その際には、他のPythonスクリプトの開発と同様に、切り分けた単体で実行できるようにして開発するのが良いでしょう。
if __name__ == "__main__":
your_function("default_argument")
2019/12/23現在、最新のDocker imageは以下の2つです。
- digdag/digdag-python:3.7 https://hub.docker.com/r/digdag/digdag-python
- digdag/digdag-anaconda3:2019.03 https://hub.docker.com/r/digdag/digdag-anaconda3
PyCharmのProfessional Editionを持っている場合、Dockerのコンテナにremote debuggerがアタッチできるので便利です。 詳しくはPyCharmのドキュメントをご参照ください。
2. Dockerイメージと同様のPythonの仮想環境を作る
Python は仮想環境を作るための venv
と呼ばれるモジュールがあります。
これを使うことで他のプロジェクトと分離したパッケージの環境を作ることができます。
digdag-python:3.7
digdag-python:3.7と同様の環境を作るには、次のgistからrequirements.txtとconstraints.txtをダウンロードします。 https://gist.github.com/chezou/d0a0fc62007af4d808752e78b31ae694
その後、以下のようにコマンドを実行することでDockerイメージと同様のPython環境が構築できます。
$ python -m venv .venv
$ source .venv/bin/activate
(.venv)$ pip install -r requirements.txt -c constraints.txt
apt-get
を使いますが、それをスクリプト内に書いても実行ができません。digdag-anaconda3:2019.03
anacondaイメージと同じ環境を作りたい場合は、gistからenvironment.ymlをダウンロードします。 https://gist.github.com/chezou/d0a0fc62007af4d808752e78b31ae694#file-environment-yml
その後、以下のコマンドを実行するとデフォルトの環境に依存するパッケージをインストールします。
conda env update -n base -f environment.yml
なお、新しい環境を構築したい場合は、environment.ymlの中の base
を my-env
など好きな名前に変更した後、以下のコマンドで新規仮想環境を作成してください。
conda env create -f environment.yml
py> operatorを含んだworkflowをローカルで実行する
もし、ローカル環境でworkflow全体を実行したい場合は、 digdagの v0_10ブランチを使うことで、
本番環境に近いdigdag環境が用意できます。
py> operatorにパラメータを渡す
py> operatorにパラメータを渡すには2つの方法があります。
- digdagの引数を使う
- 環境変数を使う
- digdagの変数でやりとりする
1. digdagの引数を使う
以下のような py_scripts/examples.py
というスクリプトがあるとします。
def print_arg(msg):
print(f"Message is {msg}")
msg
という変数を print_arg
という関数に渡す場合、以下のようなdigdagのtaskになります。
+simple_with_arg:
py>: py_scripts.examples.print_arg
msg: "Hello World"
docker:
image: "digdag/digdag-python:3.7"
もし、複数の引数をPythonの関数へ渡したい場合、digdagの引数を増やせばよいでしょう。
ここで注意してほしいのが、digdagの変数はPythonにシームレスに渡されるということです。
つまり、 **kwargs
などで引数を受け取る場合、意図しないdigdagの変数がPythonに渡る可能性に注意してください。
例えば、上の例では docker
という変数には {"image": "digdag/digdag-python:3.7"}
という辞書型の変数が代入されています。
ですので、 **kwargs
で受けるのではなく、明示的な引数の指定を強くおすすめします。
た、同様に意図しないdigdagの変数と py> operatorの引数の衝突が起こる場合があります。 以下のようなワークフローがあったとします。
_export:
td:
database: my_db
+simple_with_arg2:
py>: py_scripts.examples.print_arg_td
msg: "Hello World"
docker:
image: "digdag/digdag-python:3.7"
このとき、Pythonのスクリプトは以下のような関数を持っているとします。
def print_arg_td(msg: str, td: str = None):
print(f"'msg' is {msg} and 'td' is {td}")
simple_with_arg2
タスク実行時に print_arg_td
関数へ渡される引数 td
は何になるでしょうか?
通常のPythonの感覚では、デフォルト値である None
が期待されます。
しかし、 digdagのワークフローで td
という変数がすでにexportされているため、 引数 td
には {"database": "my_db"}
という辞書が格納されてしまいます。
これにより、変数 td
の型(dict
)が期待されていたもの(str
)との不一致が起こりうるのです。
このようなミスマッチを避けるために、digdagで予約されている変数やよく利用される変数を避けたほうが良いでしょう。
特に td
という変数や以下の変数は避けたほうが良いです。
- td.endpoint
- td.apikey
- td.use_ssl
- td.proxy.enabled
- td.proxy.host
- td.proxy.port
- td.proxy.password
- td.proxy.user
これらの既に利用されている変数は将来的に変更される可能性があります。
digdagの組み込み変数は次のドキュメントをご参照ください。 http://docs.digdag.io/workflow_definition.html#using-variables
また、digdagは int
から str
など意図しない型の変換を行う場合があります。そのため、型のチェックや明示的な変換をおすすめします。
2. 環境変数を使う
環境変数は、py> operatorに変数を渡すためのもう一つの方法です。
TDのAPI keyやAWSのsecret keyなどセキュアな情報はこの形式でやりとりすることが多いです。
例えば、以下のような simple_with_env
というタスクがあったとします。
+simple_with_env:
py>: py_scripts.examples.print_env
_env:
MY_ENV_VAR: "hello"
docker:
image: "digdag/digdag-python:3.7"
このとき、 MY_ENV_VAR
は以下のようにPythonから取得できます。
import os
def print_env():
print(f'Env var is {os.environ["MY_ENV_VAR"]}')
Treasure Workflowではセキュアな情報を扱うためにdigdagの secrets
を格納するデータベースを提供しています。
- Projectを
td workflow push
でプッシュする - secretsを
td workflow secrets
でデータベースに格納する - secretsを
{secret:my_secret}
という形式でworkflowに書き、環境変数経由でPythonにわたす
環境変数での受け渡しをしないとsecretsはPythonに渡されません。
例えば、以下のようなワークフローとPythonスクリプトでは正しくsecretsに格納された td.apikey
という情報が渡されます。
+simple_with_env2:
py>: py_scripts.examples.access_td
_env:
TD_API_KEY: ${secret:td.apikey}
docker:
image: "digdag/digdag-python:3.7"
import os
def access_td():
# Able to fetch API key like "1234/XXXX"
apikey = os.environ["TD_API_KEY"]
# Do awesome execution
しかし、以下のようにdigdagの引数として渡した場合はsecretsが評価されません。
+simple_with_env_ng:
py>: py_scripts.examples.access_td_ng
apikey: ${secret:td.apikey}
docker:
image: "digdag/digdag-python:3.7"
def access_td_ng(apikey):
# Always shows "${secret:td.apikey}" instead of actual API key like "1234/XXXX"
print(apikey)
secretsに関する詳細は、digdagのドキュメントをご参照ください。
3. digdagの変数を使う
digdagのpy> operatorとして実行するPythonスクリプトでは、 import digdag
とすることでdigdagモジュールを利用することが出来ます。
以下のように、 digdag.env.params
と呼ばれる変数に辞書のようにアクセスすることでdigdagの変数を直接読むことが出来ます。
def read_workflow_env():
import digdag
print(digdag.env.params["my_msg"])
このとき、実行するworkflowは例えば以下のようなものになります。
_export:
my_msg: "awesome message"
+simple_with_arg2:
py>: py_scripts.examples.read_workflow_env
docker:
image: "digdag/digdag-python:3.7"
なお、読み込みだけではなくdigdag.env.store
を使うことでdigdagの変数の書き込みもできます。
ただし、あまり大きな変数の受け渡しはしないほうが良いでしょう。
def store_workflow_env(msg):
import digdag
digdag.env.store({"my_msg": msg})
このdigdagモジュールは、digdagがpy> operator実行時に動的に生成しているため、ローカルでは実行できないことに注意してください。
ローカルの実行を考慮する場合は、 try-expect
を利用するのが良いでしょう。
try:
import digdag
digdag.env.store({"feature_query": feature_query})
except ImportError:
pass
PythonやOSのパッケージのインストール方法
Treasure WorkflowではDocker imageに入っていないパッケージは os.system
や subprocess.run
を使い実行します。
import os, sys
os.system(f"{sys.executable} -m pip install --upgrade pytd==1.4.3")
import subprocess
# arguments should be passed by list
subprocess.run([sys.executable, "-m", "pip", "install", "--upgrade", "pytd==1.4.3"])
この際、パッケージのバージョンを指定しましょう。
OSのパッケージのインストールの場合も同様です。
import os
os.system("apt-get update") # Need to run before doing apt-get install
os.system("apt-get install -y wkhtmltopdf")
py> operatorを含むディレクトリ構成
一つのプロジェクトでは、以下のようなディレクトリ構成をおすすめしています。
my_project
├── README.md
├── config
│ ├── params.test.yml <- Configuration file for run through test. Mirror params.yml except for `td.database`
│ └── params.yml <- Configuration file for production
├── awesome_workflow.dig <- Main workflow to be executed
├── ingest.dig <- Data ingestion workflow
├── py_scripts <- Python scripts directory
│ ├── __init__.py
│ ├── data.py <- Script to upload data to Arm Treasure Data
│ └── my_script.py <- Main script to execute e.g. Data enrichment, ML training
├── queries <- SQL directory
│ └── example.sql
├── run_test.sh <- Test shell script for local run through test
└── test.dig <- Test workflow for local run through test
この構成に従ったdigdagプロジェクトをcookiecutter-digdagを使うことで簡単にテンプレートから生成できます。 https://github.com/chezou/cookiecutter-digdag
実行時のエラーを通知する
Pythonのスクリプト実行時に発生したエラーをSlackなどで通知したい事があると思います。
digdagは _error:
でworkflowが失敗した際の処理をできますが、そのとき${error.message}
の中にPythonの例外情報が入っています。
以下のようなworkflowとPython scriptがあったとします。
+simple_raise_error:
py>: py_scripts.examples.error_sample
docker:
image: "digdag/digdag-python:3.7"
_error:
echo>: ${error.message}
def error_sample():
int("a1234") # raises ValueError
このとき、以下のようなログが得られます。
2019-12-24 23:06:32 +0900 [INFO] (0039@[0:python]+simple^error): echo>: Python command failed with code 1: invalid literal for int() with base 10: 'a1234' (ValueError)
from Traceback (most recent call last):
from File ".digdag/tmp/digdag-py-2-1815457087076518360/runner.py", line 165, in <module>
result = callable_type(**args)
from File "/private/var/folders/y9/bnjb3krn39s22rmg_wvlnf7m0000gp/T/digdag-tempdir2111531196420040503/workspace/1_simple_1_2_2945225080250994454/py_scripts/examples.py", line 5, in print_arg
int("a1234")
from ValueError: invalid literal for int() with base 10: 'a1234' (runtime)
この例では、echo>
operatorでエラーを出力しているだけですが、Slack等に例外を送ることで定期実行しているPythonタスクの通知が簡単に行なえます。
まとめ
このように、様々なポイントをおさえることでTreasure Workflowの開発をしやすくなるかと思います。 また、digdagでもいくらかのポイントは使うことができるかと思います。