CSVファイル

Spark SQLは、CSV形式のファイルまたはファイルディレクトリをSpark DataFrameに読み込むために spark.read().csv("ファイル名") を提供し、CSVファイルに書き込むために dataframe.write().csv("パス") を提供します。関数 option() を使用して、ヘッダー、区切り文字、文字セットなどの動作を制御するなど、読み取りまたは書き込みの動作をカスタマイズできます。

# spark is from the previous example
sc = spark.sparkContext

# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"

df = spark.read.csv(path)
df.show()
# +------------------+
# |               _c0|
# +------------------+
# |      name;age;job|
# |Jorge;30;Developer|
# |  Bob;32;Developer|
# +------------------+

# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# |  _c0|_c1|      _c2|
# +-----+---+---------+
# | name|age|      job|
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age|      job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)

# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# |        _c0|
# +-----------+
# |238val_238|
# |  86val_86|
# |311val_311|
# |  27val_27|
# |165val_165|
# +-----------+
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"

val df = spark.read.csv(path)
df.show()
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";

Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");

// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。

データソースオプション

CSVのデータソースオプションは、以下を使用して設定できます。

プロパティ名デフォルト意味スコープ
sep , 各フィールドと値の区切り文字を設定します。この区切り文字は1つ以上の文字にできます。 読み取り/書き込み
encoding UTF-8 読み取りの場合、指定されたエンコーディングタイプでCSVファイルをデコードします。書き込みの場合、保存されたCSVファイルのエンコーディング(文字セット)を指定します。CSV組み込み関数はこのオプションを無視します。 読み取り/書き込み
quote " 区切り文字が値の一部である可能性がある、引用符付きの値をエスケープするために使用される単一の文字を設定します。読み取りの場合、引用符をオフにするには、null ではなく空の文字列を設定する必要があります。書き込みの場合、空の文字列が設定されている場合は、u0000(ヌル文字)を使用します。 読み取り/書き込み
quoteAll false すべての値を常に引用符で囲む必要があるかどうかを示すフラグ。デフォルトでは、引用符文字を含む値のみをエスケープします。 書き込み
escape \ すでに引用符で囲まれた値の内部で引用符をエスケープするために使用される単一の文字を設定します。 読み取り/書き込み
escapeQuotes true 引用符を含む値を常に引用符で囲む必要があるかどうかを示すフラグ。デフォルトでは、引用符文字を含むすべての値をエスケープします。 書き込み
comment この文字で始まる行をスキップするために使用される単一の文字を設定します。デフォルトでは、無効になっています。 読み取り
header false 読み取りの場合、最初の行を列の名前として使用します。書き込みの場合、最初の行として列の名前を書き込みます。指定されたパスが文字列のRDDである場合、このヘッダーオプションは、存在する場合はヘッダーと同じすべての行を削除することに注意してください。CSV組み込み関数はこのオプションを無視します。 読み取り/書き込み
inferSchema false 入力スキーマをデータから自動的に推論します。データの追加パスが1つ必要になります。CSV組み込み関数はこのオプションを無視します。 読み取り
preferDate true スキーマ推論(inferSchema)中に、dateFormatオプションまたはデフォルトの日付形式を満たす値が含まれている文字列列をDateとして推論しようとします。日付とタイムスタンプが混在している列については、タイムスタンプ形式が指定されていない場合はTimestampTypeとして推論を試み、指定されている場合はStringTypeとして推論を試みます。 読み取り
enforceSchema true trueに設定されている場合、指定または推論されたスキーマがデータソースファイルに強制的に適用され、CSVファイルのヘッダーは無視されます。オプションがfalseに設定されている場合、headerオプションがtrueに設定されている場合に、スキーマがCSVファイルのすべてのヘッダーに対して検証されます。スキーマ内のフィールド名とCSVヘッダーの列名は、spark.sql.caseSensitiveを考慮して、それらの位置によってチェックされます。デフォルト値はtrueですが、誤った結果を避けるためにenforceSchemaオプションを無効にすることをお勧めします。CSV組み込み関数はこのオプションを無視します。 読み取り
ignoreLeadingWhiteSpace false(読み取りの場合)、true(書き込みの場合) 読み取り/書き込みされる値から先頭の空白をスキップするかどうかを示すフラグ。 読み取り/書き込み
ignoreTrailingWhiteSpace false(読み取りの場合)、true(書き込みの場合) 読み取り/書き込みされる値から末尾の空白をスキップするかどうかを示すフラグ。 読み取り/書き込み
nullValue null値の文字列表現を設定します。2.0.1以降、このnullValueパラメーターは、文字列型を含むすべてのサポートされている型に適用されます。 読み取り/書き込み
nanValue NaN 非数値の文字列表現を設定します。 読み取り
positiveInf Inf 正の無限大値の文字列表現を設定します。 読み取り
negativeInf -Inf 負の無限大値の文字列表現を設定します。 読み取り
dateFormat yyyy-MM-dd 日付形式を示す文字列を設定します。カスタム日付形式は、日時パターン の形式に従います。これは日付型に適用されます。 読み取り/書き込み
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] タイムスタンプ形式を示す文字列を設定します。カスタム日付形式は、日時パターン の形式に従います。これはタイムスタンプ型に適用されます。 読み取り/書き込み
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] タイムゾーンなしのタイムスタンプ形式を示す文字列を設定します。カスタム日付形式は、日時パターン の形式に従います。これはタイムゾーンなしのタイムスタンプ型に適用されます。このデータ型の書き込みまたは読み取り時には、ゾーンオフセットとタイムゾーンのコンポーネントはサポートされていないことに注意してください。 読み取り/書き込み
enableDateTimeParsingFallback 時間パーサーポリシーにレガシー設定がある場合、またはカスタムの日付またはタイムスタンプパターンが提供されなかった場合に有効になります。 値が設定されたパターンと一致しない場合に、日付とタイムスタンプの解析の旧バージョン互換(Spark 1.xおよび2.0)の動作へのフォールバックを許可します。 読み取り
maxColumns 20480 レコードが持つことができる列数のハードリミットを定義します。 読み取り
maxCharsPerColumn -1 読み取られる任意の特定の値に対して許可される最大文字数を定義します。デフォルトでは、無制限の長さを示す-1です。 読み取り
mode PERMISSIVE 解析中に破損したレコードを処理するためのモードを許可します。次の大文字と小文字を区別しないモードをサポートします。Sparkは、列プルーニングの下でCSV内の必要な列のみを解析しようとすることに注意してください。したがって、破損したレコードは、必要なフィールドのセットに基づいて異なる場合があります。この動作は、spark.sql.csv.parser.columnPruning.enabled(デフォルトで有効)で制御できます。
  • PERMISSIVE:破損したレコードに遭遇した場合、不正な形式の文字列をcolumnNameOfCorruptRecordで構成されたフィールドに入れ、不正な形式のフィールドをnullに設定します。破損したレコードを保持するために、ユーザーはユーザー定義スキーマでcolumnNameOfCorruptRecordという名前の文字列型フィールドを設定できます。スキーマにフィールドがない場合、解析中に破損したレコードがドロップされます。スキーマよりもトークン数が少ない/多いレコードは、CSVにとって破損したレコードではありません。スキーマの長さよりもトークン数が少ないレコードに遭遇した場合、追加のフィールドにnullを設定します。レコードのトークン数がスキーマの長さよりも多い場合、追加のトークンがドロップされます。
  • DROPMALFORMED:破損したレコード全体を無視します。このモードは、CSV組み込み関数ではサポートされていません。
  • FAILFAST:破損したレコードに遭遇すると例外をスローします。
読み取り
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord構成の値) PERMISSIVEモードで作成された不正な形式の文字列を持つ新しいフィールドの名前変更を許可します。これはspark.sql.columnNameOfCorruptRecordを上書きします。 読み取り
multiLine false ファイルごとに複数の行にまたがる可能性のある1つのレコードを解析します。CSV組み込み関数はこのオプションを無視します。 読み取り
charToEscapeQuoteEscaping escape または \0 引用符文字のエスケープのエスケープに使用される単一の文字を設定します。デフォルト値は、エスケープ文字と引用符文字が異なる場合はエスケープ文字、それ以外の場合は\0です。 読み取り/書き込み
samplingRatio 1.0 スキーマ推論に使用する行の割合を定義します。CSV組み込み関数はこのオプションを無視します。 読み取り
emptyValue (読み込み時)、""(書き込み時) 空の値の文字列表現を設定します。 読み取り/書き込み
locale en-US IETF BCP 47形式の言語タグとしてロケールを設定します。例えば、これは日付やタイムスタンプの解析中に使用されます。 読み取り
lineSep \r\r\n\n(読み込み時)、\n(書き込み時) 解析/書き込みに使用する行区切り文字を定義します。最大長は1文字です。CSV組み込み関数はこのオプションを無視します。 読み取り/書き込み
unescapedQuoteHandling STOP_AT_DELIMITER エスケープされていない引用符を持つ値をCsvParserがどのように処理するかを定義します。
  • STOP_AT_CLOSING_QUOTE: 入力にエスケープされていない引用符が見つかった場合、引用符文字を蓄積し、閉じ引用符が見つかるまで値を引用符付きの値として解析を続行します。
  • BACK_TO_DELIMITER: 入力にエスケープされていない引用符が見つかった場合、値を引用符なしの値として扱います。これにより、パーサーは区切り文字が見つかるまで、現在解析されている値のすべての文字を蓄積します。値に区切り文字が見つからない場合、パーサーは区切り文字または行末が見つかるまで入力から文字の蓄積を続けます。
  • STOP_AT_DELIMITER: 入力にエスケープされていない引用符が見つかった場合、値を引用符なしの値として扱います。これにより、パーサーは入力で区切り文字または行末が見つかるまで、すべての文字を蓄積します。
  • SKIP_VALUE: 入力にエスケープされていない引用符が見つかった場合、特定の値を解析した内容はスキップされ、代わりにnullValueで設定された値が生成されます。
  • RAISE_ERROR: 入力にエスケープされていない引用符が見つかった場合、TextParsingExceptionがスローされます。
読み取り
compression (none) ファイルに保存する際に使用する圧縮コーデック。これは、既知の大文字小文字を区別しない短縮名(nonebzip2gziplz4snappydeflate)のいずれかになります。CSV組み込み関数はこのオプションを無視します。 書き込み

その他の一般的なオプションは、汎用ファイルソースオプションにあります。