クラウドインフラストラクチャとの統合

はじめに

主要なクラウドプロバイダーはすべて、オブジェクトストアに永続的なデータストレージを提供しています。これらは従来の「POSIX」ファイルシステムではありません。単一障害点なしに数百ペタバイトのデータを格納するために、オブジェクトストアは従来のファイルシステムディレクトリツリーを、よりシンプルな object-name => data モデルに置き換えます。リモートアクセスを可能にするために、オブジェクトに対する操作は通常、(遅い)HTTP REST 操作として提供されます。

Spark は、Hadoop で実装されたファイルシステムコネクタ、またはインフラストラクチャサプライヤー自身が提供するコネクタを通じて、オブジェクトストア内のデータの読み書きが可能です。これらのコネクタは、オブジェクトストアをファイルシステム、ディレクトリ、ファイル、およびそれらに対するリスト、削除、名前変更などの従来の操作にほぼ見せかけます。

重要:クラウドオブジェクトストアは実際のファイルシステムではありません

ストアはファイルシステムのように見えますが、内部的には依然としてオブジェクトストアであり、その違いは重要です

これらは、HDFS のようなクラスターファイルシステムの直接の代替として使用することはできません。明示的に記載されている場合を除きます

主な違いは次のとおりです。

これが Spark にどのように影響しますか?

  1. データの読み書きは、通常のファイルシステムでの作業よりも大幅に遅くなる可能性があります。
  2. 一部のディレクトリ構造は、クエリ分割計算中にスキャンするのが非常に非効率的である場合があります。
  3. RDD、DataFrame、または Dataset の保存時に Spark が通常行うコミットの、名前変更ベースのアルゴリズムは、遅くて信頼性の低い可能性があります。

これらの理由から、オブジェクトストアをクエリの直接の宛先、またはクエリチェーンの中間ストアとして使用することが常に安全とは限りません。安全と見なされる用途を判断するには、オブジェクトストアとそのコネクタのドキュメントを参照してください。

整合性

2021 年現在、Amazon (S3)、Google Cloud (GCS)、Microsoft (Azure Storage, ADLS Gen1, ADLS Gen2) のオブジェクトストアはすべて*一貫性があります*。

これは、ファイルが書き込み/更新されるとすぐに、他のプロセスからリスト、表示、および開くことができ、最新バージョンが取得されることを意味します。これは AWS S3 での既知の問題であり、特にオブジェクトが作成される前に実行された HEAD リクエストの 404 キャッシングに関係していました。

それでも:ストアコネクタのいずれも、ストリームが読み取っているオブジェクトが上書きされた場合に、クライアントがどのように処理するかについて保証を提供しません。古いファイルを安全に読み取れると想定しないでください。また、変更が可視になるまでの期間に制限があるとも、読み取っているファイルが上書きされた場合にクライアントが単に失敗しないとも想定しないでください。

このため、他のクライアントが積極的に読み取っていることがわかっている/可能性が高いファイルの上書きは避けてください。

その他のオブジェクトストアは*一貫性がありません*

これには、OpenStack Swift が含まれます。

そのようなストアは、作業の宛先として常に安全とは限りません。各ストアの特定のドキュメントを参照してください。

インストール

クラスパスに関連するライブラリがあり、Spark が有効な認証情報で構成されていれば、オブジェクトを URL としてデータパスに使用して読み書きできます。たとえば、sparkContext.textFile("s3a://landsat-pds/scene_list.gz") は、s3a コネクタを使用して S3 に格納されているファイル scene_list.gz の RDD を作成します。

関連ライブラリをアプリケーションのクラスパスに追加するには、hadoop-cloud モジュールとその依存関係を含めます。

Maven では、spark.version が Spark の選択したバージョンに設定されていると仮定して、pom.xml ファイルに以下を追加します。

<dependencyManagement>
  ...
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hadoop-cloud_2.13</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  ...
</dependencyManagement>

Apache Spark をベースにした商用製品は、通常、クラウドインフラストラクチャと通信するためのクラスパスを直接設定するため、このモジュールは不要な場合があります。

認証

Spark ジョブは、オブジェクトストア内のデータにアクセスするために、オブジェクトストアと認証する必要があります。

  1. Spark がクラウドインフラストラクチャで実行されている場合、認証情報は通常自動的に設定されます。
  2. spark-submit は、Amazon S3 への s3n および s3a コネクタの関連認証オプションを設定するために、AWS_ENDPOINT_URLAWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY、および AWS_SESSION_TOKEN 環境変数を読み取ることができます。
  3. Hadoop クラスターでは、設定は core-site.xml ファイルに設定できます。
  4. 認証詳細は、spark-defaults.conf の Spark 設定に手動で追加できます。
  5. または、アプリケーションの SparkContext を構成するために使用される SparkConf インスタンスでプログラム的に設定できます。

重要:認証シークレットをソースコードリポジトリ、特に公開リポジトリにチェックインしないでください。

関連する設定およびセキュリティオプションについては、Hadoop ドキュメントを参照してください。

設定

各クラウドコネクタには独自の構成パラメータがあります。これも、関連ドキュメントを参照してください。

一貫性モデルにより名前変更ベースのコミットが安全であるオブジェクトストアの場合、パフォーマンスのために FileOutputCommitter v2 アルゴリズムを使用し、安全性のためには v1 を使用します。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

これは、ジョブの終了時に「バージョン 1」アルゴリズムよりも名前変更が少なくなります。ファイルをコミットするために rename() を引き続き使用するため、オブジェクトストアに一貫性のあるメタデータ/リストがない場合は安全ではありません。

コミッターは、一時ファイルをクリーンアップする際に障害を無視するように設定することもできます。これにより、一時的なネットワーク問題がジョブ障害にエスカレートするリスクが軽減されます。

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

元の v1 コミットアルゴリズムは、正常なタスクの出力をジョブ試行ディレクトリに名前変更し、その後、ジョブコミットフェーズ中にそのディレクトリ内のすべてのファイルを最終的な宛先に名前変更します。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

Amazon S3 での模倣された名前変更の遅いパフォーマンスにより、このアルゴリズムは非常に、非常に遅くなります。これに対する推奨される解決策は、S3 「ゼロリネーム」コミッターに切り替えることです(以下を参照)。

参考までに、さまざまなストアとコネクタがディレクトリを名前変更する際のパフォーマンスと安全性の特性を以下に示します。

ストア コネクタ ディレクトリ名前変更の安全性 名前変更パフォーマンス
Amazon S3 s3a 安全ではない O(データ)
Azure Storage wasb 安全 O(ファイル)
Azure Datalake Gen 2 abfs 安全 O(1)
Google Cloud Storage gs 混合 O(ファイル)
  1. 一時ファイルを保存すると料金が発生する可能性があるため、「_temporary」という名前のディレクトリを定期的に削除してください。
  2. AWS S3 の場合、マルチパートアップロードが未完了のままでいられる期間に制限を設定してください。これにより、未完了のアップロードによる請求が発生するのを防ぎます。
  3. Google Cloud では、ディレクトリの名前変更はファイルごとです。v2 コミッターの使用を検討し、ファイル名を含む、冪等な出力を生成するコードのみを記述してください。これは v1 コミッターよりも*安全ではありません*が、より高速です。

Parquet I/O 設定

Parquet データで作業する場合の最適なパフォーマンスのために、次の設定を使用してください。

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

これにより、クエリ中に読み取られるデータ量が最小限に抑えられます。

ORC I/O 設定

ORC データで作業する場合の最適なパフォーマンスのために、これらの設定を使用してください。

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

これも、クエリ中に読み取られるデータ量が最小限に抑えられます。

Spark Streaming とオブジェクトストレージ

Spark Streaming は、ストア内のパスを監視する FileInputDStream を作成することにより、オブジェクトストアに追加されたファイルを監視できます。これは StreamingContext.textFileStream() の呼び出しを通じて行われます。

  1. 新しいファイルをスキャンする時間は、*新しい*ファイルの数ではなく、パスの下にあるファイルの数に比例するため、遅い操作になる可能性があります。ウィンドウのサイズは、これを処理するように設定する必要があります。

  2. ファイルは、完全に書き込まれたときにのみオブジェクトストアに表示されます。ファイルがまだ書き込まれている間にファイルが取得されるのを防ぐために、書き込み後に名前変更するワークフローは必要ありません。アプリケーションは、監視対象のディレクトリに直接書き込むことができます。

  3. デフォルトのチェックポイントファイルマネージャーである FileContextBasedCheckpointFileManager の場合、ストリームは、高速でアトミックな rename() 操作を実装するストアにのみチェックポイントされる必要があります。そうでない場合、チェックポイントは遅くなり、潜在的に信頼性が低くなる可能性があります。AWS S3 では、Hadoop 3.3.1 以降で S3A コネクタを使用する場合、アボート可能なストリームベースのチェックポイントファイルマネージャーを使用できます(spark.sql.streaming.checkpointFileManagerClass 設定を org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager に設定することで)。これにより、遅い名前変更が解消されます。この場合、複数のクエリが並行して実行されている間にチェックポイントの場所が再利用されるのを避けるために、ユーザーはさらに注意する必要があります。そうしないと、チェックポイントデータが破損する可能性があります。

クラウドストレージへの作業の安全かつ高速なコミット。

前述のように、コミットによる名前変更は、最終的な一貫性を示すオブジェクトストア(例:S3)では危険であり、従来のファイルシステムの名前変更よりも遅いことがよくあります。

一部のオブジェクトストアコネクタは、名前変更を使用せずにタスクとジョブをコミットするためのカスタムコミッターを提供しています。

Hadoop S3A コミッター

Hadoop 3.1 以降でビルドされた Spark のバージョンでは、hadoop-aws JAR に S3A コネクタ経由でアクセスされる S3 ストレージに使用できる安全なコミッターが含まれています。

ストアのテンポラリディレクトリにデータを書き込んで名前変更する代わりに、これらのコミッターはファイルを最終的な宛先に書き込みますが、大規模な「マルチパート」アップロードを可視にするための最終的な POST コマンドは発行しません。これらの操作は、ジョブコミット自体まで延期されます。その結果、タスクとジョブのコミットははるかに高速になり、タスクの失敗は結果に影響しません。

S3A コミッターに切り替えるには、Hadoop 3.1 以降でビルドされた Spark のバージョンを使用し、次のオプションでコミッターを切り替えます。

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

Spark がサポートする最も一般的な形式でテストされています。

mydataframe.write.format("parquet").save("s3a://bucket/destination")

これらのコミッターの詳細については、最新の Hadoop ドキュメントで、S3A コミッターの詳細はS3A Committers による S3 への作業のコミットで確認できます。

注意:使用されるコミッターによっては、Hadoop 3.3.1 より前のバージョンでは、進行中の統計が過小報告される場合があります。

Amazon EMR:EMRFS S3最適化コミッター

Amazon EMR には、Parquet データ用の独自の S3 認識コミッターがあります。使用方法については、EMRFS S3最適化コミッターを参照してください。

実装とパフォーマンスの詳細については、[“EMRFS S3最適化コミッターによるApache SparkのApache Parquetフォーマットでの書き込みパフォーマンスの向上”](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)

Azure および Google クラウドストレージ:MapReduce Intermediate Manifest Committer。

2022 年 9 月以降(3.3.5 以降)に出荷された hadoop-mapreduce-core JAR のバージョンには、Azure ADLS Generation 2 および Google Cloud Storage でのパフォーマンスと回復力のために最適化されたコミッターが含まれています。

このコミッター、「マニフェストコミッター」は、マニフェストファイルを使用して、タスクコミッターからジョブコミッターへのディレクトリリスト情報(GCS が失敗するアトミックなディレクトリ名前変更に依存しない)を伝播します。

ジョブコミッターはこれらのマニフェストを読み取り、タスク出力ディレクトリから宛先ディレクトリにファイルを直接、並列に、オプションのレート制限を適用して IO スロットリングを回避しながら名前変更します。これにより、オブジェクトストアでのパフォーマンスとスケーラビリティが向上します。

Azure ストレージでこれを使用することは、ジョブの正確性にとって重要ではありません。従来の FileOutputCommitter はそこで安全ですが、この新しいコミッターは、深いツリーと広いディレクトリを持つ大規模なジョブでよりスケーラブルです。

Google GCS はアトミックなディレクトリ名前変更をサポートしないため、マニフェストコミッターが利用可能な場合は使用する必要があります。

このコミッターは「動的パーティション上書き」をサポートしています(以下を参照)。

このコミッターの可用性と使用方法の詳細については、使用している Hadoop リリースに対応する Hadoop ドキュメントを参照してください。

Hadoop 3.3.4 以前では利用できません。

IBM Cloud Object Storage:Stocator

IBM は、IBM Cloud Object Storage および OpenStack Swift 用の Stocator 出力コミッターを提供しています。

ソース、ドキュメント、およびリリースは、Stocator - Storage Connector for Apache Spark で見つけることができます。

クラウドコミッターと INSERT OVERWRITE TABLE

Spark には「動的パーティション上書き」という機能があります。テーブルを更新でき、新しいデータが追加されたパーティションのみがその内容を置き換えます。

これは、INSERT OVERWRITE TABLE の形式の SQL ステートメント、および Datasets を「overwrite」モードで書き込むときに使用されます。

eventDataset.write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(tablePath)

この機能はファイルの名前変更を使用し、コミッターとファイルシステムの両方に特定の要件があります。

  1. コミッターの作業ディレクトリは、宛先ファイルシステム内にある必要があります。
  2. ターゲットファイルシステムは、効率的にファイルの名前変更をサポートする必要があります。

これらの条件は、S3A コミッターと AWS S3 ストレージでは満たされません。

他のクラウドストアのコミッターは、この機能をサポートし、Spark に互換性があることを宣言する場合があります。Hadoop コミッターを介してデータを書き込む際に動的パーティション上書きが必要な場合、Spark は常に元の FileOutputCommitter が使用される場合にこれを許可します。他のコミッターの場合、インスタンス化後、Spark は互換性の宣言をプローブし、互換性があると宣言されている場合は操作を許可します。

コミッターが互換性がない場合、操作は PathOutputCommitter does not support dynamicPartitionOverwrite というエラーメッセージで失敗します。

ターゲットファイルシステムに互換性のあるコミッターがない限り、唯一の解決策は、クラウドフレンドリーな形式でデータを保存することです。

さらに読む

Apache およびクラウドプロバイダーの標準コネクタに関するドキュメントを以下に示します。

クラウドコミッターの問題と Hive 互換ソリューション