なんちゃってエンジニアぶろぐ

プログラムがガリガリ書けるわけじゃない。でもなんか新しいことに手を出したいそんな人のブログ。

embulk × digdag 動かしてみた2

続き。embulkをdigdagで動かしてみます。 なんか書いてる間にエラー多発して、更新遅れちゃいました・・・。

inui-beta.hatenablog.com

お題目はこんな感じ。

1. プラグインの導入

2. embulkのymlを作成

3. S3アップロード

4.digdagを利用してのファイルアップロード

5.ちょっとした応用

というわけで、前回実行したembulkをdigdagから起動してみます。

まずdigdagについて触れておきます。 ざっくりだし、別に深く考察しているわけでもないので、マジなエンジニアの方は流してください。←

簡単に言うとジョブを管理するフレームワーク的なもので、YAML形式のdigという設定ファイルに基づいて"ワークフロー"(="ジョブ")を管理します。 今回の設定ではジョブの情報をメモリ上で管理する設定にしていますが、本来はPostgreSQLとか入れて管理したり、実行状況をウォッチしたりすべきだと思います。

というわけで、本家へのリンク。 www.digdag.io

で、digファイルの構成要素は、本家のドキュメント見るのが良いかと。 僕みたいなのが使うのは、"sh>"オペレータくらいなものですが、きっともっといろいろできます。

ドキュメントリンク Workflow definition — Digdag 0.9.5 documentation

embulkのオペレータとかもあるのですが、今回最後に書く応用のところで"sh>"縛りが発生しているので、"sh>"だけ使っていきます。

まず、digdagのワークフローのテンプレートを作成します。

$ digdag init sample
$ cd ./sample
$ ls -a
  .digdag  .gitignore  sample.dig

カレントディレクトリに、sampleディレクトリが作成されて中にこのワークフローのdigファイルが作成されます。 このsample.digファイルを以下のように編集しました。

$ cat ./sample.dig
timezone: Asia/Tokyo

schedule:
  minutes_interval>: 5

+setup:
  echo>: start ${session_time}

+disp_current_date:
  echo>: ${moment(session_time).format('YYYY-MM-DD HH:mm:ss Z')}

+step1:
  sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml > /home/cento                                                                                                                                                                s/logs/embulk_s3.log

_check:
  echo>: success.

_error:
  echo>: error!!

+teardown:
  echo>: finish ${session_time}

では、これを登録します。 コマンドは、作成されたsampleディレクトリ内で実行しております。

 $ digdag push sample
 2017-11-26 15:06:57 +0000: Digdag v0.9.20
Creating .digdag/tmp/archive-3700971538591255345.tar.gz...
  Archiving sample.dig
Workflows:
  sample.dig
Uploaded:
  id: 1
  name: sample
  revision: 04a783ad-332d-4c21-b923-eef391add5a1
  archive type: db
  project created at: 2017-11-26T15:07:00Z
  revision updated at: 2017-11-26T15:07:00Z

Use `digdag workflows` to show all workflows.

この辺は、もっとベターな方法があるかもしれませんが、いったん公式のサンプル通りに進めます。

これで準備完了です。 今回の設定ファイルは5分おき(schedule:の部分)に前回作成したs3アップロードのembulkを実行する(+step1:の部分)ようにしています。 また、成功の場合(check:の部分)は標準出力に"success."、エラーの場合(error:の部分)は"error!!"と出力するように指定しておりますので、以下に2パターンの結果を貼り付けます。

まず、errorの場合(logディレクトリ作り忘れてた・・・。)

2017-11-26 15:20:00 +0000 [INFO] (scheduler-0) io.digdag.core.workflow.WorkflowExecutor: Starting a new session project id=1 workflow name=sample session_time=2017-11-27T00:20:00+09:00
2017-11-26 15:20:00 +0000 [INFO] (0037@[0:sample]+sample+setup) io.digdag.core.agent.OperatorManager: echo>: start 2017-11-27T00:20:00+09:00
start 2017-11-27T00:20:00+09:00
2017-11-26 15:20:01 +0000 [INFO] (0037@[0:sample]+sample+disp_current_date) io.digdag.core.agent.OperatorManager: echo>: 2017-11-27 00:20:00 +09:00
2017-11-27 00:20:00 +09:00
2017-11-26 15:20:01 +0000 [INFO] (0039@[0:sample]+sample+step1) io.digdag.core.agent.OperatorManager: sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml > /home/centos/logs/embulk_s3.log
/bin/sh: line 1: /home/centos/logs/embulk_s3.log: No such file or directory
2017-11-26 15:20:01 +0000 [ERROR] (0039@[0:sample]+sample+step1) io.digdag.core.agent.OperatorManager: Task failed with unexpected error: Command failed with code 1
java.lang.RuntimeException: Command failed with code 1
        at io.digdag.standards.operator.ShOperatorFactory$ShOperator.runTask(ShOperatorFactory.java:143)
        at io.digdag.util.BaseOperator.run(BaseOperator.java:35)
        at io.digdag.core.agent.OperatorManager.callExecutor(OperatorManager.java:312)
        at io.digdag.core.agent.OperatorManager.runWithWorkspace(OperatorManager.java:254)
        at io.digdag.core.agent.OperatorManager.lambda$runWithHeartbeat$2(OperatorManager.java:137)
        at io.digdag.core.agent.ExtractArchiveWorkspaceManager.withExtractedArchive(ExtractArchiveWorkspaceManager.java:36)
        at io.digdag.core.agent.OperatorManager.runWithHeartbeat(OperatorManager.java:135)
        at io.digdag.core.agent.OperatorManager.run(OperatorManager.java:119)
        at io.digdag.core.agent.MultiThreadAgent.lambda$null$0(MultiThreadAgent.java:127)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2017-11-26 15:20:02 +0000 [INFO] (0037@[0:sample]+sample^error) io.digdag.core.agent.OperatorManager: echo>: error!!
error!!
2017-11-26 15:20:02 +0000 [INFO] (0039@[0:sample]+sample^failure-alert) io.digdag.core.agent.OperatorManager: type: notify

で、うまくいった場合。

2017-12-02 13:05:00 +0000 [INFO] (scheduler-0) io.digdag.core.workflow.WorkflowExecutor: Starting a new session project id=1 workflow name=sample session_time=2017-12-02T22:05:00+09:00
2017-12-02 13:05:00 +0000 [INFO] (0030@[0:sample]+sample+setup) io.digdag.core.agent.OperatorManager: echo>: start 2017-12-02T22:05:00+09:00
start 2017-12-02T22:05:00+09:00
2017-12-02 13:05:01 +0000 [INFO] (0030@[0:sample]+sample+disp_current_date) io.digdag.core.agent.OperatorManager: echo>: 2017-12-02 22:05:00 +09:00
2017-12-02 22:05:00 +09:00
2017-12-02 13:05:01 +0000 [INFO] (0030@[0:sample]+sample+step1) io.digdag.core.agent.OperatorManager: sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml > /home/centos/logs/embulk_s3.log
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
2017-12-02 13:05:14 +0000 [INFO] (0030@[0:sample]+sample+teardown) io.digdag.core.agent.OperatorManager: echo>: finish 2017-12-02T22:05:00+09:00
finish 2017-12-02T22:05:00+09:00

JDKの警告はいったん無視。

S3へのアップロード結果がこんな感じ。

f:id:inui_beta:20171202223340p:plain

時刻が更新されているので、うまくいってる感じですね。 今回のは5分おきにスケジュールしているので、 しばらくしてから見るとこんな感じ。

f:id:inui_beta:20171202223754p:plain

更新日時が+5分。 うん。動いてる動いてる。

止めるときは以下のコマンド。 一時停止的なのもできるっぽいけど。

$ digdag delete sample
2017-12-02 14:16:14 +0000: Digdag v0.9.20
Project:
  id: 1
  name: sample
  latest revision: 08753ed6-36b2-46d6-a976-5b837810e6fd
  created at: 2017-12-02 12:58:05 +0000
  last updated at: 2017-12-02 12:58:05 +0000
Are you sure you want to delete this project? [y/N]: y
Project 'sample' is deleted.

以上、基本的な使い方。 ついでに、こんなコマンドでジョブのステータス確認もできるからいい感じ。

$ digdag schedules
2017-12-02 14:00:35 +0000: Digdag v0.9.20
Schedules:
  id: 1
  project: sample
  workflow: sample
  disabled at:
  next session time: 2017-12-02 23:05:00 +0900
  next scheduled to run at: 2017-12-02 14:05:00 +0000 (in 4m 23s)

1 entries.
Use `digdag workflows [project-name] [workflow-name]` to show workflow details.

次回実行時刻とかが見えてるのが安心感ありますね。 で、ここまでがdigdagの使い方です。

ここからはちょっとした応用。

digdagのdigファイルで設定した変数をymlに引き渡してみます。 何が便利かというと、・・・なんも考えなくても、変数をパラメータちっくに渡せるのが素晴らしい。 例えば、以下の例ではs3に日時ごとのパスを作ってアップロードしていますが、ファイル名に日時を入れたり、input定義側のqueryに日時入れたりもできるので、ジョブ管理が非常に楽ちん。 sysdateみたいなのを使うのもいいけど、再実行に困ったりしますからね。

ポイントは3つ。 digファイルで変数をセット。 ymlをyml.liquidにする。 yml.liquidからdigで定義した変数を参照する。

というわけで、以下その設定。

まずdigへ以下を追加。

_export:
  today: ${moment(session_time).format('YYYYMMDDHHmmss')}

で、ついでにymlファイルからyml.liquidへ実行ファイルを変更しておきます。

+step1:
  sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml.liquid > /home/centos/logs/embulk_s3.log

最終的なファイルはこんな感じ。 変数"today"に日時を入れています。

timezone: Asia/Tokyo

schedule:
  minutes_interval>: 5

+setup:
  echo>: start ${session_time}

+disp_current_date:
  echo>: ${moment(session_time).format('YYYY-MM-DD HH:mm:ss Z')}

_export:
  today: ${moment(session_time).format('YYYYMMDDHHmmss')}

+step1:
  sh>: /home/centos/.embulk/bin/embulk run /home/centos/bin/s3.yml.liquid > /home/centos/logs/embulk_s3.log

_check:
  echo>: success.

_error:
  echo>: error!!

+teardown:
  echo>: finish ${session_time}

で、次にymlをyml.liquidにして、中身を編集します。

$  mv ../bin/s3.yml ../bin/s3.yml.liquid
$  vi ../bin/s3.yml.liquid

path_prefix: data/{{env.today}}/output
                       ↑ここを追加

で、これをdigdagで登録して動かすと・・・

f:id:inui_beta:20171203000835p:plain

この通り、勝手に日時のパスが作成されました。 こんな感じで環境変数で色んなものをやり取りできるのが便利ですね。 まだまだ活用のし甲斐がありそうです。

じゃあ、次回は、連携部分終わったので、BI部分に少し触れて、別のお題を検討していきます。