งาน SparkR หมดเวลา 100 นาที

ฉันได้เขียนสคริปต์ sparkR ที่ซับซ้อนเล็กน้อยแล้วรันโดยใช้ spark-submit โดยทั่วไปสคริปต์ทำอะไรคืออ่านตารางไม้ปาร์เก้ไฮฟ์/อิมพาลาขนาดใหญ่ทีละแถวและสร้างไฟล์ไม้ปาร์เก้ใหม่ที่มีจำนวนแถวเท่ากัน แต่ดูเหมือนว่างานจะหยุดหลังจากผ่านไปประมาณ 100 นาที ซึ่งดูเหมือนว่าจะหมดเวลาไปบ้าง

  • สำหรับสคริปต์มากถึง 500,000 แถวทำงานได้อย่างสมบูรณ์แบบ (เพราะใช้เวลาน้อยกว่า 100 นาที)
  • สำหรับสคริปต์ 1, 2 หรือ 3 ล้านแถวขึ้นไปจะออกหลังจาก 100 นาที

ฉันตรวจสอบพารามิเตอร์ที่เป็นไปได้ทั้งหมดโดยมีค่าช่วง 100 นาทีที่ฉันรู้และทดสอบแล้ว แต่ก็ไม่สามารถหาทางแก้ไขได้

[user@localhost R]$ time spark-submit sparkr-pre.R                           
Loading required package: methods

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    filter, na.omit

The following objects are masked from ‘package:base’:

    intersect, rbind, sample, subset, summary, table, transform

15/12/30 18:04:27 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[Stage 1:========================================>                 (7 + 3) / 10]Error in if (returnStatus != 0) { : argument is of length zero
Calls: write.df -> write.df -> .local -> callJMethod -> invokeJava
Execution halted
15/12/30 19:44:52 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1514)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1438)
        at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1724)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1723)
        at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:587)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
15/12/30 19:44:52 ERROR DefaultWriterContainer: Job job_201512301804_0000 aborted.
15/12/30 19:44:52 ERROR RBackendHandler: save on 25 failed

real    100m30.944s
user    1m26.326s
sys     0m19.459s

ข้อมูลรันไทม์สภาพแวดล้อม

Name    Value
Java Home   /usr/java/jdk1.8.0_40/jre
Java Version    1.8.0_40 (Oracle Corporation)
Scala Version   version 2.10.4
Spark Properties

Name    Value
spark.akka.frameSize    1024
spark.app.id    application_1451466100034_0019
spark.app.name  SparkR
spark.driver.appUIAddress   http://x.x.x.x:4040
spark.driver.host   x.x.x.x
spark.driver.maxResultSize  8G
spark.driver.memory 100G
spark.driver.port   60471
spark.executor.id   driver
spark.executor.memory   14G
spark.executorEnv.LD_LIBRARY_PATH   $LD_LIBRARY_PATH:/usr/lib64/R/lib:/usr/local/lib64:/usr/lib/jvm/jre/lib/amd64/server:/usr/lib/jvm/jre/lib/amd64:/usr/lib/jvm/java/lib/amd64:/usr/java/packages/lib/amd64:/lib:/usr/lib::/usr/lib/hadoop/lib/native
spark.externalBlockStore.folderName spark-b60f685e-c46c-435d-ab1b-c9d1279f630f
spark.fileserver.uri    http://x.x.x.x:50281
spark.home  /datas/spark-1.5.2-bin-hadoop2.6
spark.kryoserializer.buffer.max 2000M
spark.master    yarn-client
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS  CDHPR1.dc.dialog.lk
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES  http://CDHPR1.dc.dialog.lk:8088/proxy/application_1451466100034_0019
spark.scheduler.mode    FIFO
spark.serializer    org.apache.spark.serializer.KryoSerializer
spark.sql.parquet.binaryAsString    true
spark.submit.deployMode client
spark.ui.filters    org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.yarn.dist.archives    file:/datas/spark-1.5.2-bin-hadoop2.6/R/lib/sparkr.zip#sparkr
spark.yarn.dist.files   file:/home/inuser/R/sparkr-pre.R
System Properties

Name    Value
SPARK_SUBMIT    true
SPARK_YARN_MODE true
awt.toolkit sun.awt.X11.XToolkit
file.encoding   UTF-8
file.encoding.pkg   sun.io
file.separator  /
java.awt.graphicsenv    sun.awt.X11GraphicsEnvironment
java.awt.printerjob sun.print.PSPrinterJob
java.class.version  52.0
java.endorsed.dirs  /usr/java/jdk1.8.0_40/jre/lib/endorsed
java.ext.dirs   /usr/java/jdk1.8.0_40/jre/lib/ext:/usr/java/packages/lib/ext
java.home   /usr/java/jdk1.8.0_40/jre
java.io.tmpdir  /tmp
java.library.path   :/usr/lib/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.runtime.name   Java(TM) SE Runtime Environment
java.runtime.version    1.8.0_40-b26
java.specification.name Java Platform API Specification
java.specification.vendor   Oracle Corporation
java.specification.version  1.8
java.vendor Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug http://bugreport.sun.com/bugreport/
java.version    1.8.0_40
java.vm.info    mixed mode
java.vm.name    Java HotSpot(TM) 64-Bit Server VM
java.vm.specification.name  Java Virtual Machine Specification
java.vm.specification.vendor    Oracle Corporation
java.vm.specification.version   1.8
java.vm.vendor  Oracle Corporation
java.vm.version 25.40-b25
line.separator  
os.arch amd64
os.name Linux
os.version  2.6.32-431.el6.x86_64
path.separator  :
sun.arch.data.model 64
sun.boot.class.path /usr/java/jdk1.8.0_40/jre/lib/resources.jar:/usr/java/jdk1.8.0_40/jre/lib/rt.jar:/usr/java/jdk1.8.0_40/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_40/jre/lib/jsse.jar:/usr/java/jdk1.8.0_40/jre/lib/jce.jar:/usr/java/jdk1.8.0_40/jre/lib/charsets.jar:/usr/java/jdk1.8.0_40/jre/lib/jfr.jar:/usr/java/jdk1.8.0_40/jre/classes
sun.boot.library.path   /usr/java/jdk1.8.0_40/jre/lib/amd64
sun.cpu.endian  little
sun.cpu.isalist 
sun.io.unicode.encoding UnicodeLittle
sun.java.command    org.apache.spark.deploy.SparkSubmit sparkr-pre.R
sun.java.launcher   SUN_STANDARD
sun.jnu.encoding    UTF-8
sun.management.compiler HotSpot 64-Bit Tiered Compilers
sun.nio.ch.bugLevel 
sun.os.patch.level  unknown
user.country    US
user.dir    /home/user/R
user.home   /home/user
user.language   en
user.name   inuser
user.timezone   Asia/Colombo
Classpath Entries

Resource    Source
/datas/spark-1.5.2-bin-hadoop2.6/conf/  System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/conf/yarn-conf/    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar  System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar   System Classpath

จุดประกาย-default.conf

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#
spark.master            yarn-client
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.driver.memory             100G
spark.executor.memory           14G 
spark.sql.parquet.binaryAsString true
spark.kryoserializer.buffer.max 2000M
spark.driver.maxResultSize      8G
spark.akka.frameSize            1024
#spark.executor.instances       16

ฉันไม่สามารถแชร์สคริปต์ sparkR ในที่สาธารณะได้ ขออภัยจริงๆ เกี่ยวกับเรื่องนั้น แต่โค้ดทำงานได้อย่างสมบูรณ์แบบเมื่อใช้เวลาน้อยกว่า 100 นาทีจึงจะเสร็จสมบูรณ์


person Shanika    schedule 04.01.2016    source แหล่งที่มา
comment
ดูคุณสมบัติเช่นการหมดเวลาของ spark.network ดูการหมดเวลาที่เป็นไปได้ทั้งหมด: spark.apache .org/docs/latest/configuration.html   -  person Ravindra babu    schedule 04.01.2016
comment
นอกเหนือจากคำแนะนำของ @ravindra คุณกำลังรวบรวม RDD ใด ๆ ให้กับไดรเวอร์หรือเปิดไฟล์ขนาดใหญ่บนไดรเวอร์หรือไม่? ฉันเคยเห็นการปิดระบบ sparkcontext เมื่อแอปพลิเคชันไดรเวอร์หยุดทำงานเท่านั้น ฉันเห็นคุณส่งในโหมดเส้นด้าย-ไคลเอนต์ คุณอาจลองส่งในโหมดเส้นด้าย-คลัสเตอร์เพื่อดูว่าการรันไดรเวอร์บนโหนดผู้ปฏิบัติงานใช้งานได้หรือไม่ การหมดเวลาอาจเป็นการกำหนดค่าที่ผู้ดูแลระบบของคุณตั้งค่าไว้บนเซิร์ฟเวอร์ที่คุณใช้ส่งงาน   -  person Joe Widen    schedule 05.01.2016
comment
ขอบคุณ. ฉันจะลองข้อเสนอแนะของคุณแสดงความคิดเห็นว่าเกิดอะไรขึ้น   -  person Shanika    schedule 05.01.2016
comment
ดูบทความนี้เพื่อการปรับแต่งเพิ่มเติม: blog.cloudera.com/blog/2015/03/   -  person Ravindra babu    schedule 05.01.2016
comment
โหมดคลัสเตอร์ทำให้ฉันมีข้อผิดพลาดเกี่ยวกับการกำหนดค่า ฉันจะสามารถแก้ปัญหาได้ ฉันลองใช้โหมดท้องถิ่นแล้วและมันก็ออกหลังจากผ่านไป 100 นาทีด้วย นอกจากนี้ ฉันอัปเกรด spark เป็น 1.6 ซึ่งเปิดตัวเมื่อไม่กี่วันก่อน แต่ปัญหายังคงเหมือนเดิม ฉันใช้ cloudera 5.4 สำหรับ Hadoop Spark 1.6 ได้รับการติดตั้งและกำหนดค่าด้วยตนเอง   -  person Shanika    schedule 06.01.2016
comment
ข้อมูลบันทึกเพิ่มเติมบางส่วน 16/01/06 09:41:05 INFO ContextCleaner: ทำความสะอาดตัวสะสม 1 ข้อผิดพลาดใน if (returnStatus != 0) { : อาร์กิวเมนต์มีความยาวเป็นศูนย์ โทร: write.df -› write.df -› .local -› callJMethod -› การเรียกใช้ Java หยุด 16/01/06 10:51:40 ข้อมูล SparkContext: เรียกใช้การหยุด () จาก hook ปิดเครื่อง 16/01/06 10:51:41 ข้อมูล SparkUI: หยุด Spark web UI ที่ ip:4040 16/01/06 10:51:41 INFO DAGScheduler: งาน 2 ล้มเหลว: บันทึกที่ NativeMethodAccessorImpl.java:-2 ถ่ายแล้ว 6001.098219 วินาที 16/01/06 10:51:41 ข้อมูล DAGScheduler: ResultStage 3 (บันทึกที่ NativeMethodAccessorImpl.java:-2) ล้มเหลวใน 6000.237 วินาที   -  person Shanika    schedule 06.01.2016
comment
เราพบวิธีแก้ไขปัญหานี้แล้วหรือยัง และจะได้รับหลังจากผ่านไปประมาณ 500 นาทีด้วย   -  person SudhirJ    schedule 19.01.2016
comment
ยัง. ในที่สุดฉันก็ประมวลผลข้อมูลเป็นชุดโดยใช้งาน Spark หลายงาน   -  person Shanika    schedule 20.01.2016


คำตอบ (2)


เป็นข้อบกพร่องที่ทราบแล้วใน Spark 1.6.0 โปรดดู: https://issues.apache.org/jira/browse/SPARK-12609. การตรวจสอบโค้ด SparkR อย่างรวดเร็วยังระบุว่ามีจุดบกพร่องจริงตั้งแต่ Spark 1.4.0

วิธีแก้ปัญหาที่รวดเร็วและสกปรกคือการเพิ่มการหมดเวลาจนกว่าพวกเขาจะปล่อยการแก้ไข ตามที่ระบุไว้ในปัญหา ฟังก์ชันที่เป็นปัญหาคือ connectBackend ฟังก์ชันสามารถแก้ไขได้ที่รันไทม์โดยใช้ assignInNamespace

ข้อมูลต่อไปนี้ดึงข้อมูลฟังก์ชันดั้งเดิม จากนั้นรวมไว้ในฟังก์ชันที่สองซึ่งเราจะเพิ่มค่าการหมดเวลาเป็น 48 ชั่วโมง จากนั้นฟังก์ชันดั้งเดิมจะถูกแทนที่ด้วยกระดาษห่อ

connectBackend.orig <- getFromNamespace('connectBackend', pos='package:SparkR')
connectBackend.patched <- function(hostname, port, timeout = 3600*48) {
   connectBackend.orig(hostname, port, timeout)
}
assignInNamespace("connectBackend", value=connectBackend.patched, pos='package:SparkR')

ใส่รหัสนี้หลังจากโหลดแพ็คเกจ SparkR

วิธีแก้ไขอื่นคือแก้ไขการหมดเวลาในโค้ด SparkR และคอมไพล์ใหม่ สำหรับคำแนะนำในการคอมไพล์ โปรดดูที่: https://github.com/apache/spark/blob/branch-1.6/R/install-dev.sh

person CmdNtrf    schedule 16.02.2016
comment
ขอบคุณมาก. ฉันจะทดสอบและดู - person Shanika; 17.02.2016

ตั้งแต่ Spark 2.1 มีตัวแปรสภาพแวดล้อมชื่อ SPARKR_BACKEND_CONNECTION_TIMEOUT ที่ควบคุมการหมดเวลา อย่างไรก็ตาม ค่าเริ่มต้นยังคงตั้งไว้ที่ 100 นาที ดังนั้นคุณต้องตั้งค่าเช่น SPARKR_BACKEND_CONNECTION_TIMEOUT=1209600 บนไดรเวอร์เพื่อให้คุณสามารถทำงานได้นานขึ้น

ฉันคิดว่าการตั้งค่า --conf spark.yarn.appMasterEnv.SPARKR_BACKEND_CONNECTION_TIMEOUT=1209600 ใน spark-submit จะใช้กลอุบายได้ แต่ดูเหมือนว่าจะตั้งค่าตัวแปรไม่ถูกต้องเช่นกัน ดังนั้นวิธีแก้ปัญหาปัจจุบันของฉันคือรวมสิ่งนี้ไว้ในสคริปต์ R ที่กำลังดำเนินการ:

Sys.setenv("SPARKR_BACKEND_CONNECTION_TIMEOUT" = 1209600)
person GreatEmerald    schedule 27.06.2019