オールアバウトTech Blog

株式会社オールアバウトのエンジニアブログです。

Cloud Dataflow PythonSDKによるビッグデータ処理実装応用

Cloud Dataflow

オールアバウトシステム部開発Gの@tajima_tasoです。

前々回、Cloud Dataflow PythonSDKによるビッグデータ処理実装入門の記事にてPythonSDKを使用したDataflowの基礎についてご紹介しましたが、今回はGCPの他のサービスとも連携した応用方法をご紹介します。

GCPの中でも、GCSについては既に基礎編でご紹介したので、応用編ではBigQuery、Datastore、そしてそれらのデータを処理するのに便利な機能についてご紹介します。

情報量が多くて途中混乱するかもしれませんが、最も大事なことはデータの入力、変換、結果の出力と保存にフォーカスすることです。手段に気を取られて目的を見失わないようにすることが何事も大事です。

SDKのアップデート方法

本題に入る前にSDKのアップデートを行いましょう。前回の記事執筆時の最新のSDKは0.6.0でしたが、この記事執筆時は2.0.0が最新版です。 リリースノートについては必ず英語版で確認します。

実際にアップデートを行う際は、事前準備として現Python環境と同じライブラリをインストールした別のPython環境で行うのが良いでしょう。同一のシステム内で複数のPython環境を作成するには、Virtualenvなどのユーティリティツールが利用できます。

下記コマンドで、google-cloudライブラリを一括でアップデートします。

$ pip install -U google-cloud

なお、下記のようにDataflowのSDKのみに絞ってアップデートすることもできますが、オススメしません。

$ pip install -U google-cloud-dataflow

SDKの内部でgoogle-cloudの他のライブラリを利用しているので、アップデートをかける時にそことうまく整合性が合わなくなる場合があるからです。

実際に筆者はgoogle-cloudの認証系のライブラリと引数名が異なってしまった為にエラーになるなどの現象を経験しています。

どうしても絞ってアップデートをする場合は、事前の検証を手厚くした方が良いでしょう。

アップデート後は、下記コマンドの実行結果のVersionの項目が2.0.0になっていることを確認します。

$ pip show google-cloud-dataflow

アップデートの確認ができたら、本題に入っていきます。

Cloud Dataflowの応用

BigQueryを利用する

BigQueryのデータの読み書きを行うサンプルは下記の通りです。

# -*- coding: utf-8 -*-

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None):

  parser = argparse.ArgumentParser()

  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  gcloud_options = pipeline_options.view_as(GoogleCloudOptions)

  p = beam.Pipeline(argv=pipeline_args)

  # BigQueryからデータ読み込み
  bq = p | u"BigQueryからデータ読み込み" >> beam.io.Read(source=beam.io.BigQuerySource(project=gcloud_options.project, query="SELECT * FROM [sample_project:tech_blog.tech_blog_table]"))

  # BigQueryに書き込むデータ生成とBigQueryへの書き込み
  ( p | u"BigQueryに書き込むデータ生成" >> beam.Create([{"name": "taro", "age": 15}])
    | u":BigQueryへのデータ書き込み" >> beam.io.Write(
      beam.io.BigQuerySink(
        "sample_project:tech_blog.tech_blog_table",
        schema="name:STRING, age:INTEGER",
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
      )
    )
  )

  # 実行
  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

ここで重要なことは、基礎編の時にご説明した通り、処理はソースコードの上から順番に行われるとは限らないということです。

つまり、BigQueryのテーブルへtaroと15が書き込まれた後に、読み込みが実行されるかもしれないし、読み込みが実行された後にtaroと15の書き込みが行われるかもしれないということです。

このように、順番が確実に保証されるような処理が必要な場合、いくつか方法はありますが、前処理と後処理のジョブに分離して、実行させるのが手軽です。

また、BigQueryのテーブルの仕様として、既存のテーブルへはデータの追加と一括削除のみしか行えないので、データ格納のアプローチとしては既存のテーブルに追加する、既存のテーブルのデータを全て消して挿入する、新規テーブルに挿入するの3つが考えられます。

上記の例ではwrite_dispositionにWRITE_TRUNCATEを指定しているので、既存のデータを全て消して挿入します。もし既存のテーブルに追加する場合は、WRITE_APPENDを指定します。なお、BigQueryを利用する場合、出来る限りテーブルを分離した方が管理面やコスト面でのメリットがあるので、基本的にはRDBMSとは異なった方針で設計すべきです。

挿入時の挙動についてどういったパラメータが指定できるかは、まだドキュメントが不十分なのでソースコードで確認するといいでしょう。

DataStoreを利用する

Datastoreのデータの読み書きを行うサンプルは下記の通りです。DatastoreはBigQueryより手順が少し複雑なので、内容をよく確認してみて下さい。

# -*- coding: utf-8 -*-

import argparse
import logging

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

class EntityWrapper(object):
  def __init__(self, namespace, kind, ancestor):
    self._namespace = namespace
    self._kind = kind
    self._ancestor = ancestor

class SampleEntityWrapper(EntityWrapper):

  def __init__(self, namespace, kind, ancestor):
    super(SampleEntityWrapper, self).__init__(namespace, kind, ancestor)

  def make_entity(self, data):
    from google.cloud.proto.datastore.v1 import entity_pb2
    from googledatastore import helper as datastore_helper
    entity = entity_pb2.Entity()
    if self._namespace is not None:
      entity.key.partition_id.namespace_id = self._namespace

    # All entities created will have the same ancestor
    datastore_helper.add_key_path(entity.key, self._kind, self._ancestor, self._kind, data["id"])

    datastore_helper.add_properties(entity,
      {
        "name": data["name"],
        "age": data["age"]
      }
    )

    datastore_helper.add_properties(entity,
      {
        "other": data["other"]
      }
      , exclude_from_indexes=True
    )

    return entity

def make_ancestor_query(kind, namespace, ancestor):

  from google.cloud.proto.datastore.v1 import query_pb2
  from google.cloud.proto.datastore.v1 import entity_pb2
  from googledatastore import helper as datastore_helper, PropertyFilter

  ancestor_key = entity_pb2.Key()
  datastore_helper.add_key_path(ancestor_key, kind, ancestor)

  if namespace is not None:
    ancestor_key.partition_id.namespace_id = namespace

  query = query_pb2.Query()
  query.kind.add().name = kind

  datastore_helper.set_property_filter(
      query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor_key)

  return query

def format_datastore_value(data):

  props = [
    ("name", "string_value"),
    ("age", "integer_value"),
    ("other", "string_value")
  ]

  result = {}
  for p_pair in props:
    _name, _type = p_pair
    val = data.properties.get(_name, None)
    if val:
      result[_name] = getattr(val, _type)
    else:
      result[_name] = val

  return result

def run(argv=None):

  parser = argparse.ArgumentParser()

  parser.add_argument('--namespace',
                      dest='namespace',
                      required=True,
                      help='Datastore Namespace')
  parser.add_argument('--kind',
                      dest='kind',
                      required=True,
                      help='Datastore Kind')
  parser.add_argument('--ancestor',
                      dest='ancestor',
                      required=True,
                      help='Datastore Ancestor')

  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  gcloud_options = pipeline_options.view_as(GoogleCloudOptions)

  p = beam.Pipeline(argv=pipeline_args)

  # Datastoreからデータ読み込み
  query = make_ancestor_query(known_args.kind, known_args.namespace, known_args.ancestor)
  ds = p | "Datastoreからデータ読み込み" >> ReadFromDatastore(gcloud_options.project, query, known_args.namespace)
  # Datastoreから取得したデータをフォーマットする。
  ds = ds | "Datastoreから取得したデータをフォーマットする" >> beam.Map(format_datastore_value)

  # Datastoreに書き込むデータ生成とDatastoreへの書き込み
  ( p | u"Datastoreに書き込むデータ生成" >> beam.Create([{"id": 1,"name": u"taro", "age": 15, "other": u"特になし"}])
    | u"Datastoreに書き込む用のデータフォーマットに変換" >> beam.Map(SampleEntityWrapper(known_args.namespace, known_args.kind, known_args.ancestor).make_entity)
      | u"Datastoreへのデータ書き込み">> WriteToDatastore(gcloud_options.project)
  )

  # 実行
  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

まずは読み込みの方を見ていましょう。

Datastoreでは基本的に、祖先クエリというものを使ってデータの読み書きを行います。祖先クエリを使わないことも可能ですが、原則使うようにして下さい。祖先クエリを使わないデータは整合性が保証されません。

実際、挿入したデータに対して10分後に読み込んでみるとまだ取得できないというようなことが私の経験上でも頻繁に起きています。上記の例ではancestorというシンボルに関係する箇所が、祖先クエリの処理を行っている箇所です。

ただ、データの整合性が必ずしも重要ではないプロジェクトでは、祖先クエリを使う必要はありません。その場合、パフォーマンス上のメリットが得られます。

また、Datastoreからエンティティを取得した後は、BigQueryの時とは異なりもう一手間処理をはさむ必要があります。BigQueryでは、取得結果がそのまま扱いやすいデータ構造になっていたのに対して、Datastoreの取得結果はそのままのかたちでは扱いにくい形のデータ構造になっています。それを扱いやすい構造に変更しているのが、format_datastore_value関数で行っている処理です。

祖先クエリについて詳しくはこちらをご覧ください。

書き込みは読み込みよりはシンプルな手順になっていますが、こちらもBigQueryと違い、いくつかのメソッドを使って挿入するデータを作成する必要があります。その作成処理を担っているのが上記の例ではSampleEntityWrapperクラスのmake_entityメソッドです。

ここでのポイントは、作成するプロパティをインデックス付けして登録するかしないかの指定です。サンプルの例では、otherというキーのプロパティをインデックス付けの対象から外して登録しています。

インデックス対象に含めるとそのプロパティを検索や並び替えのキーとして利用できますが、格納できるプロパティの最大サイズが1500バイトまでとなります。検索に含めないプロパティであれば積極的にインデックスから外していくことが推奨されます。

Datastoreにおける制限事項についてはこちらをご覧下さい。執筆時の情報ではエンティティの最大サイズが1MiBとなっているので、そのサイズより大きなデータの読み書きを行う場合は要件の見直しが必要かもしれません。

副入力

これまで、1つのパイプラインで1つのデータに対して、何かしらの処理を行ってきました。しかし、場合によってはパイプラインの処理に対して外部から入力を与えたいケースもあります。それは副入力を利用することによって実現できます。

# -*- coding: utf-8 -*-

import argparse
import logging

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.pvalue import AsSingleton

def side_input_exe(main_data, age):
  if age < main_data["age"]:
    yield main_data

def run(argv=None):

  parser = argparse.ArgumentParser()

  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  gcloud_options = pipeline_options.view_as(GoogleCloudOptions)

  p = beam.Pipeline(argv=pipeline_args)

  # メインで処理したいデータを作成
  main = p | u"メインで処理したいデータを作成" >> beam.Create([{"name": "taro", "age": 10}, {"name": "hanako", "age": 15}, {"name": "masato", "age": 25}])

  # メインに渡したい副入力のデータ作成
  sub = p | u"メインに渡したい副入力のデータ作成" >> beam.Create([20])

  # 二十歳以上の人だけを抽出する
  over_20 = main | u"二十歳以上の人だけを抽出する" >> beam.FlatMap(side_input_exe, AsSingleton(sub))

  # 実行
  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

上記例ではメインのデータ処理ルーチンに対して、外部から20という数字を渡しています。これによって、20歳以上の人のデータのみを抽出する処理を実現しています。もしも、副入力を使わない場合は、side_input_exe関数内に20をベタ書きするしか方法はありません。

副入力はPCollectionのデータの性質によって、AsSingleton、AsList、AsDictなどを使って引き渡すことができます。今回の場合は単一値なのでAsSingletonを使っています。

このあたりに関しては残念ながらあまりドキュメントが整備されていないので、Javaのドキュメントを読むか、 直接ソースコードを読むのがわかりやすいです。

データのグルーピング

Dataflowでデータを処理する際に、ユーザーが自らコーディングしてそれらをグルーピングするのではなく、Dataflowの高度な最適化によってグルーピングを行うことができます。

# -*- coding: utf-8 -*-

import argparse
import logging

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.pvalue import AsSingleton

def run(argv=None):

  parser = argparse.ArgumentParser()

  known_args, pipeline_args = parser.parse_known_args(argv)
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True
  gcloud_options = pipeline_options.view_as(GoogleCloudOptions)

  p = beam.Pipeline(argv=pipeline_args)

  # メインで処理したいデータを作成
  main = p | u"グルーピングしたいデータを作成" >> beam.Create([(10, "taro"), (10, "hanako"), (25, "masato")]) | beam.GroupByKey()

  # 実行
  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

上記のようにPCollectionの各要素を2つの要素のタプルで構成される作りにします。生成されるPCollectionのデータは直感的には下記のようなデータ構造に変換されます。

# 変換前
(25, ['masato'])
(10, ['taro'])
(10, ['hanako'])

# 変換後
(25, ['masato'])
(10, ['taro', 'hanako'])

タプルの一つ目の要素が10で共通化された1つのデータになりました。こういった機能を使うことで、コードがスッキリするだけでなく、処理も効率的に行われます。もちろん、これらを更にパイプラインでの変換処理に繋げて処理することも可能です。

複数のパッケージを利用する

Dataflowでは、基本的にDataflowの実行時に指定したファイルしか実行することができません。通常の環境のように、ファイルシステム上のパスを指定して実行するようなことができないのです。そういった複数のPythonファイルにまたがってモジュールをインポートをする場合は、setup.pyを作成し、Dataflow実行時にそのファイルを指定する方法があります。

前回Dataflowの実行の為のシェルスクリプトの例を紹介しましたが、それを拡張します。

#!/bin/bash

# bashであれば下記ファイル内でそれぞれのパラメータ変数をexportコマンドで定義しておく。
source /app/tech.env

python -m /app/tech \
--project "${PROJECT}" \
--job_name "${JOBNAME}" \
--runner "${RUNNER}" \
--staging_location "${BUCKET}"/staging \
--temp_location "${BUCKET}"/temp \
--output "${BUCKET}"/output \
--setup_file /app/setup.py

setup.pyの内容は最低限、下記の内容をコピペで実行可能です。

from distutils.command.build import build as _build
import subprocess

import setuptools

setuptools.setup(
  name='PACKAGE-NAME',
  version='PACKAGE-VERSION',
  install_requires=[],
  packages=setuptools.find_packages(),
)

これによってサブディレクトリ内部のユーザー定義モジュールがDataflow上で利用可能になります。なお、setup.pyについてはDataflowの独自のものではなく、Pythonの機能の1つなので、詳しくはPythonの公式ドキュメントを参考にしてみて下さい。

まとめ

基本編と応用編の2回にわけて、Dataflow SDK for Pythonの機能のうち特に実用的な機能についていくつかご紹介しました。

コーディングの方法が、通常のプログラミング言語の感覚と異なるので慣れるまで少し時間を要するかもしれませんが、一番最初にご説明した通り、最も大事なことはデータの入力、変換、結果の出力と保存にフォーカスすることです。ここにフォーカスしてさえいれば、Dataflowを便利で扱いやすいプロダクトとして最大限利用できるはずです。

Dataflow SDK for PythonはDataflow SDK for Javaと比べるとまだまだ機能が少ないですが、今後更に機能が拡張され、ストリーミング、バッチ処理機械学習基盤として強化されていくことが期待できます。今のうちからPythonでDataflowを使ったデータ処理に取り組んでみてはいかがでしょうか?