オールアバウトTech Blog

株式会社オールアバウトのエンジニアブログです。

Google Cloud Functionsを使ってバックアップデータからログの再収集を行う方法の紹介

こんにちは!オールアバウトでデータエンジニアをしている@ondaljhです。

皆さんはGoogle Cloud Functions(以降、Cloud Functions)を利用したことがありますか?自分も業務上では使ったことがなかったんですが、今回ある案件でCloud functionsを利用する機会があったので、私たちのチームでのCloud Functionsの利用シーンを紹介させていただきたいと思います。この記事を最後まで読んでいただいて、「こんな利用シーンもあるんだ~」程度で理解していただければ幸いです。

従いまして、本記事は下記のような方を想定して書かせていただきます。

  • Google Cloud Functionsに興味はあるが深くかかわったことがない方
  • Google Cloud Functionsの利用パターンを調べてる方

また、Cloud Functions以外に出てくるGoogle Cloud群についての詳細な説明は記事の最後の方に公式サイトへのリンクを貼りますので、興味ある方はぜひ見てみてください!

概要

冒頭で書いた「ある案件」とは、チームプロジェクトで「ログ収集基盤のサーバーレス化」というものがありまして、そのうちの1つである「バックアップデータからログの再収集」を意味します。

まず、ログ収集基盤のサーバーレス化の基本的なアーキテクチャをお見せします。

f:id:allabout-techblog:20200826165138p:plain
ログ収集基盤のログ転送フロー(通常)

図の通りにウェブから発生したログはGoogle Cloud Run(以降、Cloud Run)で受け入れて最終的にはTreasure DataとGoogle BigQuery(以降、BigQuery)まで流れていきます。
また、Cloud RunからGoogle Cloud Dataflow(以降、Dataflow)までのデータ転送にはネットワーク上の通信の高速化のためにProtocol Buffersを使ってデータ通信を行います。(Protocol Buffersは通信の高速化以外にもいくつかのメリットがありますが、本題とずれるので詳細については記事の最後の方に貼ってある公式サイトへのリンクをご確認ください。)
なお、Cloud Runが受け入れたログはバックアップとしてBigQueryのあるテーブルへ格納します。その際のフォーマットは、Cloud RunからGoogle Cloud Pub/sub(以降、Pubsub)へ転送する、Protocol Buffersにてシリアライズされたフォーマットのままにしてます。
このBigQueryへ格納するバックアップデータが「ログの再収集」の肝になりまして、PubsubやDataflowに障害が発生した場合、このバックアップデータからリカバリーが可能になります。

ここからが本題で、Cloud Functionsの導入に伴う下記の2点について紹介させていただきたいと思います。
- リカバリー処理(BigQueryからデータを取得して~)
- Cloud Functionsのソース管理及びデプロイ

通常時と比べてリカバリー時のイメージは下記の図のようになります。

f:id:allabout-techblog:20200826165207p:plain
ログ収集基盤のログ転送フロー(リカバリー)

それでは、リカバリー処理から見てまいりましょう!

リカバリー処理

リカバリー処理自体はシンプルで、BigQueryへ格納されているバックアップデータを取得して、通常処理のPubsubへ投げる、だけになります。

  • BigQueryへバックアップされたデータは通常処理でPubsubが扱うデータと同じフォーマットとなっている
  • PubsubとDataflowはオートスケールできるため、通常処理にリカバリー処理をかぶせても負荷的な影響はない

実際の処理負荷に対しては気になるところだと思いますが、その詳細につきましてはここでは割愛させていただき、別の機会でご紹介できればと思います。

Cloud Functions設定情報

今回のCloud Functionsの設定情報は下記のようになります。

  • トリガー : HTTP
  • ランタイム : Go 1.13

また、内部的にはGCP関連いくつかの環境変数とあわせて、リカバリー用の環境変数も設定してソースの中で参照することにしています。

必要なIAMロール

リカバリーではサービスアカウントを使いますが、そのサービスアカウントに必要なIAMロールは下記になります。各ロールに詳しい方も多いと思いますが、補足として簡単な説明も記載させていただきます。

  • bigquery.jobs.create : プロジェクト内でジョブ(クエリを含む)を実行する権限。
  • pubsub.topics.publish : 1つ以上のメッセージをトピックへ追加する権限。
  • cloudfunctions.functions.setIamPolicy : 未承認のデベロッパーによる該当Functionsへの権限変更を防ぐために、該当Functionsをデプロイするユーザーまたはサービスに与える権限。

ファイル構成

  • functions.go : メイン処理
  • go.mod : go言語の依存モジュール管理ファイル

処理の流れ

基本的な処理の流れは下記のようになります。

1.トリガーでCloud Functionsの実行対象関数が呼び出される
2.BigQueryからデータを取得し、Iteratorを回してPubsubへメッセージを送る関数へデータを渡す
3.Pubsubへメッセージを送る

ソース

バックアップデータからログの再収集を行うソースを全部載せると記事が長くなりますので、流れが把握できる範囲で紹介させていただきます(エラー処理もすべて省きます)。

functions.go

package p

import (
    ~省略~
    "cloud.google.com/go/bigquery"
    "cloud.google.com/go/pubsub"
    "google.golang.org/api/iterator"
)

// ①Cloud Functionsでエンドポイントとして使われる関数
func EndPoint(w http.ResponseWriter, r *http.Request) {
    // BigQueryからデータを取得し、1行ずつPubsubへメッセージを送る
    fetchBigQueryData()
}

// ②BigQueryからデータを取得し、Iteratorを回してPubsubへメッセージを送る関数へデータを渡す
func fetchBigQueryData() {
    ctx := context.Background()

    client, err := bigquery.NewClient(ctx, {GCP_ID})
    defer client.Close()

    // バックアップデータ取得用クエリ設定
    q := client.Query({SQL})

    // 実行のためのqueryをサービスに送信してIteratorを通じて結果を返す
    // itはIterator
    it, err := q.Read(ctx)

    for {
        var tc {BQTableColumns 構造体}

        // 引数に与えたtcにnextを格納する
        // Iteratorを返す
        // これ以上結果が存在しない場合には、iterator.Doneを返す
        // iterator.Doneが返ってきたら、forを抜ける
        err := it.Next(&tc)
        if err == iterator.Done {
            break
        }

        // pubsubへメッセージを送る
        sendPubsubMessage(&tc)
    }
}

// ③pubsubへメッセージを送る
func sendPubsubMessage(tc *{BQTableColumns 構造体}) {
    msg := &pubsub.Message{
        Data: tc.Message,
    }

    ctx := context.Background()
    if _, err := {Pubsubのtopicインスタンス}.Publish(ctx, msg).Get(ctx); err != nil {
        return
    }
}

go.mod

module DOMAIN/MODULE_NAME

go 1.13

require (
    cloud.google.com/go/bigquery v1.10.0
    cloud.google.com/go/pubsub v1.6.1
    google.golang.org/api v0.30.0
)

Cloud Functionsのソース管理及びデプロイ

Cloud Functionsはサーバーリソースを各自が管理しなくても良いメリットがあります。
その代わりにというのも不適切かもしれませんが、CloudFunctions上で動かせるソースの管理をどうするか、また、デプロイはどうするかを検討しないといけません。
自分にとっては初のCloudFunctionsだったので、一旦検証時に利用したソース管理とデプロイ手順を紹介させていただきます。

Cloud Functionsへのソース反映方法

Cloud Functionsへのソース反映はCloud FunctionsのウェブUIからも確認できるように、下記の4種類があります。

  • Inline Editor
  • ZIP Upload
  • ZIP from Cloud Storage
  • Cloud Source repository

Inline Editorの場合はウェブUIから直接ファイルを更新するパターンなので、ソース管理ができない問題があります。
ZIPを利用する方法も、ソース管理ツールからファイルをダウンロードして、ZIP圧縮して、等の遠回りをすればある程度のソース管理はできますが、手間が増えるし、ミスも増える可能性があります。
Cloud Source repositoryはGCP上でのソース管理サービスです。このサービスにはなんと!他のソース管理ツールへのミラーリング機能があったので、今回はCloud Source repositoryを使ってみることにしました。
※現時点でミラーリングできるのは「GitHub」と「Bitbucket」の2種類のみですので他のソース管理ツールをご利用中の場合にはご注意ください。

ミラーリング設定

弊社ではBitbucketを使ってソース管理を行っているので、下記公式サイトの説明に従ってリポジトリミラーリング設定を行いました。
Bitbucket リポジトリのミラーリング

デプロイ

まずは、Bitbucketの該当リポジトリへ今回のソースをcommitしました。
そのあと、Cloud Source repositoryの該当リポジトリミラーリングが正常にできたことを確認したので、gcloudコマンドを使ってCloudFunctionsへデプロイを試してみました。

gcloudコマンドを使ってのデプロイは下記公式ドキュメントに詳細が載っているので参考しながらコマンドを書きました。

ソース コントロールからのデプロイ

gcloud functions deploy CLOUD_FUNCTION_NAME \
  --region REGION_ID \
  --source https://source.developers.google.com/projects/PROJECT_ID/repos/REPOSITORY_ID/moveable-aliases/master/paths/SOURCE \
  --runtime go113 \
  --service-account=SERVICE_ACCOUNT@appspot.gserviceaccount.com \
  --trigger-http \
  --set-env-vars HOGEHOHE=AAA,FUGAFUGA=BBB… \
  --entry_point ENTRY_POINT \

基本的には公式ドキュメントに従って書けば問題なくデプロイできることを確認しました。
また、「--runtime」オプションは該当Functionを最初に作るときに指定すれば良くて、そのあとからは該当オプションをしないことで同じFunctionへの更新になります。

おわりに

Cloud Runと比べてCloud Functionsはコンテナイメージのメンテナンスが不要というメリットがあります。
通常のログ収集処理はCloud Runで行うことにしましたが、めったに発生しないであろうログの再収集処理はCloud Functionsで行うことで、メンテナンスコストが節約できると思います。
また、今回は「HTTPトリガー」を試してみましたが、Cloud Functionsには「HTTPトリガー」以外にも「Cloud Pub/Subトリガー」や「Cloud Storageトリガー」もあるので、今後それらも試してみたいと思います。
デプロイについても、今回はgcloudコマンドでの手動デプロイを試してみましたが、近いうちにCircleCIからの完全自動デプロイも試してみたいと思ってます。

今後ともよろしくお願いいたします。

参考サイト

公式サイト

ブログ等

GoでBigQueryクライアントを実装してBigQueryからデータを取得する