はじめのProfessional Cloud Data Engineer認定取得講座㉙Data Fusion、BigQuery Storage Write API、Dataformのアサテーションについて説明します!

クラウド

今回も、Professional Cloud Data Engineer認定取得するために、私が勉強した内容をアウトプットしていきます。

今回は、Data Fusion、BigQuery Storage Write API、Dataformのアサテーションについて説明します!

ぜひ、最後までご覧いただけると嬉しいです!

Data Fusion

Data Fusion は、Google Cloud が提供する、フルマネージドでクラウドネイティブなデータ統合サービスです。簡単に言えば、ETL (抽出、変換、読み込み) パイプラインを、コーディングなしで迅速に構築・管理できるプラットフォームです。

このサービスはオープンソースの CDAP を基盤としており、インフラの管理を Google Cloud に任せつつ、スケーラブルなデータ統合ソリューションを実現します。

Data Fusion の主な特徴とメリット

  • グラフィカル インターフェース (GUI):最大の特徴は、ドラッグ&ドロップ対応のビジュアル インターフェースです。開発者、データアナリスト、ビジネスユーザーなど、SQL やコーディングの専門家でなくても、複雑なデータ パイプラインを視覚的に設計・デプロイできます。
  • 豊富なコネクタとプラグイン:オンプレミスのデータベース (例:SQL Server, PostgreSQL)、他のクラウド (例:Salesforce, Redshift)、Google Cloud サービス (例:BigQuery, Cloud Storage) など、多種多様なデータソースや宛先システムに接続するためのプラグインが事前に用意されています。
  • フルマネージド & サーバーレス:パイプラインの実行に必要な基盤インフラ (Dataproc クラスタなど) は、Data Fusion が自動的にプロビジョニング・管理・削除します。ユーザーはリソースのサイジングやインフラ管理について心配する必要がありません。

主要なコンセプトとコンポーネント

Cloud Data Fusion を理解する上で重要な、いくつかの主要コンポーネントがあります。

  1. Cloud Data Fusion Studio (データプレーン):ここがパイプラインを「設計」する場所です。グラフィカル インターフェースを使い、ソースからのデータの取り込み、Wrangler (ラングラー) を使ったデータのクレンジングや変換、BigQuery などへのロード、といった一連の流れ(パイプライン)を構築します。
  2. Cloud Data Fusion コンソール (コントロール プレーン):これは Data Fusion サービス自体を「管理」する場所です。新しい Data Fusion インスタンスの作成、削除、更新、ネットワーク設定など、インフラレベルの操作を行います。
  3. 実行環境 (Dataproc):Studio で設計されたパイプラインは、実行時に Google Cloud の Dataproc (Spark) クラスタ上で実行されます。Cloud Data Fusion は、パイプライン実行開始時に一時的な (エフェメラルな) Dataproc クラスタを自動で作成し、実行が完了すると自動で削除します。これにより、処理能力のスケーラビリティとコスト効率を両立しています。
  4. Wrangler (ラングラー):Studio 内で利用できる強力なデータ準備ツールです。データのプレビューを見ながら、列の分割、名前変更、フィルタリング、型の変換といったデータクレンジング作業を対話的に行うことができます。
  5. コンピューティング プロファイル:パイプラインを「どこで」「どのように」実行するかを定義する設定です。例えば、「自動スケーリングする Dataproc クラスタで実行する」といったデフォルト設定や、「既存の特定の Dataproc クラスタで実行する」といったカスタム設定が可能です。

Data Fusionのまとめ

Data Fusion は、コード不要で ETL パイプラインを迅速に構築・運用できる、フルマネージドのデータ統合サービスです。
ビジュアルGUIと豊富なコネクタにより、複雑なデータ統合処理を直感的に設計でき、多様なデータソースに柔軟に対応します。インフラは完全に自動管理され、Dataproc を用いたスケーラブルな実行基盤で効率的に処理が行われます。Studio とコンソールにより、パイプライン設計とインスタンス管理を明確に分離して操作できる点も特徴です。データ準備を支援する Wrangler や実行環境を制御するコンピューティングプロファイルにより、使いやすさと拡張性を両立したデータ統合基盤を提供します。

BigQuery Storage Write API

BigQuery へデータをリアルタイムで取り込む際、従来の API とは一線を画す強力な選択肢が「BigQuery Write API」です。これは、Storage Write API を利用して、大規模なデータを高スループットかつ低コストで BigQuery にストリーミングするために設計された次世代の API です。

従来の tabledata.insertAll メソッド(レガシーな Streaming API とも呼ばれます)に代わる、より高性能な方法として推奨されています。

Write API の主なメリット

なぜ従来の API ではなく Write API を選ぶべきなのでしょうか。主な利点は以下の通りです。

  1. 圧倒的な高スループット:従来の insertAll API と比較して、非常に高いスループット(秒間数ギガバイト以上)を実現できます。これは、API が gRPC をベースにしたストリーミング方式を採用しているためです。
  2. コスト効率の向上insertAll が取り込み行数に基づいて課金されるのに対し、Write API は取り込まれたデータ量(バイト数)に基づいています。大量の小さなレコードを頻繁に取り込む場合、Write API の方がコスト効率が大幅に良くなる可能性があります。
  3. 「1 回限り (Exactly-Once)」のセマンティクスの提供:Write API は「ストリーム オフセット」を指定することで、書き込みの重複排除をサポートします。これにより、ネットワーク障害などでリトライが発生しても、データが二重に書き込まれることを防ぐ「1回限り」の挿入セマンティクスを(アプリケーション側で)実現できます。これは insertAll の「ベストエフォート型」の重複排除よりも強力です。

Write API の仕組みとモード

Write API は、データをどのようにストリーミングするかについて、主に2つのモードを提供しています。

1. デフォルト ストリーム (Committed モード)

  • 概要:最もシンプルなモードです。レコードを送信すると、即座に BigQuery に書き込まれ、すぐにクエリ対象となります。
  • 特徴
    • 低レイテンシ(ほぼリアルタイム)での取り込み。
    • 1回限りのセマンティクスをサポート。
    • insertAll API の直接的な置き換えとして利用できます。

2. アプリケーション作成ストリーム (Pending モード)

  • 概要:より高度な制御が必要な場合に使用します。データをまず「保留中 (Pending)」の状態のストリームに書き込み、すべての書き込みが完了したと判断した時点で、そのストリーム全体を「コミット」します。
  • 特徴
    • ETL 処理の最後に、トランザクションのように一括でデータを反映させたい場合に適しています。
    • データをコミットするまでクエリには表示されません。

どのような場合に使うべきか?

Write API は、特に以下のようなユースケースでその真価を発揮します。

  • リアルタイム分析:IoT デバイスのセンサーデータ、Web サイトのクリックストリーム、アプリケーション ログなど、継続的に発生する大量のイベントデータを遅延なく BigQuery に取り込みたい場合。
  • Kafka からのデータ連携:Apache Kafka や Google Cloud Pub/Sub からのメッセージを、高い信頼性(1回限り)とスループットで BigQuery にストリーミングする場合。
  • insertAll API の置き換え:すでに insertAll を使用しているものの、スループットの制限やコスト、重複排除の信頼性に課題を感じている場合の移行先として最適です。

BigQuery Storage Write APIのまとめ

BigQuery Storage Write API は、従来の insertAll に代わる高性能なストリーミング取り込み手段として設計された次世代 API です。gRPC ベースにより圧倒的な高スループットを実現し、課金もデータ量ベースのためコスト効率にも優れています。ストリームオフセットを活用することで、Exactly-Once の重複排除が可能となり信頼性が向上します。利用モードとして、即時反映の「Committed」と一括コミットの「Pending」を提供し、用途に応じた柔軟な取り込みが可能です。リアルタイム分析や Kafka 連携、insertAll からの移行に特に適した選択肢です。

Dataformのアサーション

Dataform におけるアサーション (Assertion) とは、データ変換ワークフローの各ステップでデータの品質を自動的にテストするためのクエリ(仕組み)です。

データパイプラインを運用していると、「NULL になるはずのない列に NULL が入っている」「ID が重複している」といったデータの不整合が発生しがちです。アサーションは、このような「期待どおりでないデータ」を検出する番人のような役割を果たします。

アサーションの仕組み

アサーションの本質は、「違反する行を検出するクエリ」です。

  • アサーションが成功:クエリを実行した結果、0 件の行が返ってきた場合(=違反がなかった場合)、テストは成功です。
  • アサーションが失敗:クエリを実行した結果、1 件以上の行が返ってきた場合(=違反データが見つかった場合)、テストは失敗となります。

Dataform はワークフローの実行時にこれらのアサーションを自動で実行し、もし失敗した場合はアラートを送信するように設定できます。これにより、下流のダッシュボードや分析に不正確なデータが流れるのを防ぐことができます。

アサーションの作成方法

アサーションには、大きく分けて「組み込みアサーション」と「手動アサーション」の 2 種類があります。

1. 組み込みアサーション (推奨)

最も簡単で一般的な方法です。テーブルを定義する SQLX ファイルの先頭にある config ブロック内に、テストしたい条件を記述します。

<主な組み込みアサーション>

  • nonNull (NULL 値のチェック):指定した列が NULL でないことを保証します。 例:user_id と email は絶対に NULL であってはならない。
  • uniqueKey (一意キーのチェック):指定した列(単一)の値が重複していないことを保証します。 例:user_id は必ず一意である。
  • uniqueKeys (複合キーのチェック):指定した複数の列の組み合わせが重複していないことを保証します。 例:signup_date と customer_id の組み合わせは重複しない。
  • rowConditions (カスタム行条件):より複雑なカスタムロジックを SQL 式で定義します。 例:email 列は必ず「@」と「.」を含む、または signup_date は必ず「2019-01-01」以降である。

<config ブロックへの記述例 (example.sqlx)

config { 
  type: "table",
  assertions: {
    uniqueKey: ["user_id"],
    nonNull: ["user_id", "customer_id"],
    rowConditions: [
      'email like "%@%.%"',
      'signup_date is null or signup_date > "2019-01-01"'
    ]
  }
}

SELECT ... -- ここに通常の SELECT クエリが続く

2. 手動アサーション (カスタム SQL)

より高度なテストや、Dataform で作成していない既存のデータセットをテストしたい場合に使用します。

これは、type: "assertion" を指定した専用の SQLX ファイルを作成する方法です。このファイルには、違反する行だけを返す SELECT クエリを記述します。

<手動アサーションの例 (custom_assertion.sqlx)>

config { type: "assertion" }

-- sometable から条件に違反する行(a, b, c のいずれかが NULL)を検索する
-- このクエリが 1 行でも返すと、アサーションは「失敗」となる
SELECT
  *
FROM
  ${ref("sometable")}
WHERE
  a IS NULL OR b IS NULL OR c IS NULL

アサーションの結果確認

Dataform は、実行されたアサーションクエリの結果を、指定されたスキーマ(デフォルトでは dataform_assertions)にビューとして自動作成します。

もしアサーションが失敗した場合、Dataform の実行ログでアラートを確認できるだけでなく、BigQuery 上でこのビュー(例:dataform_assertions.assertion_name)を直接クエリすることで、「どのデータが」「なぜ」違反しているのかを迅速に特定できます。

Dataformのアサテーションのまとめ

Dataform のアサーションは、データパイプラインにおけるデータ品質の劣化を自動的に検出する仕組みで、分析基盤の信頼性を高める重要な役割を担います。NULL 値や重複などの不整合データを検出し、違反があれば即座にアラートを上げることで、誤ったデータが下流へ流れることを防止します。組み込みアサーションにより、nonNull・uniqueKey などの一般的な品質チェックを簡潔に定義でき、より複雑なチェックには手動アサーションで柔軟に対応できます。アサーションの結果は BigQuery にビューとして保存され、違反内容を素早く分析できる点も実運用での大きな利点です。
これにより Dataform は、データ変換だけでなく品質保証まで一貫して行える堅牢な ELT 基盤を提供します。

まとめ

今回は、下記3点について説明しました。

  1. Data Fusion
  2. BigQuery Storage Write API
  3. Dataformのアサテーション

本ブログでは、データ統合・変換・取り込みの各工程を支える Google Cloud の代表的なデータ基盤サービスを紹介しました。Cloud Data Fusion により、ノーコードで複雑な ETL パイプラインを直感的に構築でき、データ統合の生産性を大きく向上できます。BigQuery Storage Write API は、高スループットかつ低コストでのリアルタイムデータ取り込みを可能にし、信頼性の高いストリーミング基盤を実現します。さらに Dataform のアサーション機能を利用することで、データ品質を自動監視し、分析基盤全体の信頼性を確保できます。これらのサービスを組み合わせることで、効率的で堅牢なモダンデータパイプラインを構築できる点が大きな特徴です。

これからも、Macのシステムエンジニアとして、日々、習得した知識や経験を発信していきますので、是非、ブックマーク登録してくれると嬉しいです!

それでは、次回のブログで!

タイトルとURLをコピーしました