クイックスタート

このチュートリアルでは、Spark の使用方法を簡単に紹介します。まず、Spark のインタラクティブシェル(Python または Scala)を通じて API を紹介し、次に Java、Scala、Python でアプリケーションを作成する方法を示します。

このガイドを進めるには、まず Spark ウェブサイトから Spark のパッケージリリースをダウンロードしてください。HDFS を使用しないため、任意のバージョンの Hadoop 用パッケージをダウンロードできます。

Spark 2.0 より前は、Spark の主要なプログラミングインターフェイスは Resilient Distributed Dataset (RDD) でした。Spark 2.0 以降、RDD は Dataset に置き換えられました。Dataset は RDD のように型安全ですが、内部にはより豊富な最適化が施されています。RDD インターフェイスは引き続きサポートされており、RDD プログラミングガイドで詳細なリファレンスを確認できます。しかし、パフォーマンスが RDD よりも優れた Dataset の使用に切り替えることを強くお勧めします。Dataset に関する詳細については、SQL プログラミングガイドを参照してください。

Spark シェルによるインタラクティブ分析

基本

Spark のシェルは、API を学習するための簡単な方法と、インタラクティブにデータを分析するための強力なツールを提供します。Scala(Java VM 上で動作するため、既存の Java ライブラリを使用するのに適しています)または Python で利用できます。Spark ディレクトリで以下を実行して起動してください。

./bin/pyspark

または、PySpark が現在の環境に pip でインストールされている場合

pyspark

Spark の主な抽象化は、Dataset と呼ばれるアイテムの分散コレクションです。Dataset は Hadoop InputFormats(HDFS ファイルなど)から作成したり、他の Dataset を変換したりして作成できます。Python の動的な性質により、Python では Dataset が型安全である必要はありません。その結果、Python のすべての Dataset は Dataset[Row] となり、Pandas や R のデータフレームの概念と一貫性を保つために `DataFrame` と呼びます。Spark ソースディレクトリの README ファイルのテキストから新しい DataFrame を作成してみましょう。

>>> textFile = spark.read.text("README.md")

some actions を呼び出すことで DataFrame から直接値を取得したり、DataFrame を変換して新しい DataFrame を取得したりできます。詳細については、*API ドキュメント* を参照してください。

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

それでは、この DataFrame を新しい DataFrame に変換してみましょう。`filter` を呼び出すと、ファイル内の行のサブセットを含む新しい DataFrame が返されます。

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

変換とアクションを連鎖させることができます。

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15
./bin/spark-shell

Spark の主な抽象化は、Dataset と呼ばれるアイテムの分散コレクションです。Dataset は Hadoop InputFormats(HDFS ファイルなど)から作成したり、他の Dataset を変換したりして作成できます。Spark ソースディレクトリの README ファイルのテキストから新しい Dataset を作成してみましょう。

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

some actions を呼び出すことで Dataset から直接値を取得したり、Dataset を変換して新しい Dataset を取得したりできます。詳細については、*API ドキュメント* を参照してください。

scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

それでは、この Dataset を新しい Dataset に変換してみましょう。`filter` を呼び出すと、ファイル内のアイテムのサブセットを含む新しい Dataset が返されます。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

変換とアクションを連鎖させることができます。

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

データセット操作の詳細

Dataset のアクションと変換は、より複雑な計算に使用できます。最も単語数の多い行を見つけたいとしましょう。

>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]

これは、まず行を整数値にマッピングし、「numWords」というエイリアスを付けて新しい DataFrame を作成します。その DataFrame で `agg` を呼び出して、最大の単語数を検索します。`select` と `agg` の引数はどちらも *Column* であり、`df.colName` を使用して DataFrame から列を取得できます。また、pyspark.sql.functions をインポートすることもできます。これにより、古い列から新しい列を構築するための便利な関数が多数提供されます。

一般的なデータフローパターンは MapReduce であり、Hadoop によって普及しました。Spark は MapReduce フローを簡単に実装できます。

>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

ここでは、`select` の `explode` 関数を使用して、行の Dataset を単語の Dataset に変換し、次に `groupBy` と `count` を組み合わせて、ファイル内の単語ごとのカウントを 2 列(「word」と「count」)の DataFrame として計算します。シェルで単語カウントを収集するには、`collect` を呼び出すことができます。

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15

これは、まず行を整数値にマッピングして新しい Dataset を作成します。その Dataset で `reduce` を呼び出して、最大の単語数を検索します。`map` と `reduce` の引数は Scala の関数リテラル(クロージャ)であり、任意の言語機能または Scala/Java ライブラリを使用できます。たとえば、 elsewhere で宣言された関数を簡単に呼び出すことができます。このコードを理解しやすくするために、`Math.max()` 関数を使用します。

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一般的なデータフローパターンは MapReduce であり、Hadoop によって普及しました。Spark は MapReduce フローを簡単に実装できます。

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

ここでは、`flatMap` を呼び出して行の Dataset を単語の Dataset に変換し、次に `groupByKey` と `count` を組み合わせて、ファイル内の単語ごとのカウントを(String, Long)ペアの Dataset として計算します。シェルで単語カウントを収集するには、`collect` を呼び出すことができます。

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

キャッシュ

Spark は、データセットをクラスター全体でインメモリキャッシュにプルすることもサポートしています。これは、小さな「ホット」データセットのクエリや、PageRank のような反復アルゴリズムの実行など、データが繰り返しアクセスされる場合に非常に役立ちます。簡単な例として、`linesWithSpark` データセットをキャッシュするようにマークしましょう。

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

100 行のテキストファイルを探索してキャッシュするために Spark を使用するのは馬鹿げているように思えるかもしれません。興味深いのは、これらの同じ関数を非常に大きなデータセットに使用できることであり、それらは数十または数百のノードに分散されている場合でも同様です。また、RDD プログラミングガイドで説明されているように、`bin/pyspark` をクラスターに接続してインタラクティブに行うこともできます。

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

100 行のテキストファイルを探索してキャッシュするために Spark を使用するのは馬鹿げているように思えるかもしれません。興味深いのは、これらの同じ関数を非常に大きなデータセットに使用できることであり、それらは数十または数百のノードに分散されている場合でも同様です。また、RDD プログラミングガイドで説明されているように、`bin/spark-shell` をクラスターに接続してインタラクティブに行うこともできます。

スタンドアロンアプリケーション

Spark API を使用してスタンドアロンアプリケーションを記述すると仮定します。Scala(sbt を使用)、Java(Maven を使用)、Python(pip を使用)の簡単なアプリケーションを順に説明します。

次に、Python API(PySpark)を使用したアプリケーションの作成方法を示します。

パッケージ化された PySpark アプリケーションまたはライブラリをビルドしている場合は、setup.py ファイルに次のように追加できます。

    install_requires=[
        'pyspark==4.0.0'
    ]

例として、簡単な Spark アプリケーション `SimpleApp.py` を作成します。

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

このプログラムは、テキストファイル内の 'a' を含む行数と 'b' を含む行数をカウントするだけです。YOUR_SPARK_HOME を Spark がインストールされている場所に置き換える必要があることに注意してください。Scala および Java の例と同様に、SparkSession を使用して Dataset を作成します。カスタムクラスまたはサードパーティライブラリを使用するアプリケーションの場合、パッケージ化された .zip ファイルを介して `spark-submit` にコード依存関係を追加することもできます(詳細については `spark-submit --help` を参照してください)。`SimpleApp` は単純なため、コード依存関係を指定する必要はありません。

`bin/spark-submit` スクリプトを使用してこのアプリケーションを実行できます。

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master "local[4]" \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

PySpark が環境に pip インストールされている場合(例: `pip install pyspark`)、通常の Python インタープリタでアプリケーションを実行するか、提供されている `spark-submit` を必要に応じて使用できます。

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

非常に簡単な Spark アプリケーションを Scala で作成します。実際、非常に簡単なので `SimpleApp.scala` と名付けました。

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

アプリケーションは `scala.App` を拡張するのではなく、`main()` メソッドを定義する必要があることに注意してください。`scala.App` のサブクラスは正しく機能しない場合があります。

このプログラムは、Spark README の 'a' を含む行数と 'b' を含む行数をカウントするだけです。YOUR_SPARK_HOME を Spark がインストールされている場所に置き換える必要があることに注意してください。Spark シェルを使用した以前の例とは異なり、Spark シェルは独自の SparkSession を初期化しますが、ここではプログラムの一部として SparkSession を初期化します。

`SparkSession.builder` を呼び出して `SparkSession` を構築し、アプリケーション名を設定し、最後に `getOrCreate` を呼び出して `SparkSession` インスタンスを取得します。

アプリケーションは Spark API に依存しているため、sbt 設定ファイル `build.sbt` も含めます。これは、Spark が依存関係であることを示します。このファイルは、Spark が依存するリポジトリも追加します。

name := "Simple Project"

version := "1.0"

scalaVersion := "2.13.16"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "4.0.0"

sbt が正しく機能するには、`SimpleApp.scala` と `build.sbt` を通常のディレクトリ構造に従って配置する必要があります。これが整ったら、アプリケーションのコードを含む JAR パッケージを作成し、`spark-submit` スクリプトを使用してプログラムを実行できます。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.13/simple-project_2.13-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master "local[4]" \
  target/scala-2.13/simple-project_2.13-1.0.jar
...
Lines with a: 46, Lines with b: 23

この例では Maven を使用してアプリケーション JAR をコンパイルしますが、同様のビルドシステムでも機能します。

非常に簡単な Spark アプリケーション `SimpleApp.java` を作成します。

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

このプログラムは、Spark README の 'a' を含む行数と 'b' を含む行数をカウントするだけです。YOUR_SPARK_HOME を Spark がインストールされている場所に置き換える必要があることに注意してください。Spark シェルを使用した以前の例とは異なり、Spark シェルは独自の SparkSession を初期化しますが、ここではプログラムの一部として SparkSession を初期化します。

プログラムをビルドするために、Maven の `pom.xml` ファイルも記述し、Spark を依存関係としてリストします。Spark アーティファクトには Scala バージョンがタグ付けされていることに注意してください。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>4.0.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

これらのファイルを標準的な Maven ディレクトリ構造に従って配置します。

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

これで、Maven を使用してアプリケーションをパッケージ化し、`./bin/spark-submit` で実行できます。

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master "local[4]" \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

Conda や pip などの他の依存関係管理ツールも、カスタムクラスやサードパーティライブラリに使用できます。「Python パッケージ管理」も参照してください。

次へ進む

初めての Spark アプリケーションの実行、おめでとうございます!

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R