データ パイプライン

データ パイプラインとは、一連の連続した処理ステップによってデータを処理するアプリケーションで、システム間のデータ転送、ETL と呼ばれる抽出 (Extract)、変換 (Transform)、読み込み (Load) 処理、データ拡充やリアルタイムのデータ分析処理等に適用されるコンセプトです。

データパイプラインにはバッチ処理とストリーミング処理があります。バッチ処理の例としては、一定の期間(1 日、1 ヶ月など)にわたって収集されたデータの集計処理などがあります。ストリーミング処理では、リアルタイムにアプリケーションから送信されるイベントデータの処理や、IoT デバイス等から大量に送信される時系列データの処理、他には OLTP データベースの更新データの反映などの例があげられます。

Google Cloud は、データ パイプラインを構築するマネージド サービスとして、Cloud DataflowCloud Data Fusion を提供しています。Cloud Dataflowは、OSS の Apache Beam SDK を利用して開発したパイプラインを実行するフル マネージドのデータ処理サービスです。Cloud Data Fusion は、OSS の CDAP のマネージド サービスで、組み込みのコネクタや GUI を用いてデータ パイプラインを構築することができます。  

エンタープライズ DWH として BigQuery を利用する際、データソースとなる基幹システム等のデータベースから未加工のデータを収集、蓄積することになります。本稿では、データソースからのデータ収集方法として、Dataflow を用いた ETL と、Cloud Data Fusion Replicator を利用した CDC によるデータ収集を紹介します。

 

バッチによる RDBMS から BigQuery へのデータ連携 (Cloud Dataflow template) 

解決する課題・使い所

BigQuery は、高度化するデータ分析環境への要求に応えられる、スケーラブルで柔軟性があり、コスト効率に優れたデータ プラットフォームです。セキュリティと耐久性、スケーラビリティのあるデータ ウェアハウスに対して、複数のデータソースからのデータを集めることで、分析環境の運用、管理にコストをかけずにデータを活用しビジネスの知見を得ることができます。

そういったデータソースの中には、OLTP 用途で利用されるリレーショナル データベースがあります。企業における業務トランザクション データやマスターデータ、e コマースサイトや各種アプリケーションのデータは、多くの場合 OLTP データベース によって管理されています。分析の第一歩として、これらのデータソースから BigQuery へ収集することが必要になります。

Cloud Dataflow は、Google Cloud のフルマネージドなデータ処理サービスです。Apache Beam SDK の実行環境として、ストリーミング処理とバッチ処理を統合して提供しています。Apache Beam SDK は、Google Cloud 以外にも様々なデータソースとデータシンク (Kafka、S3 等) に対応したコネクタが用意されており、バッチ、ストリーミング双方の処理に対応しています。また、JDBC ドライバ経由でリレーショナル データベースをインプットとしたデータ処理パイプラインを作成することが可能です。Beam SDK を利用し、Java や Python 等の言語で様々なデータ加工や処理を記述し、フルマネージドなパイプライン実行環境でスケーラブルに実行することができます。

Cloud Dataflow には、Dataflow テンプレートという、あらかじめテンプレート化されたデータ処理パイプラインを実行する機能があります。ユーザーは Cloud Console、 gcloud コマンドライン ツール、または REST API を使用して、Dataflow サービスでパイプラインを簡単に実行することができます。Dataflow テンプレートはユーザー自身が作成することも可能ですが、Google Cloud はオープンソースの Dataflow テンプレートを提供しており、Cloud Storage (GCS) から BigQuery への書き込みや Pub/Sub から GCS への書き込みなどのバッチ、ストリーミング双方に対応したテンプレートをいくつも用意しています。

このデザイン パターンでは、OLTP データベースから Java Database Connectivity (JDBC) to BigQuery テンプレートを用いて、バッチ処理にてデータを収集する方法を説明します。  

 

アーキテクチャ

Dataflow テンプレートを用いてバッチデータパイプラインを実行

Dataflow テンプレートを用いてバッチデータ パイプラインを実行すると、Dataflow サービスはパイプラインの処理ロジックを自動的に並列化し、ジョブを実行するために割り当てたワーカーに分散します。オンプレミス環境のデータベース サーバーへアクセスする場合には、Dataflow が実行されているサブネットからアクセス可能である必要性があります。

  • オンプレミスや他クラウド環境にあるデータベース サーバーへ接続するには、Cloud VPN や Cloud Interconnect が必要です。詳細は「関連するデザイン パターン」セクションに記載した「GCP + On-premises の DB(Cloud SQL の)同期」パターンも参照ください。

  • Dataflow テンプレート実行時にネットワークとサブ ネットワークを指定してパイプラインを実行することで、ワーカーノードが使用するIP レンジを指定することが可能です。 

また、JDBC to BigQuery テンプレートを使ったパイプラインを実行するには、以下の要件を満たす必要があります。

  • リレーショナル データベース用の JDBC ドライバが GCS バケットに格納されていること。

  • パイプラインを実行する前に BigQuery テーブルが存在していること。

  • BigQuery テーブルに互換性のあるスキーマであること。  

Cloud Console、 gcloud コマンドライン ツール、REST API から必要なパラメーターを指定して実行します。以下が Cloud Console からの実行方法です。

Dataflow のメニューから、「テンプレートからジョブを作成」を選択すると、テンプレート選択画面が表示されます。 

cloud-dataflow-template-2

Jdbc to BigQuery を選択すると、パラメーター入力画面が表示されます。

cloud-dataflow-template-3

JDBC 接続設定やソーステーブルに発行するクエリ、ターゲットの BigQuery の指定等を入力します。前述したネットワーク、サブネットワークの指定は「オプション パラメータを表示」を展開して設定します。 「ジョブを実行」をクリックすると、ジョブが作成され、実行が開始されます。 

利点

Dataflow テンプレートを利用することにより、パイプライン実行環境の構築、運用の手間もなく、また複雑なコードを記述せずにリレーショナル データベースから BigQuery へデータを取り込むパイプラインを作成することができます。 

関係するデザインパターン

参照文献


 

Cloud Data Fusion Replicator による リレーショナル データベース から BigQuery への変更データ キャプチャ(CDC)データ連携


解決する課題・使い所 

リレーショナル データベース に蓄えられたデータを用いて Google Cloud でデータを分析する場合、BigQuery にデータを取り込む方法として、「エクスポートしたデータを Cloud Storage にアップロードし、そこから BigQuery へロードする」「ETL ツールを利用して BigQuery へデータをロードする」など、複数の方法があります。

一方、より迅速な意思決定のために、リアルタイムに近い形でデータソースに蓄えられたデータを、データ ウェアハウス(DWH)に取り込みたいケースもあります。例えば「ソース データベースにあるマスターデータを、ほぼリアルタイムに BigQuery にロードして、レポートやダッシュボードを更新したい」というケースや、「最新のトランザクション データを用いて分析したい」といったケースです。このような要件に対しては、CDC を用いて、ソース データベースへの変更点を継続的に BigQuery へ反映させることで対応できます。また、ソース データベース側の処理に、負荷をかけずにデータを移動させることで、BigQuery 側で大量データ処理を実行させることも可能です。 

image

アーキテクチャ

Cloud Data Fusion は、バージョン 6.3.0 以上 で、MySQL(MySQL Binary Log を利用)、および SQL Server(SQL Server CDC を利用)とのレプリケーションに対応しています。Google Cloud 内のデータ ソースに加えて、ネットワーク要件を満たすことで、オンプレミス データセンターの RDBMS から、BigQuery に対して CDC を構成することが可能です。 

image

レプリケーションに必要なネットワーク接続と、セキュリティ メカニズムは、こちらのドキュメントをご参照ください。 

また、Cloud Data Fusion で、テナント プロジェクトのサービスやユーザー プロジェクト内の Dataproc などのサービスからデータ ソースに接続する方法については、こちらのドキュメントをご参照ください。 

 

Cloud Data Fusion レプリケーション機能の有効化

Cloud Data Fusion で CDC を利用するためには、Replication アクセラレータを有効にする必要があります。以下の図は、インスタンス作成時にアクセラレータを追加する例です。 

image

また、既存のインスタンスでレプリケーションを有効にするためには、こちらの手順を参考にします。


ソース データベース システムの設定

MySQL をソースとするレプリケーションを構築する際には、こちらのガイドに従い MySQL の設定を行います。 

SQL Server をソースとする場合には、Microsoft 社のガイドに従い Change Data Capture を有効にします。  


JDBC ドライバのアップロード

Cloud Data Fusion の web UI でレプリケーション ジョブを構成するために、取得元データベースに対応する JDBC ドライバを取得し、Cloud Data Fusion UI でアップロードします。


パイプラインの作成

Cloud Data Fusion UI で、メニューから Replication を選択します。Replication jobs 画面で、「Create a replication job」を選択し、レプリケーション パイプラインを作成します。Data Fusion のテナントプロジェクトからソース データベースに対して、ネットワーク経由でログインできるよう構成する必要があります。
ユーザー プロジェクトに構築した Compute Engine インスタンス上にインストールされている MySQL サーバーから、テナント プロジェクトである Cloud Data Fusion UI のパブリック IP を用いて接続し、BigQuery に複製するレプリケーション ジョブを構成するケースを例に説明します。なお、オンプレミス環境のデータ ソースに対してレプリケーションを作成する場合は、Cloud Data Fusion のネットワーク構成に関するドキュメントを参考にして Cloud Data Fusion インスタンスを構成します。

[Configure source] ページでは、データ ソースへの接続方法、データベース名などを設定します。

image


[Advanced] セクション
にある、Replicate Existing Data タブを切り替えることで、レプリケーション開始時点で、既存データの初期転送を実施するかどうかを選択します。

image


[Select tables] ページ
では、ソース データベースから、レプリケーション対象のテーブル、列を選択し、Insert、Update、Delete のイベントのうち、対象とするイベントを選択します。 

image


レプリケーション対象テーブルの [Columns to replicate] 列を選択することで、レプリケーション対象列を選択することができます。 

image


[Configure target] ページでは、レプリケーション対象となる BigQuery のプロジェクト名、データセット名、テーブル名、ロードインターバルなどを設定します。BigQuery へのレプリケーションでは、ステージング テーブルに、ソーステーブルから取得した変更イベントが蓄積され、設定したインターバルごとに、merge ステートメントによって変更が反映されます。 

image


[Configure advanced properties] ページでタスク数の設定を行います。1 時間あたりの処理データ量によってタスク数を設定することもできます。 

image


[Review assessment] ページでいずれかのテーブルの横にある [View mappings] をクリックすると、スキーマの問題や欠損している機能、接続性に関する複製中に発生する可能性のある問題の評価を取得できます。問題が発生した場合は、処理を進める前に問題を解決する必要があります。

image


[View Summary] ページDEPLOY REPLICATION JOB ボタンを押すと、レプリケーション パイプラインがデプロイされます。

image

 

パイプラインの開始

image

パイプライン ページで Start ボタンを押すと、パイプラインが開始されます。ダッシュボード上では,、Status が Provisioning、 Starting、 Running と遷移し、レプリケーション ジョブが実行されている様子を確認できます。この間、処理用の Dataproc クラスタがユーザー プロジェクトに作成され、レプリケーション ジョブが実行されます。Stop ボタンを押すとレプリケーション ジョブは終了し、Dataproc クラスタも削除されます。

パイプラインのモニタリング

image

パイプラインの実行ページでは、レプリケートされたデータ量やイベント数、スループット、レイテンシーや挿入、更新、削除された行数などのメトリクスを確認することができます。

利点

Cloud Data Fusion によるレプリケーションには、以下の利点があります。 

  • ソース データベースが、ネイティブに提供するレプリケーション メカニズムに対応したコネクタが用意され、信頼性の高いレプリケーション ジョブが構成できる。

  •  低遅延でスループットの高い(パブリック プレビューでは、1 時間あたり 50 GB のトランザクションに対応)レプリケーションが実行できる。
  • シンプルなウェブ インターフェースで、レプリケーション ジョブを設定できる。

  •  ダッシュボードで、レプリケーションのパフォーマンスに関するリアルタイムの分析情報が表示される。
  • データ所在地の指定、顧客管理の暗号鍵(CMEK)、VPC Service Controls をサポートし、データ セキュリティ要件に対応できる。

注意事項

レプリケーションを実行すると、BigQuery 内にステージング テーブルが作成され、変更差分が書き込まれます。その後、ステージング テーブルをターゲット テーブルにマージする処理が実行され、merge 文が発行されます。この際、BigQuery の処理料金が発生するため、公式ドキュメントでは BigQuery の定額料金を利用することを推奨しています。 

参考文献


 

アプリケーションおよびウェブデータを データ ウェアハウス に収集するパターン( Firebase 向け Google アナリティクス )

解決する課題・使い所

スマホアプリのやウェブサイト上でのユーザの行動を分析、モニタリングする場合、一般的には Firebase 向け Google アナリティクス や Google アナリティクスを使用しますが、一方で以下のよう複雑な要件を満たすためにはそれらの機能のみを使用して実現することが難しい場合があります。

  • 通常の UI ではサポートされていない複雑なレポーティングを行いたい
  • 他のデータをかけ合わせた分析をしたい(会員データと掛け合わせて、一部の有料会員だけに絞った分析をしたい)
  • ウェブ上の行動履歴とメルマガ開封状況の相関を分析したい

そのような場合、Google Analytics for Firebase やGoogle アナリティクスの生データを BigQuery にエクスポートして、

  • SQL をベースとしたデータ加工を行って必要なレポーティングを行う
  • 会員情報やマーケティング オートメーションのデータなど外部のデータソースとかけ合わせて分析を行う

ことにより、要件を満たすことができます。

アーキテクチャ

ga360toBQ
  • Google アナリティクスのデータは 1 日に 3 度以上の頻度でエクスポートされます(スキーマ)。また、最新のデータを数分以内に入手できるストリーミング エクスポートもオプションで利用可能です。

  • Firebase 向け Google アナリティクスのデータはリアルタイム ストリーミングとデイリーエクスポートでエクスポートされます(ストリーミング エクスポートを停止する選択肢はありません、スキーマ)。

利点

  • 各種のデータをノンコーディングで BigQuery にエクスポートできます。

注意事項

  • Google アナリティクスのエクスポートはデフォルトでUSリージョンに向けて行われるので、リージョンを指定するためには予め該当の Google Cloud Platform プロジェクトでアナリティクスのビュー ID と同じ名前のデータセットを作成する必要があります。(こちらのステップ 2.1 を参照)

  • Firebase 向け Google アナリティクスのエクスポートはリージョンが指定できないので、一旦エクスポート設定をして作成されたデータセットを削除し同名のデータセットをリージョンを指定したうえで再作成する必要があります。

このパターンで作成された事例

参照文献