Bagaimana cara menerapkan fungsi pada setiap nilai kolom di SPARKR DataFrame?

Saya relatif baru mengenal SPARKR. Saya mengunduh SPARK 1.4 dan mengatur RStudio untuk menggunakan perpustakaan SPARKR. Namun saya ingin tahu bagaimana saya bisa menerapkan fungsi ke setiap nilai di kolom DataFrame terdistribusi, dapatkah seseorang membantu? Misalnya,

Ini bekerja dengan sempurna

myFunc <- function(x) { paste(x , "_hello")}
c <- c("a", "b", "c")
d <- lapply(c, myFunc)

Cara membuat ini berfungsi untuk DataFrame Terdistribusi. Tujuannya adalah untuk menambahkan "_hello" ke setiap nilai kolom Nama DF

DF <- read.df(sqlContext, "TV_Flattened_2.csv", source = "com.databricks.spark.csv", header="true")
SparkR:::lapply(DF$Name, myFunc)

Pada SPARKR versi alpha sebelum rilis SPARK 1.4 sepertinya sudah ada kemampuan ini, mengapa sekarang tidak ada pada rilis resmi SPARK 1.4?


person Sagar    schedule 12.08.2015    source sumber
comment
Saya tidak memiliki pengetahuan tentang sparkr tetapi mungkinkah Anda lebih membutuhkan name(DF) daripada DF$Name ?   -  person mts    schedule 12.08.2015
comment
Di bawah tenda, fungsi lapply masih menjadi bagian dari SparkR 1.4, tetapi untuk saat ini bukan fungsi global, saya tidak tahu alasannya. Anda juga harus melihat fungsi peta.   -  person Wannes Rosiers    schedule 13.08.2015


Jawaban (3)


Menggunakan flatMap, Anda membuat RDD dari DataFrame dengan fungsi yang diterapkan pada semua item.

c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c))
myFunc <- function(x) { paste(x , "_hello")}
d <- flatMap(df, myFunc)
e <- createDataFrame(sqlContext, d)

Namun kerugiannya adalah hanya melakukan apa yang Anda harapkan pada kolom pertama DataFrame, ia melewatkan semua kolom lainnya. Hal ini terlihat pada contoh berikut:

c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c,u=c(1,2,3)))
myFunc <- function(x) { paste(x , "_hello")}
d <- flatMap(df, myFunc)
e <- createDataFrame(sqlContext, d)

yang memberikan keluaran yang persis sama seperti contoh pertama, namun df dimulai dengan kolom tambahan.

person Wannes Rosiers    schedule 17.08.2015
comment
Tampaknya flatMap tidak diekspor di 2.1, saya harus melakukan: SparkR:::flatMap(...) - person Konrad; 02.02.2017

Saya cukup sering bermain-main dengan ini dan tidak memiliki solusi bersih untuk menerapkan fungsi langsung ke elemen kolom dan sejujurnya saya tidak yakin hal ini memungkinkan saat ini. Meskipun demikian, dengan menggunakan metode COLLECT kita dapat melakukan hal berikut:

Catatan Saya menggunakan Windows dan mengetik ke PowerShell

cd D:\Spark\spark-1.4.1-bin-hadoop2.6
./bin/sparkR
c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c))
c1 <- collect(df)
myFunc <- function(x) { paste(x , "_hello")}
d <- lapply(c1, myFunc)
df2 <- createDataFrame(sqlContext, as.data.frame(d))
head(df2)

Menghasilkan apa yang akan Anda cetak di R: 1 a _hello 2 b _hello 3 c _hello

Berikut adalah sumber daya yang berguna:

https://spark.apache.org/docs/latest/api/R/index.html

https://spark.apache.org/docs/latest/sparkr.html

https://databricks.com/blog/2015/08/12/from-pandas-to-apache-sparks-dataframe.html

person Arthur Aguirre    schedule 17.08.2015
comment
Pendekatan ini bersifat serial. Memanggil pengumpulan pada kerangka data berarti ini tidak akan mendistribusikan pekerjaan di antara para pelaksana. - person Myles Baker; 11.07.2017

Spark 2.x kini memiliki fungsi bernama dapply yang memungkinkan Anda menjalankan fungsi R di setiap partisi kerangka data SparkR.

Contoh kode dari dokumen:

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300

Lihat di sini untuk informasi lebih lanjut: http://spark.apache.org/docs/latest/sparkr.html#run-a-given-function-on-a-large-dataset-using-dapply-or-dapplycollect

Perhatikan saja bahwa jika Anda menggunakan pustaka R eksternal, Anda harus menginstalnya pada node pekerja

person devlace    schedule 21.05.2017