オールアバウトTech Blog

株式会社オールアバウトのエンジニアブログです。

Cloud Dataflow PythonSDKによるビッグデータ処理実装入門

Cloud Dataflow

オールアバウトシステム部開発Gの@tajima_tasoです。

前回、Google Cloud Platformをフル活用してNo-Opsでビッグデータ処理基盤を構築したの記事にて弊社がGCP上のサービスを利用して、No-Opsでのシステム基盤促進に取り組んでいるとご紹介させて頂きましたが、今回はその中のCloud Dataflowについてご紹介させて頂きます。

Cloud Dataflowの基礎

Cloud Dataflowを利用する為にはSDKを使用します。利用できるプログラミング言語としてはJavaPythonがありますが、私のチームではDataflow SDK for Pythonを利用して、DataFlowでの処理を行っています。

なお、以下の説明では、公式ドキュメントの内容が十分な箇所については極力省略させて頂いておりますので、必要に応じて公式ドキュメントもご参照下さい。

基本的な考え方

Dataflowでコーディングを行うにあたって直感的なイメージは、UNIXのパイプライン処理です。 UNIXライクなシステムのCLI(bash)で、下記のようなコマンドを入力すると、|の左側のコマンドの出力結果を右側のコマンドの入力として受け渡すことが可能です。

$ cat about.txt | grep about > result.txt

この場合はabout.txtの内容を読み取り、そこからaboutを含む行を抽出して結果をresult.txtに書き込むという処理を行います。 一連の処理を抽象化すると、データの入力、変換、結果の出力と保存という4段階のステップで成り立っていることがわかります。

DataFlowにおけるプログラミングは、この考え方をベースにもっておくとコーディングしやすいです。

環境の準備

環境の準備は下記の手順で進めていきます。

  1. GCPのコンソールからプロジェクトの作成と初期化
  2. ローカルマシンにCloud SDKのインストー
  3. Cloud Storageでバケット作成
  4. ローカルマシンからの認証を通す
  5. pipでDataflow SDK for Pythonをインストー

なお、今回使用するSDKのバージョンは、Google Cloud Dataflow SDK for Python 0.6.0です。Pythonのバージョンとしては2.7を使用していることを前提としているので、文法や規則に関してはPython2.7に準じます。

詳しくはPython を使用したクイックスタートを参考に行います。

なお、PythonSDKについてはJavaに比べるとまだまだ開発途上で、短いインターバルでのアップデートが予想されるので、サンプルコードがうまく動かない場合はリリースノートを参照してモジュールのパスやメソッドの呼び出し方を調整してみて下さい。

ちなみに、実は私もコアコードにコントリビュートしていたりします。

パイプライン

DataFlowでのパイプライン処理は以下の手順で実行されます

  1. Pipelineオブジェクトを作成。
  2. PipelineオブジェクトからPCollectionを作成。
  3. PCollectionに対してパイプラインで変換を適用する。
  4. パイプラインを実行する。

このあたりの概念は文章だとイメージしにくいので、簡単なサンプルコードで説明します。 コメントでどこがその変換に該当するか、記述してあります。

# -*- coding: utf-8 -*-

import argparse

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.io import ReadFromText
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions

def add_hoge(elem):
  yield elem + "hoge"

def run(argv=None):

  parser = argparse.ArgumentParser()

  parser.add_argument("--output",
                      dest="output",
                      required=False,
                      help="Output file to write results to.")
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  gcloud_options = pipeline_options.view_as(GoogleCloudOptions)

  # 1. Pipelineオブジェクトを作成
  p = beam.Pipeline(argv=pipeline_args)

  (p | u"PCollection作成" >> beam.Create(["All", "About", "Tech Blog"]) # 2. PipelineオブジェクトからPCollectionを作成
     | u"要素の1つ1つにhogeを追加" >> beam.FlatMap(add_hoge) # 3. PCollectionに対してパイプラインで変換を適用する
     | u"GCS上のストレージに書き込む" >> beam.io.WriteToText(known_args.output, shard_name_template=""))

  # 4. パイプラインを実行する
  result = p.run()
  result.wait_until_finish()

if __name__ == "__main__":
  run()

このPythonファイルを/app/tech.pyとして保存したとすると、実行は以下のようにシェルスクリプトにしておくと楽です。

#!/bin/bash

# bashであれば下記ファイル内でそれぞれのパラメータ変数をexportコマンドで定義しておく。
source /app/tech.env

python -m /app/tech \
--project "${PROJECT}" \
--job_name "${JOBNAME}" \
--runner "${RUNNER}" \
--staging_location "${BUCKET}"/staging \
--temp_location "${BUCKET}"/temp \
--output "${BUCKET}"/output \

このスクリプトを実行すると、Allhoge、TechBloghoge、Abouthogeという文字列が書き込まれたテキストファイルがGCS上に作成されます。

3. PCollectionに対してパイプラインで変換を適用するについて少し補足します。 FlatMapメソッドにジェネレータとしての関数を渡すと、PCollectionの個々の要素がその関数によって繰り返し処理されます。

今回の場合は、個々の要素にhogeという文字列の追加(変換)が適用された新しいPCollectionが作成されます。一連の処理の入力と出力は|でつながれ、UNIXライクなパイプライン処理のイメージでコード化されています。

ちなみに、DataFlowでは処理しなければならない計算量によって必要なリソースを見積もり、自動的にスケールして並列処理される仕様になっています。

またu"PCollection作成" >> のような>>の左側に書かれた文字列はパイプライン上でそのステップ上での変換名を指し、Dataflow Monitoring Interfaceで確認する時のラベルとなります。注意しなければならない点として、この変換名は1つのジョブの中で一意である必要があり、重複してしまうとプログラムがエラーで停止してしまいます。

Dataflow Monitoring Interface上では下記のように表示され、現在の状態を確認することができます。

f:id:allabout-techblog:20170530111139p:plain

ちなみに|と>>という演算子は、SDK内でPythonにおけるビットOR演算子とビット右シフト演算子のオペレーターオーバーロード(__or__、__rshift__、__ror__)を行うことによって実現しています。

並列処理

さきほどの例では、1つの処理のみを実行していましたが、DataFlowでは互いに依存しない処理については並列に実行してくれるのでその例を見ていきます。

# -*- coding: utf-8 -*-

import argparse

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.io import ReadFromText
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions

def add_hoge(elem):
  yield elem + "hoge"

def add_ten(elem):
  yield elem + 10

def run(argv=None):

  parser = argparse.ArgumentParser()

  parser.add_argument("--output",
                      dest="output",
                      required=False,
                      help="Output file to write results to.")
  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  gcloud_options = pipeline_options.view_as(GoogleCloudOptions)

  # 1. Pipelineオブジェクトを作成
  p = beam.Pipeline(argv=pipeline_args)

  (p | u"PCollection作成" >> beam.Create(["All", "About", "Tech Blog"]) # 2. PipelineオブジェクトからPCollectionを作成
           | u"要素の1つ1つにhogeを追加" >> beam.FlatMap(add_hoge) # 3. PCollectionに対してパイプラインで変換を適用する
           | u"GCS上のストレージに書き込む" >> beam.io.WriteToText(known_args.output, shard_name_template=""))

  (p | u"二つ目のPCollection作成" >> beam.Create([1, 2, 3]) # 2. PipelineオブジェクトからPCollectionを作成
           | u"要素の1つ1つに10を追加" >> beam.FlatMap(add_ten) # 3. PCollectionに対してパイプラインで変換を適用する
           | u"GCS上のストレージに計算結果を書き込む" >> beam.io.WriteToText(known_args.output + "10", shard_name_template=""))

  # 4. パイプラインを実行する
  result = p.run()
  result.wait_until_finish()

if __name__ == "__main__":
  run()

hoge文字列を追加する処理に加えて、整数に10を加える変換を適用するパイプラインを追加しました。 この処理を実行すると、13、11、12が改行区切りで記述されたテキストが新たに作成されました。

実行後、Dataflow Monitoring Interfaceを確認すると下記のように表示されています。

f:id:allabout-techblog:20170530133054p:plain

2つのパイプラインの様子が横並びで示されています。これは2つの処理が並列に実行されたことを意味します。

どういうことなのかというと、ソースコードにべた書きしたプログラムの実行において、上にかいた処理が下に書いた処理より後に実行されるということは考えにくい(CPUのアウト・オブ・オーダー実行など非プログラマブルなケースを除く)と思いますが、DataFlowの場合は、そのケースもありうるということです。

この場合、どちらが先に実行されるかは状況によって変わってくるので実行の順番は保証されていません。

DataFlowにおいては、上記ソースコードの例でいうと、p.run()が実行されるまでは、あくまで実行すべきパイプライン処理の登録にとどまり、p.run()が呼ばれた段階で登録されたパイプライン処理が適切な環境とタイミングで実行されはじめます。

UNIXライクなパイプライン処理の例にあてはめると、下記のようにパイプでつないだ複数のコマンドをバックグラウンドで実行するイメージに近いです。

$ cat about.txt | grep about > result.txt &
$ cat about.txt | grep all > result_2.txt &

この特性を理解して、以下のポイントをなるべくおさえておくと、DataFlow上でパフォーマンスを発揮できる処理を実行できるはずです。

  1. 膨大な数のデータに対して、同じ処理を繰り返し行う。
  2. 各パイプライン同士の独立性が高い。

次回は応用編として、BigQuery、DataStoreを利用したDataFlowの処理や、SDKの機能を利用したコーディングテクニックにして紹介していきます。

シェルの記事書きました

Software Design 2017年7月号にて「しくみを知れば,bashは怖くない」と題して、bashについて書きました。bashプログラミング言語として、再構築して解説してみたので、よろしければご覧いただけますと幸いです。