クラウドインフラストラクチャとの統合

はじめに

主要なクラウドプロバイダーはすべて、オブジェクトストレージに永続的なデータストレージを提供しています。これらは従来の「POSIX」ファイルシステムではありません。単一障害点を発生させることなくペタバイト単位のデータを格納するために、オブジェクトストレージは従来のファイルシステムのディレクトリツリーを、より単純なオブジェクト名 => データモデルに置き換えています。リモートアクセスを有効にするために、オブジェクトの操作は通常、(低速な)HTTP REST 操作として提供されます。

Spark は、Hadoop に実装されているか、インフラストラクチャサプライヤーによって提供されるファイルシステムコネクタを介して、オブジェクトストレージ内のデータを読み書きできます。これらのコネクタにより、オブジェクトストレージはディレクトリやファイル、リスト、削除、名前変更などの従来の操作を備えたファイルシステムのように見えます。

重要: クラウドオブジェクトストレージは実際のファイルシステムではありません

ストレージはファイルシステムのように見えますが、基盤は依然としてオブジェクトストレージであり、その違いは重要です

これらは、明示的に指定されている場合を除き、HDFS などのクラスタファイルシステムの直接的な代替として使用できません。

主な違いは次のとおりです。

これが Spark にどのように影響するか?

  1. データの読み書きは、通常のファイルシステムを使用する場合よりも大幅に遅くなる可能性があります。
  2. 一部のディレクトリ構造は、クエリ分割計算中にスキャンするのに非常に非効率的な場合があります。
  3. Spark が通常、RDD、DataFrame、または Dataset を保存するときに作業をコミットするために使用する名前変更ベースのアルゴリズムは、低速で信頼性が低い可能性があります。

これらの理由から、オブジェクトストレージをクエリの直接的な宛先、またはクエリのチェーンの中間ストレージとして使用することは常に安全とは限りません。使用できるかどうかは、オブジェクトストレージとそのコネクタのドキュメントを参照してください。

一貫性

2021 年現在、Amazon (S3)、Google Cloud (GCS)、Microsoft (Azure Storage、ADLS Gen1、ADLS Gen2) のオブジェクトストレージはすべて一貫性があります

これは、ファイルが書き込まれたり更新されたりするとすぐに、他のプロセスによってリスト、表示、および開くことができ、最新のバージョンが取得されることを意味します。これは AWS S3 の既知の問題であり、特にオブジェクトが作成される前に実行された HEAD リクエストの 404 キャッシュに関連していました。

それでも、ストレージコネクタのいずれも、ストリームがオブジェクトを読み取っている間に上書きされたオブジェクトをクライアントがどのように処理するかについての保証を提供していません。古いファイルは安全に読み取ることができると想定したり、変更が表示されるまでに限られた時間があると想定したり、ファイルの読み取り中に上書きされた場合にクライアントが単純に失敗しないことを想定したりしないでください。

このため、他のクライアントが積極的に読み取っている可能性のあるファイルの上書きは避けてください。

その他のオブジェクトストレージは一貫性がない

これにはOpenStack Swiftが含まれます。

このようなストレージは、常に作業の宛先として安全に使用できるとは限りません。各ストレージの特定のドキュメントを参照してください。

インストール

クラスパスに関連ライブラリがあり、有効な資格情報を使用して Spark が設定されている場合、オブジェクトはデータへのパスとしてその URL を使用して読み書きできます。たとえば、sparkContext.textFile("s3a://landsat-pds/scene_list.gz")は、s3a コネクタを使用して、S3 に格納されているscene_list.gzファイルの RDD を作成します。

アプリケーションのクラスパスに関連ライブラリを追加するには、hadoop-cloudモジュールとその依存関係を含めます。

Maven では、選択した Spark のバージョンにspark.versionが設定されていると仮定して、次のものをpom.xmlファイルに追加します。

<dependencyManagement>
  ...
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hadoop-cloud_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  ...
</dependencyManagement>

Apache Spark ベースの商用製品は、一般的にクラウドインフラストラクチャとの通信のためのクラスパスを直接設定するため、このモジュールは必要ない場合があります。

認証

Spark ジョブは、オブジェクトストレージ内のデータにアクセスするために、オブジェクトストレージで認証を行う必要があります。

  1. Spark がクラウドインフラストラクチャで実行されている場合、資格情報は通常自動的に設定されます。
  2. spark-submitは、AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKEN環境変数を読み取り、Amazon S3 のs3nおよびs3aコネクタの関連認証オプションを設定します。
  3. Hadoop クラスタでは、設定をcore-site.xmlファイルに設定できます。
  4. 認証の詳細を手動でspark-defaults.confの Spark 設定に追加できます。
  5. あるいは、アプリケーションのSparkContextを設定するために使用されるSparkConfインスタンスでプログラムによって設定することもできます。

重要: 特に公開リポジトリには、認証シークレットをソースコードリポジトリにチェックインしないでください。

関連する設定とセキュリティオプションについては、Hadoop のドキュメントを参照してください。

設定

各クラウドコネクタには独自の設定パラメータがあります。こちらも関連ドキュメントを参照してください。

名前変更ベースのコミットが安全なオブジェクトストレージの一貫性モデルについては、パフォーマンスにはFileOutputCommitter v2 アルゴリズムを、安全のためには v1 を使用します。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

これは、バージョン 1 のアルゴリズムよりも、ジョブの最後に名前変更する回数が少なくなります。rename()を使用してファイルをコミットするため、オブジェクトストレージに一貫性のあるメタデータ/リストがない場合は安全に使用できません。

コミッタは、一時ファイルをクリーンアップするときのエラーを無視するように設定することもできます。これにより、一時的なネットワーク問題がジョブの失敗にエスカレートするリスクを軽減できます。

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

元の v1 コミットアルゴリズムは、成功したタスクの出力をジョブ試行ディレクトリに名前変更し、ジョブコミットフェーズ中にそのディレクトリ内のすべてのファイルを最終的な宛先に名前変更します。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

Amazon S3 で模倣された名前変更のパフォーマンスが遅いことから、このアルゴリズムは非常に遅くなります。これに対する推奨される解決策は、S3「ゼロ名前変更」コミッタに切り替えることです(下記参照)。

参考までに、ディレクトリの名前変更時のさまざまなストレージとコネクタのパフォーマンスと安全性の特性を示します。

ストレージ コネクタ ディレクトリの名前変更の安全性 名前変更のパフォーマンス
Amazon S3 s3a 安全でない O(データ)
Azure Storage wasb 安全 O(ファイル)
Azure Datalake Gen 2 abfs 安全 O(1)
Google Cloud Storage gs 混合 O(ファイル)
  1. 一時ファイルの保存で料金が発生する可能性があるため、定期的に"_temporary"という名前のディレクトリを削除します。
  2. AWS S3 の場合、保留中のマルチパートアップロードの期間に制限を設定します。これにより、未完了のアップロードからの請求を回避できます。
  3. Google Cloud の場合、ディレクトリの名前変更はファイル単位です。v2 コミッタの使用を検討し、ファイル名を含むべき等な出力のみを生成するコードを記述してください。これは v1 コミッタよりも安全ではなく、高速です。

Parquet I/O 設定

Parquet データを処理する場合の最適なパフォーマンスを得るには、次の設定を使用します。

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

これにより、クエリ中に読み取られるデータの量が最小限に抑えられます。

ORC I/O 設定

ORC データを処理する場合の最適なパフォーマンスを得るには、次の設定を使用します。

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

同様に、これにより、クエリ中に読み取られるデータの量が最小限に抑えられます。

Spark ストリーミングとオブジェクトストレージ

Spark ストリーミングは、StreamingContext.textFileStream()を呼び出してストア内のパスを監視するFileInputDStreamを作成することにより、オブジェクトストレージに追加されたファイルを監視できます。

  1. 新しいファイルをスキャンする時間は、新しいファイルの数ではなく、パスの下にあるファイルの数に比例するため、低速な操作になる可能性があります。ウィンドウのサイズは、これに対処するように設定する必要があります。

  2. ファイルは、完全に書き込まれた場合にのみオブジェクトストレージに表示されます。ファイルが書き込まれている間もファイルが取得されないようにするには、書き込み後名前変更のワークフローは必要ありません。アプリケーションは、監視対象のディレクトリに直接書き込むことができます。

  3. FileContextBasedCheckpointFileManagerと呼ばれるデフォルトのチェックポイントファイルマネージャの場合、ストリームは高速でアトミックなrename()操作を実装するストアにのみチェックポイントする必要があります。それ以外の場合は、チェックポイントが遅く、信頼性が低くなる可能性があります。S3A コネクタを使用して Hadoop 3.3.1 以降の AWS S3 では、アボート可能なストリームベースのチェックポイントファイルマネージャを使用できます(spark.sql.streaming.checkpointFileManagerClass設定をorg.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManagerに設定することにより)。これにより、低速な名前変更がなくなります。この場合、チェックポイントデータの破損につながる可能性があるため、複数のクエリが並列で実行される場合のチェックポイントの場所の再利用を避けるために特に注意する必要があります。

クラウドストレージへの作業の安全かつ迅速なコミット

前述のように、コミットによる名前変更は、最終的な一貫性を示すオブジェクトストレージ(例:S3)では危険であり、従来のファイルシステムの名前変更よりも遅いことがよくあります。

一部のオブジェクトストレージコネクタは、名前変更を使用せずにタスクとジョブをコミットするカスタムコミッタを提供します。

Hadoop S3A コミッタ

Hadoop 3.1 以降でビルドされた Spark のバージョンでは、hadoop-aws JAR に、s3a コネクタを介してアクセスされる S3 ストレージに安全に使用できるコミッタが含まれています。

ストア上のテンポラリディレクトリにデータを書き込んでから名前を変更する代わりに、これらのコミッタはファイルを最終的な宛先に書き込みますが、大規模な「マルチパート」アップロードを可視化する最終的なPOSTコマンドは発行しません。これらの操作は、ジョブコミット自体まで延期されます。その結果、タスクとジョブのコミットが大幅に高速化され、タスクの失敗が結果に影響を与えなくなります。

S3Aコミッタに切り替えるには、Hadoop 3.1以降でビルドされたSparkのバージョンを使用し、以下のオプションでコミッタを切り替えます。

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

Sparkでサポートされている最も一般的な形式でテストされています。

mydataframe.write.format("parquet").save("s3a://bucket/destination")

これらのコミッタの詳細については、最新のHadoopドキュメント(S3Aコミッタの詳細についてはS3への作業のコミット:S3Aコミッタの使用を参照)をご覧ください。

注:使用するコミッタによっては、3.3.1より前のHadoopバージョンでは進行中の統計が過小報告される場合があります。

Amazon EMR: EMRFS S3 最適化コミッタ

Amazon EMRには、Parquetデータ用の独自のS3対応コミッタがあります。使用方法については、EMRFS S3最適化コミッタを参照してください。

実装とパフォーマンスの詳細については、[「EMRFS S3最適化コミッタを使用したApache Parquet形式でのApache Spark書き込みパフォーマンスの向上」](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)を参照してください。

Azure と Google Cloud Storage: MapReduce 中間マニフェストコミッタ

2022年9月以降に出荷されたhadoop-mapreduce-core JARのバージョン(3.3.5以降)には、Azure ADLS Generation 2とGoogle Cloud Storageのパフォーマンスと耐障害性を最適化したコミッタが含まれています。

このコミッタである「マニフェストコミッタ」は、マニフェストファイルを使用して、タスクコミッタからジョブコミッタへのディレクトリ一覧情報を伝達します。これらのマニフェストは、アトミックなディレクトリ名の変更に依存することなく、アトミックに書き込むことができます(これはGCSにはありません)。

ジョブコミッタはこれらのマニフェストを読み取り、タスク出力ディレクトリから宛先ディレクトリにファイルを直接、並列に名前変更し、オプションでレート制限を使用してIOのスロットリングを回避します。これにより、オブジェクトストアのパフォーマンスとスケーラビリティが向上します。

Azureストレージでこれを使用することは、ジョブの正確性にとって重要ではありません。従来のFileOutputCommitterはそこで安全です。ただし、この新しいコミッタは、深く幅広いディレクトリツリーを持つ大規模なジョブの方がスケールします。

Google GCSはアトミックなディレクトリ名の変更をサポートしていないため、可能な場合はマニフェストコミッタを使用する必要があります。

このコミッタは「動的パーティション上書き」をサポートしています(下記参照)。

このコミッタの可用性と使用方法の詳細については、使用しているHadoopリリースのHadoopドキュメントを参照してください。

Hadoop 3.3.4以前では使用できません。

IBM Cloud Object Storage: Stocator

IBMは、IBM Cloud Object StorageとOpenStack Swift用のStocator出力コミッタを提供しています。

ソース、ドキュメント、リリースはhttps://github.com/CODAIT/stocatorにあります。

クラウドコミッタとINSERT OVERWRITE TABLE

Sparkには「動的パーティション上書き」という機能があります。テーブルを更新でき、新しいデータが追加されたパーティションのみの内容が置き換えられます。

これは、INSERT OVERWRITE TABLE形式のSQLステートメントで使用され、Datasetが「overwrite」モードで書き込まれる場合にも使用されます。

eventDataset.write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(tablePath)

この機能はファイルの名前変更を使用し、コミッタとファイルシステムの両方に特定の要件があります。

  1. コミッタの作業ディレクトリは、宛先ファイルシステムに存在する必要があります。
  2. ターゲットファイルシステムは、ファイルの名前変更を効率的にサポートする必要があります。

これらの条件は、S3AコミッタとAWS S3ストレージでは満たされません

他のクラウドストアのコミッタは、この機能をサポートしている場合があり、Sparkに互換性があると宣言します。Hadoopコミッタを介してデータ書き込み時に動的パーティション上書きが必要な場合、元のFileOutputCommitterが使用されている場合は、Sparkは常にこれを許可します。他のコミッタについては、インスタンス化後、Sparkは互換性の宣言を調査し、互換性があると状態を宣言している場合は操作を許可します。

コミッタが互換性がない場合、PathOutputCommitter does not support dynamicPartitionOverwriteというエラーメッセージが表示されて操作は失敗します。

ターゲットファイルシステムに互換性のあるコミッタがない限り、唯一の解決策は、データストレージにクラウドフレンドリーな形式を使用することです。

参考資料

Apacheとクラウドプロバイダの両方の標準コネクタに関するドキュメントを以下に示します。

クラウドコミッタの問題とHive互換ソリューション