CSV ファイル

Spark SQL は、CSV 形式のファイルまたはファイルディレクトリを Spark DataFrame に読み込むための spark.read().csv("file_name") および CSV ファイルに書き込むための dataframe.write().csv("path") を提供します。option() 関数を使用して、ヘッダー、区切り文字、文字セットなどの動作を制御する読み書きの動作をカスタマイズできます。

# spark is from the previous example
sc = spark.sparkContext

# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"

df = spark.read.csv(path)
df.show()
# +------------------+
# |               _c0|
# +------------------+
# |      name;age;job|
# |Jorge;30;Developer|
# |  Bob;32;Developer|
# +------------------+

# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# |  _c0|_c1|      _c2|
# +-----+---+---------+
# | name|age|      job|
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age|      job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)

# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# |        _c0|
# +-----------+
# |238val_238|
# |  86val_86|
# |311val_311|
# |  27val_27|
# |165val_165|
# +-----------+
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/sql/datasource.py」にあります。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"

val df = spark.read.csv(path)
df.show()
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter" -> ";", "header" -> "true")).csv(path)

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」にあります。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";

Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");

// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」にあります。

データソースオプション

CSV のデータソースオプションは、以下のように設定できます。

プロパティ名デフォルト意味スコープ
sep
delimiter
, 各フィールドと値の区切り文字を設定します。この区切り文字は、1 文字以上にすることができます。 読み書き
extension csv 出力ファイルのファイル拡張子を設定します。文字に限定されます。長さは 3 でなければなりません。 書き込み
encoding
charset
UTF-8 読み込み時には、指定されたエンコーディングタイプで CSV ファイルをデコードします。書き込み時には、保存される CSV ファイルのエンコーディング(文字セット)を指定します。CSV の組み込み関数はこのオプションを無視します。 読み書き
quote " 引用符で囲まれた値のエスケープに使用される単一の文字を設定します。区切り文字が値の一部である場合があります。読み込み時に引用符を無効にしたい場合は、null ではなく空文字列を設定する必要があります。書き込み時に空文字列が設定されている場合、u0000(ヌル文字)が使用されます。 読み書き
quoteAll false すべての値が常に引用符で囲まれるかどうかを示すフラグです。デフォルトでは、引用符文字を含む値のみがエスケープされます。 書き込み
escape \ 既に引用符で囲まれた値の中にある引用符のエスケープに使用される単一の文字を設定します。 読み書き
escapeQuotes true 引用符を含む値が常に引用符で囲まれるかどうかを示すフラグです。デフォルトでは、引用符文字を含むすべての値がエスケープされます。 書き込み
comment この文字で始まる行をスキップするために使用される単一の文字を設定します。デフォルトでは無効になっています。 読み込み
header false 読み込み時には、最初の行を列名として使用します。書き込み時には、列名を最初の行として書き込みます。指定されたパスが文字列の RDD である場合、このヘッダーオプションは、ヘッダーと同じ行があればすべて削除することに注意してください。CSV の組み込み関数はこのオプションを無視します。 読み書き
inferSchema false データから入力スキーマを自動的に推論します。これには、データに対する 1 回の追加パスが必要です。CSV の組み込み関数はこのオプションを無視します。 読み込み
preferDate true スキーマ推論(inferSchema)中に、dateFormat オプションまたはデフォルトの日付形式を満たす日付を含む文字列列を Date として推論しようとします。日付とタイムスタンプの混在を含む列については、タイムスタンプ形式が指定されていない場合は TimestampType として推論しようとし、それ以外の場合は StringType として推論します。 読み込み
enforceSchema true true に設定されている場合、指定されたまたは推論されたスキーマがデータソースファイルに強制的に適用され、CSV ファイルのヘッダーは無視されます。オプションが false に設定されている場合、header オプションが true に設定されている場合に、スキーマは CSV ファイルのすべてのヘッダーに対して検証されます。スキーマのフィールド名と CSV ヘッダーの列名は、spark.sql.caseSensitive を考慮した位置によってチェックされます。デフォルト値は true ですが、誤った結果を避けるために enforceSchema オプションを無効にすることが推奨されます。CSV の組み込み関数はこのオプションを無視します。 読み込み
ignoreLeadingWhiteSpace 読み込み時:false、書き込み時:true 読み込み/書き込みされる値から先頭の空白をスキップするかどうかを示すフラグです。 読み書き
ignoreTrailingWhiteSpace 読み込み時:false、書き込み時:true 読み込み/書き込みされる値から末尾の空白をスキップするかどうかを示すフラグです。 読み書き
nullValue null 値の文字列表現を設定します。2.0.1 以降、この nullValue パラメータは、文字列型を含むすべてのサポートされる型に適用されます。 読み書き
nanValue NaN 数値ではない値の文字列表現を設定します。 読み込み
positiveInf Inf 正の無限大の値の文字列表現を設定します。 読み込み
negativeInf -Inf 負の無限大の値の文字列表現を設定します。 読み込み
dateFormat yyyy-MM-dd 日付形式を示す文字列を設定します。カスタム日付形式は、Datetime Patterns の形式に従います。これは日付型に適用されます。 読み書き
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ形式を示す文字列を設定します。カスタム日付形式は、Datetime Patterns の形式に従います。これはタイムスタンプ型に適用されます。 読み書き
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムゾーンなしのタイムスタンプを示す文字列を設定します。カスタム日付フォーマットは、Datetime Patterns のフォーマットに従います。これはタイムゾーンなしの timestamp 型に適用されます。注意: このデータ型を書き込んだり読み込んだりする際には、ゾーンオフセットおよびタイムゾーンコンポーネントはサポートされていません。 読み書き
enableDateTimeParsingFallback 時間パーサーポリシーにレガシー設定がある場合、またはカスタムの日付またはタイムスタンプパターンが提供されていない場合に有効になります。 値が設定されたパターンに一致しない場合に、日付とタイムスタンプの解析の互換性のある(Spark 1.x および 2.0)動作にフォールバックすることを許可します。 読み込み
maxColumns 20480 レコードが持つことができる列のハードリミットを定義します。 読み込み
maxCharsPerColumn -1 読み込まれる任意の値に許可される最大文字数を定義します。デフォルトでは、-1 で無制限の長さを意味します。 読み込み
mode PERMISSIVE 解析中の破損レコードの処理モードを許可します。以下の大文字小文字を区別しないモードをサポートしています。Spark は列プルーニングの下で CSV で必要な列のみを解析しようとすることに注意してください。したがって、破損レコードは必要なフィールドセットによって異なる場合があります。この動作は、spark.sql.csv.parser.columnPruning.enabled(デフォルトで有効)によって制御できます。
  • PERMISSIVE: 破損したレコードに遭遇した場合、破損した文字列を columnNameOfCorruptRecord で設定されたフィールドに配置し、破損したフィールドを null に設定します。破損したレコードを保持するために、ユーザーはユーザー定義スキーマに columnNameOfCorruptRecord という名前の文字列型フィールドを設定できます。スキーマにそのフィールドがない場合、解析中に破損したレコードはドロップされます。スキーマの長さよりも少ない/多いトークンを持つレコードは、CSV にとって破損したレコードではありません。スキーマの長さよりも少ないトークンを持つレコードに遭遇した場合、追加のフィールドに null を設定します。レコードがスキーマの長さよりも多いトークンを持つ場合、追加のトークンはドロップされます。
  • DROPMALFORMED: 破損したレコード全体を無視します。このモードは CSV の組み込み関数ではサポートされていません。
  • FAILFAST: 破損したレコードに遭遇した場合、例外をスローします。
読み込み
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord 設定の値) PERMISSIVE モードによって作成された破損した文字列を持つ新しいフィールドの名前を変更することを許可します。これは spark.sql.columnNameOfCorruptRecord をオーバーライドします。 読み込み
multiLine false 引用符で囲まれた値内の改行を値自体の部分として解析することにより、行が複数行にまたがることを許可します。CSV の組み込み関数はこのオプションを無視します。 読み込み
charToEscapeQuoteEscaping escape または \0 引用符文字のエスケープのエスケープに使用される単一の文字を設定します。デフォルト値は、エスケープ文字と引用符文字が異なる場合はエスケープ文字、それ以外の場合は \0 です。 読み書き
samplingRatio 1.0 スキーマ推論に使用される行の割合を定義します。CSV の組み込み関数はこのオプションを無視します。 読み込み
emptyValue 読み込み時:""、書き込み時:"" 空の値の文字列表現を設定します。 読み書き
locale en-US IETF BCP 47 形式のロケールを言語タグとして設定します。たとえば、これは日付とタイムスタンプを解析するときに使用されます。 読み込み
lineSep 読み込み時:\r\r\n\n、書き込み時:\n 解析/書き込みに使用する行区切り文字を定義します。最大長は 1 文字です。CSV の組み込み関数はこのオプションを無視します。 読み書き
unescapedQuoteHandling STOP_AT_DELIMITER エスケープされていない引用符を持つ値の処理方法を CsvParser がどのように処理するかを定義します。
  • STOP_AT_CLOSING_QUOTE: エスケープされていない引用符が入力で見つかった場合、引用符文字を累積し、閉じ引用符が見つかるまで値を引用符付きの値として解析を続行します。
  • BACK_TO_DELIMITER: エスケープされていない引用符が入力で見つかった場合、値を引用符なしの値と見なします。これにより、パーサーは現在の解析値のすべての文字を累積して区切り文字が見つかるまで処理します。値に区切り文字が見つからない場合、パーサーは入力から文字を累積し続け、区切り文字または行末が見つかるまで続行します。
  • STOP_AT_DELIMITER: エスケープされていない引用符が入力で見つかった場合、値を引用符なしの値と見なします。これにより、パーサーは入力で区切り文字または行末が見つかるまで、すべての文字を累積します。
  • SKIP_VALUE: エスケープされていない引用符が入力で見つかった場合、指定された値に対して解析されたコンテンツはスキップされ、nullValue で設定された値が生成されます。
  • RAISE_ERROR: エスケープされていない引用符が入力で見つかった場合、TextParsingException がスローされます。
読み込み
compression
codec
(なし) ファイルへの保存時に使用する圧縮コーデックです。これは、認識されている大文字小文字を区別しない短縮名(nonebzip2gziplz4snappydeflate)のいずれかになります。CSV の組み込み関数はこのオプションを無視します。 書き込み
timeZone (spark.sql.session.timeZone 設定の値) JSON データソースまたはパーティション値でタイムスタンプをフォーマットするために使用されるタイムゾーン ID を示す文字列を設定します。timeZone の以下の形式がサポートされています。
  • 地域ベースのゾーン ID: 「area/city」(例: 「America/Los_Angeles」)の形式である必要があります。
  • ゾーンオフセット:'(+|-)HH:mm' 形式でなければなりません(例:'-08:00' または '+01:00')。'UTC' および 'Z' も '+00:00' のエイリアスとしてサポートされています。
「CST」などのその他の短い名前は、曖昧になる可能性があるため、使用しないことをお勧めします。
読み書き

その他の一般的なオプションについては、Generic File Source Options を参照してください。