基本統計 - RDDベースAPI

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

要約統計量

Statistics で利用可能な colStats 関数を通じて、RDD[Vector] の列ごとの要約統計量を提供します。

colStats()MultivariateStatisticalSummary のインスタンスを返します。このインスタンスには、列ごとの最大値、最小値、平均値、分散、非ゼロ要素数、および合計カウントが含まれます。

APIの詳細については、MultivariateStatisticalSummary Python ドキュメントを参照してください。

import numpy as np

from pyspark.mllib.stat import Statistics

mat = sc.parallelize(
    [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([3.0, 30.0, 300.0])]
)  # an RDD of Vectors

# Compute column summary statistics.
summary = Statistics.colStats(mat)
print(summary.mean())  # a dense vector containing the mean value for each column
print(summary.variance())  # column-wise variance
print(summary.numNonzeros())  # number of nonzeros in each column
完全なサンプルコードは、Spark リポジトリの「examples/src/main/python/mllib/summary_statistics_example.py」にあります。

colStats()MultivariateStatisticalSummary のインスタンスを返します。このインスタンスには、列ごとの最大値、最小値、平均値、分散、非ゼロ要素数、および合計カウントが含まれます。

APIの詳細については、MultivariateStatisticalSummary Scala ドキュメントを参照してください。

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

val observations = sc.parallelize(
  Seq(
    Vectors.dense(1.0, 10.0, 100.0),
    Vectors.dense(2.0, 20.0, 200.0),
    Vectors.dense(3.0, 30.0, 300.0)
  )
)

// Compute column summary statistics.
val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
println(summary.mean)  // a dense vector containing the mean value for each column
println(summary.variance)  // column-wise variance
println(summary.numNonzeros)  // number of nonzeros in each column
完全なサンプルコードは、Spark リポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/SummaryStatisticsExample.scala」にあります。

colStats()MultivariateStatisticalSummary のインスタンスを返します。このインスタンスには、列ごとの最大値、最小値、平均値、分散、非ゼロ要素数、および合計カウントが含まれます。

APIの詳細については、MultivariateStatisticalSummary Java ドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary;
import org.apache.spark.mllib.stat.Statistics;

JavaRDD<Vector> mat = jsc.parallelize(
  Arrays.asList(
    Vectors.dense(1.0, 10.0, 100.0),
    Vectors.dense(2.0, 20.0, 200.0),
    Vectors.dense(3.0, 30.0, 300.0)
  )
); // an RDD of Vectors

// Compute column summary statistics.
MultivariateStatisticalSummary summary = Statistics.colStats(mat.rdd());
System.out.println(summary.mean());  // a dense vector containing the mean value for each column
System.out.println(summary.variance());  // column-wise variance
System.out.println(summary.numNonzeros());  // number of nonzeros in each column
完全なサンプルコードは、Spark リポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaSummaryStatisticsExample.java」にあります。

相関

統計学において、2つのデータ系列間の相関を計算することは一般的な操作です。spark.mllib では、多数の系列間のペアワイズ相関を計算する柔軟性を提供しています。現在サポートされている相関方法は、ピアソン相関とスピアマン相関です。

Statistics は、系列間の相関を計算するためのメソッドを提供します。入力タイプ(2つの RDD[Double] または RDD[Vector])に応じて、出力はそれぞれ Double または相関 Matrix になります。

APIの詳細については、Statistics Python ドキュメントを参照してください。

from pyspark.mllib.stat import Statistics

seriesX = sc.parallelize([1.0, 2.0, 3.0, 3.0, 5.0])  # a series
# seriesY must have the same number of partitions and cardinality as seriesX
seriesY = sc.parallelize([11.0, 22.0, 33.0, 33.0, 555.0])

# Compute the correlation using Pearson's method. Enter "spearman" for Spearman's method.
# If a method is not specified, Pearson's method will be used by default.
print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))

data = sc.parallelize(
    [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([5.0, 33.0, 366.0])]
)  # an RDD of Vectors

# calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
# If a method is not specified, Pearson's method will be used by default.
print(Statistics.corr(data, method="pearson"))
完全なサンプルコードは、Spark リポジトリの「examples/src/main/python/mllib/correlations_example.py」にあります。

Statistics は、系列間の相関を計算するためのメソッドを提供します。入力タイプ(2つの RDD[Double] または RDD[Vector])に応じて、出力はそれぞれ Double または相関 Matrix になります。

APIの詳細については、Statistics Scala ドキュメントを参照してください。

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

val seriesX: RDD[Double] = sc.parallelize(
  immutable.ArraySeq.unsafeWrapArray(Array(1.0, 2.0, 3.0, 3.0, 5.0)))  // a series
// must have the same number of partitions and cardinality as seriesX
val seriesY: RDD[Double] = sc.parallelize(
  immutable.ArraySeq.unsafeWrapArray(Array(11.0, 22.0, 33.0, 33.0, 555.0)))

// compute the correlation using Pearson's method. Enter "spearman" for Spearman's method. If a
// method is not specified, Pearson's method will be used by default.
val correlation: Double = Statistics.corr(seriesX, seriesY, "pearson")
println(s"Correlation is: $correlation")

val data: RDD[Vector] = sc.parallelize(
  Seq(
    Vectors.dense(1.0, 10.0, 100.0),
    Vectors.dense(2.0, 20.0, 200.0),
    Vectors.dense(5.0, 33.0, 366.0))
)  // note that each Vector is a row and not a column

// calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method
// If a method is not specified, Pearson's method will be used by default.
val correlMatrix: Matrix = Statistics.corr(data, "pearson")
println(correlMatrix.toString)
完全なサンプルコードは、Spark リポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/CorrelationsExample.scala」にあります。

Statistics は、系列間の相関を計算するためのメソッドを提供します。入力タイプ(2つの JavaDoubleRDD または JavaRDD<Vector>)に応じて、出力はそれぞれ Double または相関 Matrix になります。

APIの詳細については、Statistics Java ドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.stat.Statistics;

JavaDoubleRDD seriesX = jsc.parallelizeDoubles(
  Arrays.asList(1.0, 2.0, 3.0, 3.0, 5.0));  // a series

// must have the same number of partitions and cardinality as seriesX
JavaDoubleRDD seriesY = jsc.parallelizeDoubles(
  Arrays.asList(11.0, 22.0, 33.0, 33.0, 555.0));

// compute the correlation using Pearson's method. Enter "spearman" for Spearman's method.
// If a method is not specified, Pearson's method will be used by default.
double correlation = Statistics.corr(seriesX.srdd(), seriesY.srdd(), "pearson");
System.out.println("Correlation is: " + correlation);

// note that each Vector is a row and not a column
JavaRDD<Vector> data = jsc.parallelize(
  Arrays.asList(
    Vectors.dense(1.0, 10.0, 100.0),
    Vectors.dense(2.0, 20.0, 200.0),
    Vectors.dense(5.0, 33.0, 366.0)
  )
);

// calculate the correlation matrix using Pearson's method.
// Use "spearman" for Spearman's method.
// If a method is not specified, Pearson's method will be used by default.
Matrix correlMatrix = Statistics.corr(data.rdd(), "pearson");
System.out.println(correlMatrix.toString());
完全なサンプルコードは、Spark リポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaCorrelationsExample.java」にあります。

層化サンプリング

spark.mllib にある他の統計関数とは異なり、層化サンプリングメソッドである sampleByKey および sampleByKeyExact は、キーと値のペアの RDD に対して実行できます。層化サンプリングでは、キーをラベル、値を特定の属性と見なすことができます。たとえば、キーは男性または女性、あるいはドキュメント ID であり、対応する値は母集団の年齢のリスト、またはドキュメントの単語のリストです。sampleByKey メソッドは、観測値がサンプリングされるかどうかを決定するためにコインを投げます。そのため、データに対する1回のパスが必要で、*期待される*サンプルサイズを提供します。sampleByKeyExact は、sampleByKey で使用される層ごとの単純無作為抽出よりも大幅に多くのリソースを必要としますが、99.99% の信頼度で正確なサンプリングサイズを提供します。sampleByKeyExact は現在 Python ではサポートされていません。

sampleByKey() は、ユーザーが近似的に $\lceil f_k \cdot n_k \rceil \, \forall k \in K$ 個のアイテムをサンプリングできるようにします。ここで、$f_k$ はキー $k$ の希望する割合、$n_k$ はキー $k$ のキーと値のペアの数、$K$ はキーのセットです。

注意: sampleByKeyExact() は現在 Python ではサポートされていません。

# an RDD of any key value pairs
data = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')])

# specify the exact fraction desired from each key as a dictionary
fractions = {1: 0.1, 2: 0.6, 3: 0.3}

approxSample = data.sampleByKey(False, fractions)
完全なサンプルコードは、Spark リポジトリの「examples/src/main/python/mllib/stratified_sampling_example.py」にあります。

sampleByKeyExact() は、ユーザーが正確に $\lceil f_k \cdot n_k \rceil \, \forall k \in K$ 個のアイテムをサンプリングできるようにします。ここで、$f_k$ はキー $k$ の希望する割合、$n_k$ はキー $k$ のキーと値のペアの数、$K$ はキーのセットです。復元なしサンプリングは、サンプルサイズを保証するために RDD の追加の1回のパスを必要としますが、復元ありサンプリングは追加の2回のパスを必要とします。

// an RDD[(K, V)] of any key value pairs
val data = sc.parallelize(
  Seq((1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')))

// specify the exact fraction desired from each key
val fractions = Map(1 -> 0.1, 2 -> 0.6, 3 -> 0.3)

// Get an approximate sample from each stratum
val approxSample = data.sampleByKey(withReplacement = false, fractions = fractions)
// Get an exact sample from each stratum
val exactSample = data.sampleByKeyExact(withReplacement = false, fractions = fractions)
完全なサンプルコードは、Spark リポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/StratifiedSamplingExample.scala」にあります。

sampleByKeyExact() は、ユーザーが正確に $\lceil f_k \cdot n_k \rceil \, \forall k \in K$ 個のアイテムをサンプリングできるようにします。ここで、$f_k$ はキー $k$ の希望する割合、$n_k$ はキー $k$ のキーと値のペアの数、$K$ はキーのセットです。復元なしサンプリングは、サンプルサイズを保証するために RDD の追加の1回のパスを必要としますが、復元ありサンプリングは追加の2回のパスを必要とします。

import java.util.*;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;

List<Tuple2<Integer, Character>> list = Arrays.asList(
    new Tuple2<>(1, 'a'),
    new Tuple2<>(1, 'b'),
    new Tuple2<>(2, 'c'),
    new Tuple2<>(2, 'd'),
    new Tuple2<>(2, 'e'),
    new Tuple2<>(3, 'f')
);

JavaPairRDD<Integer, Character> data = jsc.parallelizePairs(list);

// specify the exact fraction desired from each key Map<K, Double>
ImmutableMap<Integer, Double> fractions = ImmutableMap.of(1, 0.1, 2, 0.6, 3, 0.3);

// Get an approximate sample from each stratum
JavaPairRDD<Integer, Character> approxSample = data.sampleByKey(false, fractions);
// Get an exact sample from each stratum
JavaPairRDD<Integer, Character> exactSample = data.sampleByKeyExact(false, fractions);
完全なサンプルコードは、Spark リポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaStratifiedSamplingExample.java」にあります。

仮説検定

仮説検定は、統計学において、ある結果が偶然に発生したのか、それとも統計的に有意なのかを判断するための強力なツールです。spark.mllib は現在、適合度検定と独立性検定のためのピアソンのカイ二乗 ($\chi^2$) 検定をサポートしています。入力データの型によって、適合度検定または独立性検定のどちらが実行されるかが決まります。適合度検定は Vector 型の入力を必要とし、独立性検定は Matrix 型の入力を必要とします。

spark.mllib は、カイ二乗独立性検定による特徴選択を可能にするために、RDD[LabeledPoint] 型の入力もサポートしています。

Statistics は、ピアソンのカイ二乗検定を実行するためのメソッドを提供します。次の例は、仮説検定の実行方法と解釈方法を示しています。

APIの詳細については、Statistics Python ドキュメントを参照してください。

from pyspark.mllib.linalg import Matrices, Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics

vec = Vectors.dense(0.1, 0.15, 0.2, 0.3, 0.25)  # a vector composed of the frequencies of events

# compute the goodness of fit. If a second vector to test against
# is not supplied as a parameter, the test runs against a uniform distribution.
goodnessOfFitTestResult = Statistics.chiSqTest(vec)

# summary of the test including the p-value, degrees of freedom,
# test statistic, the method used, and the null hypothesis.
print("%s\n" % goodnessOfFitTestResult)

mat = Matrices.dense(3, 2, [1.0, 3.0, 5.0, 2.0, 4.0, 6.0])  # a contingency matrix

# conduct Pearson's independence test on the input contingency matrix
independenceTestResult = Statistics.chiSqTest(mat)

# summary of the test including the p-value, degrees of freedom,
# test statistic, the method used, and the null hypothesis.
print("%s\n" % independenceTestResult)

obs = sc.parallelize(
    [LabeledPoint(1.0, [1.0, 0.0, 3.0]),
     LabeledPoint(1.0, [1.0, 2.0, 0.0]),
     LabeledPoint(1.0, [-1.0, 0.0, -0.5])]
)  # LabeledPoint(label, feature)

# The contingency table is constructed from an RDD of LabeledPoint and used to conduct
# the independence test. Returns an array containing the ChiSquaredTestResult for every feature
# against the label.
featureTestResults = Statistics.chiSqTest(obs)

for i, result in enumerate(featureTestResults):
    print("Column %d:\n%s" % (i + 1, result))
完全なサンプルコードは、Spark リポジトリの「examples/src/main/python/mllib/hypothesis_testing_example.py」にあります。

Statistics は、ピアソンのカイ二乗検定を実行するためのメソッドを提供します。次の例は、仮説検定の実行方法と解釈方法を示しています。

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.rdd.RDD

// a vector composed of the frequencies of events
val vec: Vector = Vectors.dense(0.1, 0.15, 0.2, 0.3, 0.25)

// compute the goodness of fit. If a second vector to test against is not supplied
// as a parameter, the test runs against a uniform distribution.
val goodnessOfFitTestResult = Statistics.chiSqTest(vec)
// summary of the test including the p-value, degrees of freedom, test statistic, the method
// used, and the null hypothesis.
println(s"$goodnessOfFitTestResult\n")

// a contingency matrix. Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val mat: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

// conduct Pearson's independence test on the input contingency matrix
val independenceTestResult = Statistics.chiSqTest(mat)
// summary of the test including the p-value, degrees of freedom
println(s"$independenceTestResult\n")

val obs: RDD[LabeledPoint] =
  sc.parallelize(
    Seq(
      LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
      LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 0.0)),
      LabeledPoint(-1.0, Vectors.dense(-1.0, 0.0, -0.5)
      )
    )
  ) // (label, feature) pairs.

// The contingency table is constructed from the raw (label, feature) pairs and used to conduct
// the independence test. Returns an array containing the ChiSquaredTestResult for every feature
// against the label.
val featureTestResults: Array[ChiSqTestResult] = Statistics.chiSqTest(obs)
featureTestResults.zipWithIndex.foreach { case (k, v) =>
  println(s"Column ${(v + 1)} :")
  println(k)
}  // summary of the test
完全なサンプルコードは、Spark リポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/HypothesisTestingExample.scala」にあります。

Statistics は、ピアソンのカイ二乗検定を実行するためのメソッドを提供します。次の例は、仮説検定の実行方法と解釈方法を示しています。

APIの詳細については、ChiSqTestResult Java ドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Matrices;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.stat.Statistics;
import org.apache.spark.mllib.stat.test.ChiSqTestResult;

// a vector composed of the frequencies of events
Vector vec = Vectors.dense(0.1, 0.15, 0.2, 0.3, 0.25);

// compute the goodness of fit. If a second vector to test against is not supplied
// as a parameter, the test runs against a uniform distribution.
ChiSqTestResult goodnessOfFitTestResult = Statistics.chiSqTest(vec);
// summary of the test including the p-value, degrees of freedom, test statistic,
// the method used, and the null hypothesis.
System.out.println(goodnessOfFitTestResult + "\n");

// Create a contingency matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
Matrix mat = Matrices.dense(3, 2, new double[]{1.0, 3.0, 5.0, 2.0, 4.0, 6.0});

// conduct Pearson's independence test on the input contingency matrix
ChiSqTestResult independenceTestResult = Statistics.chiSqTest(mat);
// summary of the test including the p-value, degrees of freedom...
System.out.println(independenceTestResult + "\n");

// an RDD of labeled points
JavaRDD<LabeledPoint> obs = jsc.parallelize(
  Arrays.asList(
    new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
    new LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 0.0)),
    new LabeledPoint(-1.0, Vectors.dense(-1.0, 0.0, -0.5))
  )
);

// The contingency table is constructed from the raw (label, feature) pairs and used to conduct
// the independence test. Returns an array containing the ChiSquaredTestResult for every feature
// against the label.
ChiSqTestResult[] featureTestResults = Statistics.chiSqTest(obs.rdd());
int i = 1;
for (ChiSqTestResult result : featureTestResults) {
  System.out.println("Column " + i + ":");
  System.out.println(result + "\n");  // summary of the test
  i++;
}
完全なサンプルコードは、Spark リポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaHypothesisTestingExample.java」にあります。

さらに、spark.mllib は、確率分布の等価性に対するコルモゴロフ・スミルノフ (KS) 検定の1標本、両側実装を提供します。理論分布の名前(現在、正規分布のみサポート)とそのパラメータ、または指定された理論分布に従って累積分布を計算する関数を提供することで、ユーザーはサンプルがその分布から抽出されたという帰無仮説を検定できます。ユーザーが正規分布(distName="norm")に対して検定を行いますが、分布パラメータを提供しない場合、検定は標準正規分布に初期化され、適切なメッセージがログに記録されます。

Statistics は、1標本、両側のコルモゴロフ・スミルノフ検定を実行するためのメソッドを提供します。次の例は、仮説検定の実行方法と解釈方法を示しています。

APIの詳細については、Statistics Python ドキュメントを参照してください。

from pyspark.mllib.stat import Statistics

parallelData = sc.parallelize([0.1, 0.15, 0.2, 0.3, 0.25])

# run a KS test for the sample versus a standard normal distribution
testResult = Statistics.kolmogorovSmirnovTest(parallelData, "norm", 0, 1)
# summary of the test including the p-value, test statistic, and null hypothesis
# if our p-value indicates significance, we can reject the null hypothesis
# Note that the Scala functionality of calling Statistics.kolmogorovSmirnovTest with
# a lambda to calculate the CDF is not made available in the Python API
print(testResult)
完全なサンプルコードは、Spark リポジトリの「examples/src/main/python/mllib/hypothesis_testing_kolmogorov_smirnov_test_example.py」にあります。

Statistics は、1標本、両側のコルモゴロフ・スミルノフ検定を実行するためのメソッドを提供します。次の例は、仮説検定の実行方法と解釈方法を示しています。

APIの詳細については、Statistics Scala ドキュメントを参照してください。

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

val data: RDD[Double] = sc.parallelize(Seq(0.1, 0.15, 0.2, 0.3, 0.25))  // an RDD of sample data

// run a KS test for the sample versus a standard normal distribution
val testResult = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1)
// summary of the test including the p-value, test statistic, and null hypothesis if our p-value
// indicates significance, we can reject the null hypothesis.
println(testResult)
println()

// perform a KS test using a cumulative distribution function of our making
val myCDF = Map(0.1 -> 0.2, 0.15 -> 0.6, 0.2 -> 0.05, 0.3 -> 0.05, 0.25 -> 0.1)
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
println(testResult2)
完全なサンプルコードは、Spark リポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/HypothesisTestingKolmogorovSmirnovTestExample.scala」にあります。

Statistics は、1標本、両側のコルモゴロフ・スミルノフ検定を実行するためのメソッドを提供します。次の例は、仮説検定の実行方法と解釈方法を示しています。

APIの詳細については、Statistics Java ドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.mllib.stat.Statistics;
import org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult;

JavaDoubleRDD data = jsc.parallelizeDoubles(Arrays.asList(0.1, 0.15, 0.2, 0.3, 0.25));
KolmogorovSmirnovTestResult testResult =
  Statistics.kolmogorovSmirnovTest(data, "norm", 0.0, 1.0);
// summary of the test including the p-value, test statistic, and null hypothesis
// if our p-value indicates significance, we can reject the null hypothesis
System.out.println(testResult);
完全なサンプルコードは、Spark リポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaHypothesisTestingKolmogorovSmirnovTestExample.java」にあります。

ストリーミング有意性検定

spark.mllib は、A/B テストなどのユースケースをサポートするために、一部の検定のオンライン実装を提供します。これらの検定は、Spark Streaming の DStream[(Boolean, Double)] に対して実行できます。このタプルの最初の要素は、制御グループ(false)または治療グループ(true)を示し、2番目の要素は観測値の値です。

ストリーミング有意性検定は、以下のパラメータをサポートしています。

StreamingTest は、ストリーミング仮説検定を提供します。

val data = ssc.textFileStream(dataDir).map(line => line.split(",") match {
  case Array(label, value) => BinarySample(label.toBoolean, value.toDouble)
})

val streamingTest = new StreamingTest()
  .setPeacePeriod(0)
  .setWindowSize(0)
  .setTestMethod("welch")

val out = streamingTest.registerStream(data)
out.print()
完全なサンプルコードは、Spark リポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala」にあります。

StreamingTest は、ストリーミング仮説検定を提供します。

import org.apache.spark.mllib.stat.test.BinarySample;
import org.apache.spark.mllib.stat.test.StreamingTest;
import org.apache.spark.mllib.stat.test.StreamingTestResult;

JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(line -> {
  String[] ts = line.split(",");
  boolean label = Boolean.parseBoolean(ts[0]);
  double value = Double.parseDouble(ts[1]);
  return new BinarySample(label, value);
});

StreamingTest streamingTest = new StreamingTest()
  .setPeacePeriod(0)
  .setWindowSize(0)
  .setTestMethod("welch");

JavaDStream<StreamingTestResult> out = streamingTest.registerStream(data);
out.print();
完全なサンプルコードは、Spark リポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java」にあります。

乱数生成

乱数生成は、ランダムアルゴリズム、プロトタイピング、およびパフォーマンステストに役立ちます。spark.mllib は、一様分布、標準正規分布、またはポアソン分布から描画された i.i.d. 値を持つランダム RDD の生成をサポートしています。

RandomRDDs は、ランダムなダブル RDD またはベクトル RDD を生成するためのファクトリメソッドを提供します。次の例は、標準正規分布 N(0, 1) に従う値を持つランダムなダブル RDD を生成し、それを N(1, 4) にマッピングする方法を示しています。

APIの詳細については、RandomRDDs Python ドキュメントを参照してください。

from pyspark.mllib.random import RandomRDDs

sc = ... # SparkContext

# Generate a random double RDD that contains 1 million i.i.d. values drawn from the
# standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
u = RandomRDDs.normalRDD(sc, 1000000L, 10)
# Apply a transform to get a random double RDD following `N(1, 4)`.
v = u.map(lambda x: 1.0 + 2.0 * x)

RandomRDDs は、ランダムなダブル RDD またはベクトル RDD を生成するためのファクトリメソッドを提供します。次の例は、標準正規分布 N(0, 1) に従う値を持つランダムなダブル RDD を生成し、それを N(1, 4) にマッピングする方法を示しています。

APIの詳細については、RandomRDDs Scala ドキュメントを参照してください。

import org.apache.spark.SparkContext
import org.apache.spark.mllib.random.RandomRDDs._

val sc: SparkContext = ...

// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
val u = normalRDD(sc, 1000000L, 10)
// Apply a transform to get a random double RDD following `N(1, 4)`.
val v = u.map(x => 1.0 + 2.0 * x)

RandomRDDs は、ランダムなダブル RDD またはベクトル RDD を生成するためのファクトリメソッドを提供します。次の例は、標準正規分布 N(0, 1) に従う値を持つランダムなダブル RDD を生成し、それを N(1, 4) にマッピングする方法を示しています。

APIの詳細については、RandomRDDs Java ドキュメントを参照してください。

import org.apache.spark.SparkContext;
import org.apache.spark.api.JavaDoubleRDD;
import static org.apache.spark.mllib.random.RandomRDDs.*;

JavaSparkContext jsc = ...

// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
JavaDoubleRDD u = normalJavaRDD(jsc, 1000000L, 10);
// Apply a transform to get a random double RDD following `N(1, 4)`.
JavaDoubleRDD v = u.mapToDouble(x -> 1.0 + 2.0 * x);

カーネル密度推定

カーネル密度推定は、観測されたサンプルがどの特定の分布から抽出されたかという仮定を必要とせずに、経験的確率分布を視覚化するのに役立つ技術です。これは、ランダム変数の確率密度関数の推定値を、指定された点の集合で評価したものを計算します。この推定は、各サンプルを中心に配置された正規分布の PDF の平均として、特定の点における経験的分布の PDF を表現することによって達成されます。

KernelDensity は、サンプルの RDD からカーネル密度推定値を計算するためのメソッドを提供します。次の例は、その方法を示しています。

APIの詳細については、KernelDensity Python ドキュメントを参照してください。

from pyspark.mllib.stat import KernelDensity

# an RDD of sample data
data = sc.parallelize([1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 6.0, 7.0, 8.0, 9.0, 9.0])

# Construct the density estimator with the sample data and a standard deviation for the Gaussian
# kernels
kd = KernelDensity()
kd.setSample(data)
kd.setBandwidth(3.0)

# Find density estimates for the given values
densities = kd.estimate([-1.0, 2.0, 5.0])
完全なサンプルコードは、Spark リポジトリの「examples/src/main/python/mllib/kernel_density_estimation_example.py」にあります。

KernelDensity は、サンプルの RDD からカーネル密度推定値を計算するためのメソッドを提供します。次の例は、その方法を示しています。

APIの詳細については、KernelDensity Scala ドキュメントを参照してください。

import org.apache.spark.mllib.stat.KernelDensity
import org.apache.spark.rdd.RDD

// an RDD of sample data
val data: RDD[Double] = sc.parallelize(Seq(1, 1, 1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 9))

// Construct the density estimator with the sample data and a standard deviation
// for the Gaussian kernels
val kd = new KernelDensity()
  .setSample(data)
  .setBandwidth(3.0)

// Find density estimates for the given values
val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
完全なサンプルコードは、Spark リポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/KernelDensityEstimationExample.scala」にあります。

KernelDensity は、サンプルの RDD からカーネル密度推定値を計算するためのメソッドを提供します。次の例は、その方法を示しています。

APIの詳細については、KernelDensity Java ドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.stat.KernelDensity;

// an RDD of sample data
JavaRDD<Double> data = jsc.parallelize(
  Arrays.asList(1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 6.0, 7.0, 8.0, 9.0, 9.0));

// Construct the density estimator with the sample data
// and a standard deviation for the Gaussian kernels
KernelDensity kd = new KernelDensity().setSample(data).setBandwidth(3.0);

// Find density estimates for the given values
double[] densities = kd.estimate(new double[]{-1.0, 2.0, 5.0});

System.out.println(Arrays.toString(densities));
完全なサンプルコードは、Spark リポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaKernelDensityEstimationExample.java」にあります。