ฉันจะใช้ฟังก์ชันกับแต่ละค่าของคอลัมน์ใน SPARKR DataFrame ได้อย่างไร

ฉันค่อนข้างใหม่กับ SPARKR ฉันดาวน์โหลด SPARK 1.4 และตั้งค่า RStudio เพื่อใช้ไลบรารี SPARKR อย่างไรก็ตาม ฉันต้องการทราบว่าฉันจะใช้ฟังก์ชันกับแต่ละค่าในคอลัมน์ของ DataFrame แบบกระจายได้อย่างไร ใครสามารถช่วยได้บ้าง ตัวอย่างเช่น,

มันทำงานได้อย่างสมบูรณ์แบบ

myFunc <- function(x) { paste(x , "_hello")}
c <- c("a", "b", "c")
d <- lapply(c, myFunc)

วิธีทำให้ใช้งานได้กับ Distributed DataFrame จุดประสงค์คือการผนวก "_hello" ต่อท้ายแต่ละค่าของชื่อคอลัมน์ของ DF

DF <- read.df(sqlContext, "TV_Flattened_2.csv", source = "com.databricks.spark.csv", header="true")
SparkR:::lapply(DF$Name, myFunc)

ในเวอร์ชันอัลฟ่าของ SPARKR ก่อนการเปิดตัว SPARK 1.4 ดูเหมือนว่าจะมีความสามารถนี้ ทำไมตอนนี้จึงขาดหายไปในการเปิดตัวอย่างเป็นทางการของ SPARK 1.4


person Sagar    schedule 12.08.2015    source แหล่งที่มา
comment
ฉันไม่มีความรู้เกี่ยวกับ sparkr แต่คุณอาจต้องการ name(DF) แทน DF$Name ไหม   -  person mts    schedule 12.08.2015
comment
ภายใต้ฝากระโปรงหน้า ฟังก์ชัน lapply ยังคงเป็นส่วนหนึ่งของ SparkR 1.4 แต่ในขณะนี้ ฟังก์ชันดังกล่าวไม่ใช่ฟังก์ชันส่วนกลาง ฉันไม่รู้ว่าทำไม คุณควรดูฟังก์ชันแผนที่ด้วย   -  person Wannes Rosiers    schedule 13.08.2015


คำตอบ (3)


เมื่อใช้ flatMap คุณจะสร้าง RDD จาก DataFrame โดยมีฟังก์ชันใช้กับรายการทั้งหมด

c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c))
myFunc <- function(x) { paste(x , "_hello")}
d <- flatMap(df, myFunc)
e <- createDataFrame(sqlContext, d)

อย่างไรก็ตามข้อเสียคือทำเฉพาะสิ่งที่คุณคาดหวังในคอลัมน์แรกของ DataFrame เท่านั้น โดยจะข้ามคอลัมน์อื่นๆ ทั้งหมด สิ่งนี้เห็นได้ในตัวอย่างต่อไปนี้:

c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c,u=c(1,2,3)))
myFunc <- function(x) { paste(x , "_hello")}
d <- flatMap(df, myFunc)
e <- createDataFrame(sqlContext, d)

ซึ่งให้ผลลัพธ์เหมือนกับตัวอย่างแรกทุกประการ อย่างไรก็ตาม df เริ่มต้นด้วยคอลัมน์เพิ่มเติม

person Wannes Rosiers    schedule 17.08.2015
comment
ดูเหมือนว่า flatMap จะไม่ถูกส่งออกใน 2.1 ฉันต้องทำ: SparkR:::flatMap(...) - person Konrad; 02.02.2017

ฉันเล่นกับสิ่งนี้มาบ้างแล้วและไม่มีวิธีแก้ปัญหาที่ชัดเจนในการใช้ฟังก์ชันกับองค์ประกอบคอลัมน์โดยตรงและไม่แน่ใจว่าจะเป็นไปได้ในปัจจุบันหรือไม่ อย่างไรก็ตาม การใช้วิธี COLLECT เราสามารถทำได้ดังต่อไปนี้:

หมายเหตุฉันกำลังใช้ Windows และพิมพ์ลงใน PowerShell

cd D:\Spark\spark-1.4.1-bin-hadoop2.6
./bin/sparkR
c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c))
c1 <- collect(df)
myFunc <- function(x) { paste(x , "_hello")}
d <- lapply(c1, myFunc)
df2 <- createDataFrame(sqlContext, as.data.frame(d))
head(df2)

สร้างสิ่งที่คุณต้องการพิมพ์ใน R: 1 a _hello 2 b _hello 3 c _hello

นี่คือแหล่งข้อมูลที่เป็นประโยชน์:

https://spark.apache.org/docs/latest/api/R/index.html

https://spark.apache.org/docs/latest/sparkr.html

https://databricks.com/blog/2015/08/12/from-pandas-to-apache-sparks-dataframe.html

person Arthur Aguirre    schedule 17.08.2015
comment
วิธีการนี้เป็นแบบอนุกรม การเรียกใช้การรวบรวมบนดาต้าเฟรมหมายความว่าสิ่งนี้จะไม่กระจายงานไปยังผู้ดำเนินการ - person Myles Baker; 11.07.2017

ตอนนี้ Spark 2.x มีฟังก์ชันที่เรียกว่า dapply ซึ่งช่วยให้คุณสามารถเรียกใช้ฟังก์ชัน R บนแต่ละพาร์ติชันของ Dataframe SparkR ได้

ตัวอย่างโค้ดจากเอกสาร:

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300

ดูข้อมูลเพิ่มเติมที่นี่: http://spark.apache.org/docs/latest/sparkr.html#run-a-given-function-on-a-large-dataset-using-dapply-or-dapplycollect

โปรดทราบว่าหากคุณใช้ไลบรารี R ภายนอก คุณจะต้องติดตั้งไลบรารีเหล่านี้บนโหนดของผู้ปฏิบัติงาน

person devlace    schedule 21.05.2017