GC overhead limit exceeded mit großen RDD[MatrixEntry] in Apache Spark

Ich habe eine csv-Datei gespeichert Daten des user-Element der dimension 6,365x214 , und ich finde user-user ähnlichkeit mithilfe columnSimilarities() von org.apache.spark.mllib.linalg.distributed.CoordinateMatrix.

Mein code sieht wie folgt aus:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{RowMatrix, 
MatrixEntry, CoordinateMatrix}
import org.apache.spark.rdd.RDD

def rddToCoordinateMatrix(input_rdd: RDD[String]) : CoordinateMatrix = {

    //Convert RDD[String] to RDD[Tuple3]
    val coo_matrix_input: RDD[Tuple3[Long,Long,Double]] = input_rdd.map(
        line => line.split(',').toList
    ).map{
            e => (e(0).toLong, e(1).toLong, e(2).toDouble)
    }

    //Convert RDD[Tuple3] to RDD[MatrixEntry]
    val coo_matrix_matrixEntry: RDD[MatrixEntry] = coo_matrix_input.map(e => MatrixEntry(e._1, e._2, e._3))

    //Convert RDD[MatrixEntry] to CoordinateMatrix
    val coo_matrix: CoordinateMatrix  = new CoordinateMatrix(coo_matrix_matrixEntry)

    return coo_matrix
}

//Read CSV File to RDD[String]
val input_rdd: RDD[String] = sc.textFile("user_item.csv")

//Read RDD[String] to CoordinateMatrix
val coo_matrix = rddToCoordinateMatrix(input_rdd)

//Transpose CoordinateMatrix
val coo_matrix_trans = coo_matrix.transpose()

//Convert CoordinateMatrix to RowMatrix
val mat: RowMatrix = coo_matrix_trans.toRowMatrix()

//Compute similar columns perfectly, with brute force
//Return CoordinateMatrix
val simsPerfect: CoordinateMatrix = mat.columnSimilarities()

//CoordinateMatrix to RDD[MatrixEntry]
val simsPerfect_entries = simsPerfect.entries

simsPerfect_entries.count()

//Write results to file
val results_rdd = simsPerfect_entries.map(line => line.i+","+line.j+","+line.value)

results_rdd.saveAsTextFile("similarity-output")

//Close the REPL terminal
System.exit(0)

und, wenn ich dieses Skript auf Funke-shell
ich habe folgenden Fehler nach der Ausführung von Zeile code simsPerfect_entries.count() :

java.lang.OutOfMemoryError: GC overhead limit exceeded

Aktualisiert:

Ich habe versucht, viele Lösungen, die bereits von anderen gegeben ,aber ich habe keinen Erfolg.

1 Durch Erhöhung der Menge an Speicher zu verwenden, pro Testamentsvollstrecker Prozess spark.executor.memory=1g

2 Durch eine Verringerung der Anzahl der Kerne zu verwenden, für die der Treiber-Prozess
spark.driver.cores=1

Mir empfehlen einige Weg, um dieses Problem zu beheben.

  • "Ich habe versucht, viele Lösungen, die bereits von anderen gegeben ,aber ich habe keinen Erfolg." man soll-Liste, die diejenigen so können wir vermeiden, dass redundante Antworten.
  • Akshay scheint, wie ich bin vor dem gleichen problem Hier ist questionhttp://stackoverflow.com/q/37958522/1662775. Ich habe versucht zunehmenden Treiber-Speicher, aber kein Glück.
Schreibe einen Kommentar