試纏

ためしてまとめる〜色々なトピックやテーマについて試してみたり、まとめてみたりするブログです。

Workflow Engines Meetup #1 に参加してきた #wfemeetup

f:id:shinyaa31:20170309225655p:plain:w600

ビッグデータ基盤周りの業務に携わっている者にとって『ワークフローエンジン』は非常に重要な位置を占める"関心事"です。OSSから商用のものまでこの分野のツールやプロダクトは数多く存在し、多かれ少なかれ皆さん苦労しながらもそれぞれの利用ノウハウを蓄積している状況の様です。そんな中、そのものズバリの勉強会が企画されていたので速攻で申し込み、この日参加してきました。

イントロ

会の冒頭は@Mahito氏によるイベント定義・主旨説明について。『ワークフローエンジン』の定義等の共有が為されました。

f:id:shinyaa31:20170310023622p:plain:w600
f:id:shinyaa31:20170310023748p:plain:w600

セッション内容

ワークフローエンジン毎の発表内容は以下の通り。

Digdag:Digdagの特徴とQuick Start

この日古橋氏は米国カリフォルニア州、マウンテンビューからの中継発表。発表時の米国時間は夜中だった模様。遅い時間にありがとうございました。

f:id:shinyaa31:20170309235230p:plain:w250

以下発表メモ。

Jenkins:Jenkins 2.0 Pipeline & Blue Ocean

Jenkins2.0から対応する様になったプラグイン、PipelineとBlue Oceanに関する解説がメインとなったセッション。 こちらの内容についてはスライド資料が既に展開されているため、発表メモについては割愛します。

Luigi:Luigiを使っている話

リクルートマーケティングパートナーの@bwtakacy氏、スタディサプリ・データ分析基盤にてLuigiを使っているとの事。以下構成図。

f:id:shinyaa31:20170310000129p:plain:w600

  • 日次処理(の規模感)

    • Embulkで50以上のテーブルを連携
    • Luigiで30以上のHiveクエリを実行
    • TD Workflow(Digdag)で10以上のPrestoクエリを実行
    • TD上のスケジュールクエリも10個以上存在
    • 複数の部門に渡り、20以上のレポートを提供
  • Luigiの概要

    • 複数のバッチ処理を組み合わせたジョブを制御
    • 処理の依存関係の解決・スケジューリングに特化
    • 処理のアトミック性を確保
    • 全てPythonで記述
    • プラットフォームに依存しないため、様々な処理が一元的に記述可能
    • Spotifyが開発したものがOSS化された。
    • 名前の由来は『世界で2番目に有名な配管工』。
      f:id:shinyaa31:20170310001207p:plain:w180
  • Luigiの範囲外となる部分

    • リアルタイム処理・長期間継続実行の処理には不向き
    • 処理の分散実行は未サポート
    • 処理スケジュール起動やトリガ起動は出来ない:スタディサプリデータ基盤ではJenkinsからLuigiをスケジュール起動
    • スケーラビリティは追求していない:数千くらいなら行けるが数万規模は無理
  • Luigiの用語

    • Task:処理の実体、ワークフローの部品。事前に実行されているべきTaskを定義出来る
    • Target:Taskの正常終了を示す情報。ファイル on ローカル、HDFSRDB、S3など
    • Parameter:Taskの引数として与える事が出来る変数。例)日次処理における日付など
  • 簡単な例:

f:id:shinyaa31:20170310001849p:plain:w550

  • スタディサプリでの使用例:

f:id:shinyaa31:20170310001857p:plain:w550

  • インストールと実行

    • インストールpip install luigiで可能。
    • ワークフロー実行:luigi –module foo examples.Foo –local-scheduler
      • 起動するPythonスクリプトがsys.path配下に存在している事
      • local-scheduler:コマンド実行毎にスケジューラが起動/スケジューラプロセスを独立して実行しておけば不要
  • Luigiに対しての思う所等

    • CASE1.TASKの実行時間が知りたい
      • コンソールログはそれなりに出るが、各TASKの実行時間は出ない!
      • 解決策)PROCESSING_TIMEイベントを使えば出せる。
    • CASE2.並列実行とコマンド戻り値
      • デフォルトではTASKの失敗が起きてもluigiコマンドの戻り値は0。戻り値によるエラーハンドリングが出来ない。
      • 解決策)設定ファイルにてretcode設定を行い、luigiコマンド起動時に読み込ませる事で対応可能。
        異常発生の種類に応じた戻り値を設定出来る。
    • Luigi.cfg
      • luigiコマンド実行時に以下から読み込まれる。下に行く程優先度高
  • Luigiのハマりどころ

    • 1.RETCODEをバイパスしたワークフローが出来てしまう
    • 2.並列実行すると戻り値がおかしくなる:こちらはパッチ投稿で解決済
  • DigdagとLuigiの使い分けについて

    • シンプルなワークフローを記述するにはLuigiは重たい
    • 以下内容についてはTreasure Workflowに移行中
      • クエリの依存関係が簡単なもの
      • TASKがTDクエリのみのもの
    • Embulk/FTP等他システム連携系の処理がある場合はLuigiを使用
    • TASK間の依存関係が複雑になりそうなものはLuigi
    • GUIはDigdag/Treasure Workflowの方が優れている。LuigiのGUIは正直あまり宜しくない…

Azkaban:Azkaban in my use case

f:id:shinyaa31:20170310013546p:plain:w300

LinkedIn社でHadoopの依存関係を解決する為に実装されたツール。モダンではない『おっさんに優しい』Javaで書かれているプロダクトなのだそうです。こちらのセッションについても資料が既に公開されているのでメモは割愛します。

Airflow:Apache Airflow(incubating)の紹介

  • 発表者:Kengo Seki(@sekikn39)

  • Apache Airflowに興味を持った背景

    • 幾つかの案件でワークフローエンジンとしてApache Oozieを使用していた
    • Oozieは実績も多く安定しており、Hadoopとの親和性も高い
    • 一方で制約も多い。その為、Oozieの代替となるプロダクトを探し始めた
    • 制約や好きでないところの一覧。このページには@sekikn39氏の恨み辛みが詰まっているとの事w
      f:id:shinyaa31:20170310012319p:plain:w600
    • これらの要素に対して、Apache Airflowでは以下の様な対応状況だった。これによりApache Airflowに興味を抱く形に。
      f:id:shinyaa31:20170310012327p:plain:w600
  • Apache Airflowの概要・機能紹介

画面要素を見せながらの用語解説。

f:id:shinyaa31:20170310014916p:plain:w600 f:id:shinyaa31:20170310014925p:plain:w600 f:id:shinyaa31:20170310014919p:plain:w300

  • 既存のOperatorの例

    • 参考:Concepts — Airflow Documentation
    • コマンド発行系:
      • BashOperator, DockerOperator, SimpleHttpOperator, PythonOperator
    • SQL発行系:
      • HiveOperator, JdbcOperator, MsSqlOperator, MySqlOperator, OracleOperator
      • PigOperator, PostgresOperator, SqliteOperator
    • データ転送系:
      • HiveToDruidTransfer, HiveToMySqlTransfer, MsSqlToHiveTransfer
      • MySqlToHiveTransfer, PrestoToMySqlTransfer, RedshiftToS3Transfer
      • S3FileTransformOperator, S3ToHiveTransfer
    • 通知系:
      • EmailOperator, SimpleHttpOperator, SlackAPIPostOperator
    • 自作のOperator定義も可能。
  • その他の概念や機能

    • Connection:各種データストアへの接続情報を管理
    • Hook:Connectionを使ってデータストアにアクセスしたり、データをload/dumpするためのメソッドを提供
    • Pools:タスクの並列数を管理
    • QueueCeleryのような、外部のキューイングシステムをジョブキューとして利用可能
    • Branching:DAG中での条件分岐を実現
    • SLA:一定時間内に成功しなかったtaskを管理者にメール通知
  • デモ実演:架空のペット店の売上データの整形・集計・分析を題材としたデモを実践。

  • Apache Airflowへの要望

    • Airflowに不足している機能
      • HA構成のサポート
      • 運用性の向上、特にDAGの登録・更新・削除をWebUIから実施出来るようにしたい
      • UTC以外のタイムゾーンのサポート
      • WebUIからワークフローの任意の場所で処理を停止・再開する機能(国産ジョブスケジューラは大体持っている)
  • Airflow Meetup Tokyo?

    • Airflow PMC chairに会ったところ、東京でもmeetupを開催してみては?との提案をもらう
      • 米国西海岸では2〜3ヶ月に1回のペースで開催しているらしい
      • テレカン・英語で良ければスピーカーの紹介も

クロージング

最後は次回開催についてのざっくりな構想など。場所や日時は未定となっているものの、扱うテーマについてはApache Oozie/StackStorm/Trigrav/Rukawa等が既に発表候補として挙がっているようです。

まとめ

1テーマ約20分、合計5つの『ワークフローエンジン』に関する発表となりましたがいずれも機能の概要を把握する上では非常に充実した内容となっていたと思います。個人的に幾つか興味があるプロダクトの話も聞けたので色々と参考になる部分も得る事が出来ました。第2回も非常に楽しみですね!