クイックスタート

このチュートリアルでは、Sparkの使い方を簡単に紹介します。まず、Sparkのインタラクティブシェル(PythonまたはScala)を通してAPIを紹介します。次に、Java、Scala、およびPythonでアプリケーションを作成する方法を説明します。

このガイドに従うには、まずSparkのウェブサイトからSparkのパッケージリリースをダウンロードしてください。HDFSを使用しないため、Hadoopのどのバージョン向けのパッケージでもダウンロードできます。

Spark 2.0より前は、Sparkの主要なプログラミングインターフェースはResilient Distributed Dataset(RDD)でした。Spark 2.0以降、RDDはDatasetに置き換えられました。DatasetはRDDのように強型付けされていますが、内部的にはより高度な最適化が行われています。RDDインターフェースはまだサポートされており、RDDプログラミングガイドでより詳細なリファレンスを参照できます。ただし、RDDよりもパフォーマンスが優れているDatasetを使用することを強くお勧めします。Datasetの詳細については、SQLプログラミングガイドを参照してください。

Sparkシェルを使ったインタラクティブ分析

基本

Sparkのシェルは、APIを学ぶための簡単な方法と、データをインタラクティブに分析するための強力なツールを提供します。Scala(Java VM上で動作するため、既存のJavaライブラリを使用するのに適しています)またはPythonのいずれかで利用できます。Sparkディレクトリで次のコマンドを実行して起動します。

./bin/pyspark

または、現在の環境でPySparkがpipでインストールされている場合

pyspark

Sparkの主要な抽象化は、Datasetと呼ばれるアイテムの分散コレクションです。Datasetは、Hadoop InputFormats(HDFSファイルなど)から、または他のDatasetを変換することによって作成できます。Pythonの動的な性質により、PythonではDatasetを強型付けする必要はありません。そのため、PythonのすべてのDatasetはDataset[Row]であり、PandasとRのデータフレームの概念と一致させるために、DataFrameと呼びます。SparkソースディレクトリのREADMEファイルのテキストから新しいDataFrameを作成してみましょう。

>>> textFile = spark.read.text("README.md")

DataFrameから値を取得するには、いくつかのアクションを呼び出すか、DataFrameを変換して新しいDataFrameを取得します。詳細については、APIドキュメントを参照してください。

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

それでは、このDataFrameを新しいDataFrameに変換してみましょう。filterを呼び出して、ファイル内の行のサブセットを含む新しいDataFrameを返します。

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

変換とアクションを連結できます。

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15
./bin/spark-shell

Sparkの主要な抽象化は、Datasetと呼ばれるアイテムの分散コレクションです。Datasetは、Hadoop InputFormats(HDFSファイルなど)から、または他のDatasetを変換することによって作成できます。SparkソースディレクトリのREADMEファイルのテキストから新しいDatasetを作成してみましょう。

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

Datasetから値を取得するには、いくつかのアクションを呼び出すか、Datasetを変換して新しいDatasetを取得します。詳細については、APIドキュメントを参照してください。

scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

それでは、このDatasetを新しいDatasetに変換してみましょう。filterを呼び出して、ファイル内のアイテムのサブセットを含む新しいDatasetを返します。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

変換とアクションを連結できます。

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

Dataset操作の詳細

Datasetのアクションと変換は、より複雑な計算に使用できます。たとえば、単語数が最も多い行を見つけたいとします。

>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]

これは最初に、行を整数値にマッピングし、それを「numWords」としてエイリアスを付け、新しいDataFrameを作成します。そのDataFrameに対してaggが呼び出され、最大の単語数が検索されます。selectaggの引数はどちらもColumnです。df.colNameを使用してDataFrameから列を取得できます。また、pyspark.sql.functionsをインポートすることもできます。これにより、古いColumnから新しいColumnを構築するための便利な関数が多数提供されます。

一般的なデータフローパターンは、Hadoopで普及したMapReduceです。SparkはMapReduceフローを簡単に実装できます。

>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

ここでは、selectexplode関数を使用して、行のDatasetを単語のDatasetに変換し、groupBycountを組み合わせて、ファイル内の単語ごとのカウントを「word」と「count」の2つの列を持つDataFrameとして計算します。シェルで単語数を収集するには、collectを呼び出すことができます。

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15

これは最初に、行を整数値にマッピングし、新しいDatasetを作成します。そのDatasetに対してreduceが呼び出され、最大の単語数が検索されます。mapreduceの引数はScala関数リテラル(クロージャ)であり、任意の言語機能またはScala/Javaライブラリを使用できます。たとえば、他の場所で宣言された関数を簡単に呼び出すことができます。このコードを理解しやすくするために、Math.max()関数を使用します。

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一般的なデータフローパターンは、Hadoopで普及したMapReduceです。SparkはMapReduceフローを簡単に実装できます。

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

ここでは、flatMapを呼び出して、行のDatasetを単語のDatasetに変換し、groupByKeycountを組み合わせて、ファイル内の単語ごとのカウントを(String, Long)ペアのDatasetとして計算します。シェルで単語数を収集するには、collectを呼び出すことができます。

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

キャッシング

Sparkは、データセットをクラスタ全体のインメモリキャッシュにプルすることもサポートしています。これは、小さな「ホット」データセットのクエリや、PageRankなどの反復アルゴリズムの実行など、データに繰り返しアクセスする場合に非常に役立ちます。簡単な例として、linesWithSparkデータセットをキャッシュするようにマークしてみましょう。

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

100行のテキストファイルを探索してキャッシュするためにSparkを使用するのはばかげているように思えるかもしれません。興味深いのは、これらの同じ関数を、数十または数百のノードに分散されている場合でも、非常に大きなデータセットに使用できることです。RDDプログラミングガイドで説明されているように、bin/pysparkをクラスタに接続することによって、インタラクティブにこれを行うこともできます。

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

100行のテキストファイルを探索してキャッシュするためにSparkを使用するのはばかげているように思えるかもしれません。興味深いのは、これらの同じ関数を、数十または数百のノードに分散されている場合でも、非常に大きなデータセットに使用できることです。RDDプログラミングガイドで説明されているように、bin/spark-shellをクラスタに接続することによって、インタラクティブにこれを行うこともできます。

自己完結型アプリケーション

Spark APIを使用して自己完結型アプリケーションを作成したいとします。Scala(sbtを使用)、Java(Mavenを使用)、およびPython(pipを使用)の簡単なアプリケーションについて説明します。

それでは、Python API(PySpark)を使用してアプリケーションを作成する方法を説明します。

パッケージ化されたPySparkアプリケーションまたはライブラリを構築している場合は、setup.pyファイルに次のように追加できます。

    install_requires=[
        'pyspark==3.5.1'
    ]

例として、簡単なSparkアプリケーションSimpleApp.pyを作成します。

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

このプログラムは、テキストファイル内の「a」を含む行数と「b」を含む行数をカウントするだけです。YOUR_SPARK_HOMEはSparkがインストールされている場所に置き換える必要があります。ScalaとJavaの例と同様に、SparkSessionを使用してDatasetを作成します。カスタムクラスまたはサードパーティライブラリを使用するアプリケーションの場合、spark-submitにコードの依存関係を--py-files引数を通じて追加することもできます。.zipファイルにパッケージ化することでこれを実現できます(詳細については、spark-submit --helpを参照してください)。SimpleAppは、コードの依存関係を指定する必要がないほどシンプルです。

このアプリケーションは、bin/spark-submitスクリプトを使用して実行できます。

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

PySpark pipが環境にインストールされている場合(例:pip install pyspark)、通常のPythonインタープリターでアプリケーションを実行するか、必要に応じて提供されている「spark-submit」を使用できます。

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

Scalaで非常に簡単なSparkアプリケーションを作成します。実際、SimpleApp.scalaという名前が付けられています。

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

アプリケーションは、scala.Appを拡張する代わりに、main()メソッドを定義する必要があることに注意してください。scala.Appのサブクラスは正しく機能しない場合があります。

このプログラムは、SparkのREADMEに「a」を含む行数と「b」を含む行数をカウントするだけです。YOUR_SPARK_HOMEはSparkがインストールされている場所に置き換える必要があります。独自のSparkSessionを初期化するSparkシェルを使用した以前の例とは異なり、プログラムの一部としてSparkSessionを初期化します。

SparkSession.builderを呼び出してSparkSessionを構築し、アプリケーション名を設定し、最後にgetOrCreateを呼び出してSparkSessionインスタンスを取得します。

私たちのアプリケーションはSpark APIに依存しているため、Sparkが依存関係であることを説明するsbt設定ファイルbuild.sbtも含めます。このファイルは、Sparkが依存するリポジトリも追加します。

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"

sbtが正しく動作するためには、SimpleApp.scalabuild.sbtを典型的なディレクトリ構造に従って配置する必要があります。それが整ったら、アプリケーションのコードを含むJARパッケージを作成し、spark-submitスクリプトを使用してプログラムを実行できます。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23

この例では、Mavenを使用してアプリケーションJARをコンパイルしますが、同様のビルドシステムであればどれでも機能します。

非常に簡単なSparkアプリケーションSimpleApp.javaを作成します。

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

このプログラムは、SparkのREADMEに「a」を含む行数と「b」を含む行数をカウントするだけです。YOUR_SPARK_HOMEはSparkがインストールされている場所に置き換える必要があります。独自のSparkSessionを初期化するSparkシェルを使用した以前の例とは異なり、プログラムの一部としてSparkSessionを初期化します。

プログラムをビルドするために、Sparkを依存関係としてリストするMaven pom.xmlファイルも記述します。SparkアーティファクトはScalaバージョンでタグ付けされていることに注意してください。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.5.1</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

これらのファイルを標準のMavenディレクトリ構造に従って配置します。

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

これで、Mavenを使用してアプリケーションをパッケージ化し、./bin/spark-submitで実行できます。

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

Condaやpipなどの他の依存関係管理ツールも、カスタムクラスやサードパーティライブラリに使用できます。Pythonパッケージ管理も参照してください。

今後の展望

最初のSparkアプリケーションの実行おめでとうございます!

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R