クイックスタート
このチュートリアルでは、Sparkの使い方を簡単に紹介します。まず、Sparkのインタラクティブシェル(PythonまたはScala)を通してAPIを紹介します。次に、Java、Scala、およびPythonでアプリケーションを作成する方法を説明します。
このガイドに従うには、まずSparkのウェブサイトからSparkのパッケージリリースをダウンロードしてください。HDFSを使用しないため、Hadoopのどのバージョン向けのパッケージでもダウンロードできます。
Spark 2.0より前は、Sparkの主要なプログラミングインターフェースはResilient Distributed Dataset(RDD)でした。Spark 2.0以降、RDDはDatasetに置き換えられました。DatasetはRDDのように強型付けされていますが、内部的にはより高度な最適化が行われています。RDDインターフェースはまだサポートされており、RDDプログラミングガイドでより詳細なリファレンスを参照できます。ただし、RDDよりもパフォーマンスが優れているDatasetを使用することを強くお勧めします。Datasetの詳細については、SQLプログラミングガイドを参照してください。
Sparkシェルを使ったインタラクティブ分析
基本
Sparkのシェルは、APIを学ぶための簡単な方法と、データをインタラクティブに分析するための強力なツールを提供します。Scala(Java VM上で動作するため、既存のJavaライブラリを使用するのに適しています)またはPythonのいずれかで利用できます。Sparkディレクトリで次のコマンドを実行して起動します。
./bin/pyspark
または、現在の環境でPySparkがpipでインストールされている場合
pyspark
Sparkの主要な抽象化は、Datasetと呼ばれるアイテムの分散コレクションです。Datasetは、Hadoop InputFormats(HDFSファイルなど)から、または他のDatasetを変換することによって作成できます。Pythonの動的な性質により、PythonではDatasetを強型付けする必要はありません。そのため、PythonのすべてのDatasetはDataset[Row]であり、PandasとRのデータフレームの概念と一致させるために、DataFrame
と呼びます。SparkソースディレクトリのREADMEファイルのテキストから新しいDataFrameを作成してみましょう。
>>> textFile = spark.read.text("README.md")
DataFrameから値を取得するには、いくつかのアクションを呼び出すか、DataFrameを変換して新しいDataFrameを取得します。詳細については、APIドキュメントを参照してください。
>>> textFile.count() # Number of rows in this DataFrame
126
>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')
それでは、このDataFrameを新しいDataFrameに変換してみましょう。filter
を呼び出して、ファイル内の行のサブセットを含む新しいDataFrameを返します。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
変換とアクションを連結できます。
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
15
./bin/spark-shell
Sparkの主要な抽象化は、Datasetと呼ばれるアイテムの分散コレクションです。Datasetは、Hadoop InputFormats(HDFSファイルなど)から、または他のDatasetを変換することによって作成できます。SparkソースディレクトリのREADMEファイルのテキストから新しいDatasetを作成してみましょう。
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
Datasetから値を取得するには、いくつかのアクションを呼び出すか、Datasetを変換して新しいDatasetを取得します。詳細については、APIドキュメントを参照してください。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
それでは、このDatasetを新しいDatasetに変換してみましょう。filter
を呼び出して、ファイル内のアイテムのサブセットを含む新しいDatasetを返します。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
変換とアクションを連結できます。
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
Dataset操作の詳細
Datasetのアクションと変換は、より複雑な計算に使用できます。たとえば、単語数が最も多い行を見つけたいとします。
>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]
これは最初に、行を整数値にマッピングし、それを「numWords」としてエイリアスを付け、新しいDataFrameを作成します。そのDataFrameに対してagg
が呼び出され、最大の単語数が検索されます。select
とagg
の引数はどちらもColumnです。df.colName
を使用してDataFrameから列を取得できます。また、pyspark.sql.functionsをインポートすることもできます。これにより、古いColumnから新しいColumnを構築するための便利な関数が多数提供されます。
一般的なデータフローパターンは、Hadoopで普及したMapReduceです。SparkはMapReduceフローを簡単に実装できます。
>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
ここでは、select
でexplode
関数を使用して、行のDatasetを単語のDatasetに変換し、groupBy
とcount
を組み合わせて、ファイル内の単語ごとのカウントを「word」と「count」の2つの列を持つDataFrameとして計算します。シェルで単語数を収集するには、collect
を呼び出すことができます。
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15
これは最初に、行を整数値にマッピングし、新しいDatasetを作成します。そのDatasetに対してreduce
が呼び出され、最大の単語数が検索されます。map
とreduce
の引数はScala関数リテラル(クロージャ)であり、任意の言語機能またはScala/Javaライブラリを使用できます。たとえば、他の場所で宣言された関数を簡単に呼び出すことができます。このコードを理解しやすくするために、Math.max()
関数を使用します。
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一般的なデータフローパターンは、Hadoopで普及したMapReduceです。SparkはMapReduceフローを簡単に実装できます。
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
ここでは、flatMap
を呼び出して、行のDatasetを単語のDatasetに変換し、groupByKey
とcount
を組み合わせて、ファイル内の単語ごとのカウントを(String, Long)ペアのDatasetとして計算します。シェルで単語数を収集するには、collect
を呼び出すことができます。
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
キャッシング
Sparkは、データセットをクラスタ全体のインメモリキャッシュにプルすることもサポートしています。これは、小さな「ホット」データセットのクエリや、PageRankなどの反復アルゴリズムの実行など、データに繰り返しアクセスする場合に非常に役立ちます。簡単な例として、linesWithSpark
データセットをキャッシュするようにマークしてみましょう。
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
100行のテキストファイルを探索してキャッシュするためにSparkを使用するのはばかげているように思えるかもしれません。興味深いのは、これらの同じ関数を、数十または数百のノードに分散されている場合でも、非常に大きなデータセットに使用できることです。RDDプログラミングガイドで説明されているように、bin/pyspark
をクラスタに接続することによって、インタラクティブにこれを行うこともできます。
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
100行のテキストファイルを探索してキャッシュするためにSparkを使用するのはばかげているように思えるかもしれません。興味深いのは、これらの同じ関数を、数十または数百のノードに分散されている場合でも、非常に大きなデータセットに使用できることです。RDDプログラミングガイドで説明されているように、bin/spark-shell
をクラスタに接続することによって、インタラクティブにこれを行うこともできます。
自己完結型アプリケーション
Spark APIを使用して自己完結型アプリケーションを作成したいとします。Scala(sbtを使用)、Java(Mavenを使用)、およびPython(pipを使用)の簡単なアプリケーションについて説明します。
それでは、Python API(PySpark)を使用してアプリケーションを作成する方法を説明します。
パッケージ化されたPySparkアプリケーションまたはライブラリを構築している場合は、setup.pyファイルに次のように追加できます。
install_requires=[
'pyspark==3.5.1'
]
例として、簡単なSparkアプリケーションSimpleApp.py
を作成します。
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
このプログラムは、テキストファイル内の「a」を含む行数と「b」を含む行数をカウントするだけです。YOUR_SPARK_HOMEはSparkがインストールされている場所に置き換える必要があります。ScalaとJavaの例と同様に、SparkSessionを使用してDatasetを作成します。カスタムクラスまたはサードパーティライブラリを使用するアプリケーションの場合、spark-submit
にコードの依存関係を--py-files
引数を通じて追加することもできます。.zipファイルにパッケージ化することでこれを実現できます(詳細については、spark-submit --help
を参照してください)。SimpleApp
は、コードの依存関係を指定する必要がないほどシンプルです。
このアプリケーションは、bin/spark-submit
スクリプトを使用して実行できます。
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
PySpark pipが環境にインストールされている場合(例:pip install pyspark
)、通常のPythonインタープリターでアプリケーションを実行するか、必要に応じて提供されている「spark-submit」を使用できます。
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23
Scalaで非常に簡単なSparkアプリケーションを作成します。実際、SimpleApp.scala
という名前が付けられています。
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]): Unit = {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
アプリケーションは、scala.App
を拡張する代わりに、main()
メソッドを定義する必要があることに注意してください。scala.App
のサブクラスは正しく機能しない場合があります。
このプログラムは、SparkのREADMEに「a」を含む行数と「b」を含む行数をカウントするだけです。YOUR_SPARK_HOMEはSparkがインストールされている場所に置き換える必要があります。独自のSparkSessionを初期化するSparkシェルを使用した以前の例とは異なり、プログラムの一部としてSparkSessionを初期化します。
SparkSession.builder
を呼び出してSparkSession
を構築し、アプリケーション名を設定し、最後にgetOrCreate
を呼び出してSparkSession
インスタンスを取得します。
私たちのアプリケーションはSpark APIに依存しているため、Sparkが依存関係であることを説明するsbt設定ファイルbuild.sbt
も含めます。このファイルは、Sparkが依存するリポジトリも追加します。
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"
sbtが正しく動作するためには、SimpleApp.scala
とbuild.sbt
を典型的なディレクトリ構造に従って配置する必要があります。それが整ったら、アプリケーションのコードを含むJARパッケージを作成し、spark-submit
スクリプトを使用してプログラムを実行できます。
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23
この例では、Mavenを使用してアプリケーションJARをコンパイルしますが、同様のビルドシステムであればどれでも機能します。
非常に簡単なSparkアプリケーションSimpleApp.java
を作成します。
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
このプログラムは、SparkのREADMEに「a」を含む行数と「b」を含む行数をカウントするだけです。YOUR_SPARK_HOMEはSparkがインストールされている場所に置き換える必要があります。独自のSparkSessionを初期化するSparkシェルを使用した以前の例とは異なり、プログラムの一部としてSparkSessionを初期化します。
プログラムをビルドするために、Sparkを依存関係としてリストするMaven pom.xml
ファイルも記述します。SparkアーティファクトはScalaバージョンでタグ付けされていることに注意してください。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
これらのファイルを標準のMavenディレクトリ構造に従って配置します。
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
これで、Mavenを使用してアプリケーションをパッケージ化し、./bin/spark-submit
で実行できます。
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
Condaやpipなどの他の依存関係管理ツールも、カスタムクラスやサードパーティライブラリに使用できます。Pythonパッケージ管理も参照してください。
今後の展望
最初のSparkアプリケーションの実行おめでとうございます!
- APIの詳細な概要については、RDDプログラミングガイドとSQLプログラミングガイドを参照するか、「プログラミングガイド」メニューで他のコンポーネントをご覧ください。
- クラスタ上でアプリケーションを実行する方法については、デプロイメントの概要を参照してください。
- 最後に、Sparkには
examples
ディレクトリにいくつかのサンプルが含まれています(Scala、Java、Python、R)。 これらは以下のように実行できます。
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R