embulk × digdag 動かしてみた2
続き。embulkをdigdagで動かしてみます。
なんか書いてる間にエラー多発して、更新遅れちゃいました・・・。
お題目はこんな感じ。
1. プラグインの導入
2. embulkのymlを作成
3. S3アップロード
4.digdagを利用してのファイルアップロード
5.ちょっとした応用
というわけで、前回実行したembulkをdigdagから起動してみます。
まずdigdagについて触れておきます。 ざっくりだし、別に深く考察しているわけでもないので、マジなエンジニアの方は流してください。←
簡単に言うとジョブを管理するフレームワーク的なもので、YAML形式のdigという設定ファイルに基づいて"ワークフロー"(="ジョブ")を管理します。 今回の設定ではジョブの情報をメモリ上で管理する設定にしていますが、本来はPostgreSQLとか入れて管理したり、実行状況をウォッチしたりすべきだと思います。
というわけで、本家へのリンク。 www.digdag.io
で、digファイルの構成要素は、本家のドキュメント見るのが良いかと。 僕みたいなのが使うのは、"sh>"オペレータくらいなものですが、きっともっといろいろできます。
ドキュメントリンク Workflow definition — Digdag 0.9.5 documentation
embulkのオペレータとかもあるのですが、今回最後に書く応用のところで"sh>"縛りが発生しているので、"sh>"だけ使っていきます。
まず、digdagのワークフローのテンプレートを作成します。
$ digdag init sample
$ cd ./sample
$ ls -a
.digdag .gitignore sample.dig
カレントディレクトリに、sampleディレクトリが作成されて中にこのワークフローのdigファイルが作成されます。 このsample.digファイルを以下のように編集しました。
$ cat ./sample.dig
timezone: Asia/Tokyo
schedule:
minutes_interval>: 5
+setup:
echo>: start ${session_time}
+disp_current_date:
echo>: ${moment(session_time).format('YYYY-MM-DD HH:mm:ss Z')}
+step1:
sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml > /home/cento s/logs/embulk_s3.log
_check:
echo>: success.
_error:
echo>: error!!
+teardown:
echo>: finish ${session_time}
では、これを登録します。 コマンドは、作成されたsampleディレクトリ内で実行しております。
$ digdag push sample
2017-11-26 15:06:57 +0000: Digdag v0.9.20
Creating .digdag/tmp/archive-3700971538591255345.tar.gz...
Archiving sample.dig
Workflows:
sample.dig
Uploaded:
id: 1
name: sample
revision: 04a783ad-332d-4c21-b923-eef391add5a1
archive type: db
project created at: 2017-11-26T15:07:00Z
revision updated at: 2017-11-26T15:07:00Z
Use `digdag workflows` to show all workflows.
この辺は、もっとベターな方法があるかもしれませんが、いったん公式のサンプル通りに進めます。
これで準備完了です。 今回の設定ファイルは5分おき(schedule:の部分)に前回作成したs3アップロードのembulkを実行する(+step1:の部分)ようにしています。 また、成功の場合(check:の部分)は標準出力に"success."、エラーの場合(error:の部分)は"error!!"と出力するように指定しておりますので、以下に2パターンの結果を貼り付けます。
まず、errorの場合(logディレクトリ作り忘れてた・・・。)
2017-11-26 15:20:00 +0000 [INFO] (scheduler-0) io.digdag.core.workflow.WorkflowExecutor: Starting a new session project id=1 workflow name=sample session_time=2017-11-27T00:20:00+09:00
2017-11-26 15:20:00 +0000 [INFO] (0037@[0:sample]+sample+setup) io.digdag.core.agent.OperatorManager: echo>: start 2017-11-27T00:20:00+09:00
start 2017-11-27T00:20:00+09:00
2017-11-26 15:20:01 +0000 [INFO] (0037@[0:sample]+sample+disp_current_date) io.digdag.core.agent.OperatorManager: echo>: 2017-11-27 00:20:00 +09:00
2017-11-27 00:20:00 +09:00
2017-11-26 15:20:01 +0000 [INFO] (0039@[0:sample]+sample+step1) io.digdag.core.agent.OperatorManager: sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml > /home/centos/logs/embulk_s3.log
/bin/sh: line 1: /home/centos/logs/embulk_s3.log: No such file or directory
2017-11-26 15:20:01 +0000 [ERROR] (0039@[0:sample]+sample+step1) io.digdag.core.agent.OperatorManager: Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:312)
at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:254)
at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
at io.digdag.core.agent.ExtractArchiveWorkspaceManager.withExtractedArchive(ExtractArchiveWorkspaceManager.java:36)
at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2017-11-26 15:20:02 +0000 [INFO] (0037@[0:sample]+sample^error) io.digdag.core.agent.OperatorManager: echo>: error!!
error!!
2017-11-26 15:20:02 +0000 [INFO] (0039@[0:sample]+sample^failure-alert) io.digdag.core.agent.OperatorManager: type: notify
で、うまくいった場合。
2017-12-02 13:05:00 +0000 [INFO] (scheduler-0) io.digdag.core.workflow.WorkflowExecutor: Starting a new session project id=1 workflow name=sample session_time=2017-12-02T22:05:00+09:00
2017-12-02 13:05:00 +0000 [INFO] (0030@[0:sample]+sample+setup) io.digdag.core.agent.OperatorManager: echo>: start 2017-12-02T22:05:00+09:00
start 2017-12-02T22:05:00+09:00
2017-12-02 13:05:01 +0000 [INFO] (0030@[0:sample]+sample+disp_current_date) io.digdag.core.agent.OperatorManager: echo>: 2017-12-02 22:05:00 +09:00
2017-12-02 22:05:00 +09:00
2017-12-02 13:05:01 +0000 [INFO] (0030@[0:sample]+sample+step1) io.digdag.core.agent.OperatorManager: sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml > /home/centos/logs/embulk_s3.log
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
2017-12-02 13:05:14 +0000 [INFO] (0030@[0:sample]+sample+teardown) io.digdag.core.agent.OperatorManager: echo>: finish 2017-12-02T22:05:00+09:00
finish 2017-12-02T22:05:00+09:00
JDKの警告はいったん無視。
S3へのアップロード結果がこんな感じ。
時刻が更新されているので、うまくいってる感じですね。 今回のは5分おきにスケジュールしているので、 しばらくしてから見るとこんな感じ。
更新日時が+5分。 うん。動いてる動いてる。
止めるときは以下のコマンド。 一時停止的なのもできるっぽいけど。
$ digdag delete sample
2017-12-02 14:16:14 +0000: Digdag v0.9.20
Project:
id: 1
name: sample
latest revision: 08753ed6-36b2-46d6-a976-5b837810e6fd
created at: 2017-12-02 12:58:05 +0000
last updated at: 2017-12-02 12:58:05 +0000
Are you sure you want to delete this project? [y/N]: y
Project 'sample' is deleted.
以上、基本的な使い方。 ついでに、こんなコマンドでジョブのステータス確認もできるからいい感じ。
$ digdag schedules
2017-12-02 14:00:35 +0000: Digdag v0.9.20
Schedules:
id: 1
project: sample
workflow: sample
disabled at:
next session time: 2017-12-02 23:05:00 +0900
next scheduled to run at: 2017-12-02 14:05:00 +0000 (in 4m 23s)
1 entries.
Use `digdag workflows [project-name] [workflow-name]` to show workflow details.
次回実行時刻とかが見えてるのが安心感ありますね。 で、ここまでがdigdagの使い方です。
ここからはちょっとした応用。
digdagのdigファイルで設定した変数をymlに引き渡してみます。 何が便利かというと、・・・なんも考えなくても、変数をパラメータちっくに渡せるのが素晴らしい。 例えば、以下の例ではs3に日時ごとのパスを作ってアップロードしていますが、ファイル名に日時を入れたり、input定義側のqueryに日時入れたりもできるので、ジョブ管理が非常に楽ちん。 sysdateみたいなのを使うのもいいけど、再実行に困ったりしますからね。
ポイントは3つ。 digファイルで変数をセット。 ymlをyml.liquidにする。 yml.liquidからdigで定義した変数を参照する。
というわけで、以下その設定。
まずdigへ以下を追加。
_export:
today: ${moment(session_time).format('YYYYMMDDHHmmss')}
で、ついでにymlファイルからyml.liquidへ実行ファイルを変更しておきます。
+step1:
sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml.liquid > /home/centos/logs/embulk_s3.log
最終的なファイルはこんな感じ。 変数"today"に日時を入れています。
timezone: Asia/Tokyo
schedule:
minutes_interval>: 5
+setup:
echo>: start ${session_time}
+disp_current_date:
echo>: ${moment(session_time).format('YYYY-MM-DD HH:mm:ss Z')}
_export:
today: ${moment(session_time).format('YYYYMMDDHHmmss')}
+step1:
sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml.liquid > /home/centos/logs/embulk_s3.log
_check:
echo>: success.
_error:
echo>: error!!
+teardown:
echo>: finish ${session_time}
で、次にymlをyml.liquidにして、中身を編集します。
$ mv ../bin/s3.yml ../bin/s3.yml.liquid
$ vi ../bin/s3.yml.liquid
path_prefix: data/{{env.today}}/output
↑ここを追加
で、これをdigdagで登録して動かすと・・・
この通り、勝手に日時のパスが作成されました。 こんな感じで環境変数で色んなものをやり取りできるのが便利ですね。 まだまだ活用のし甲斐がありそうです。
じゃあ、次回は、連携部分終わったので、BI部分に少し触れて、別のお題を検討していきます。