SparkR job times out after 100 minutes

I have written a fairly complex SparkR script and run it using spark-submit. What the script basically does is read a large Hive/Impala-based table row by row and generate a new Parquet file with the same number of rows. However, the job seems to stop after almost exactly 100 minutes, which looks like some kind of timeout.

  • Up to 500K rows, the script works perfectly (because it takes less than 100 minutes).
  • For 1, 2, 3 or more million rows, the script exits after 100 minutes.

I checked and tested every parameter I know of that might have a value in the range of 100 minutes, but could not find a solution.

[user@localhost R]$ time spark-submit sparkr-pre.R                           
Loading required package: methods

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    filter, na.omit

The following objects are masked from ‘package:base’:

    intersect, rbind, sample, subset, summary, table, transform

15/12/30 18:04:27 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[Stage 1:========================================>                 (7 + 3) / 10]Error in if (returnStatus != 0) { : argument is of length zero
Calls: write.df -> write.df -> .local -> callJMethod -> invokeJava
Execution halted
15/12/30 19:44:52 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1514)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1438)
        at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1724)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1723)
        at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:587)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
15/12/30 19:44:52 ERROR DefaultWriterContainer: Job job_201512301804_0000 aborted.
15/12/30 19:44:52 ERROR RBackendHandler: save on 25 failed

real    100m30.944s
user    1m26.326s
sys     0m19.459s

Environment: Runtime Information

Name    Value
Java Home   /usr/java/jdk1.8.0_40/jre
Java Version    1.8.0_40 (Oracle Corporation)
Scala Version   version 2.10.4
Spark Properties

Name    Value
spark.akka.frameSize    1024
spark.app.id    application_1451466100034_0019
spark.app.name  SparkR
spark.driver.appUIAddress   http://x.x.x.x:4040
spark.driver.host   x.x.x.x
spark.driver.maxResultSize  8G
spark.driver.memory 100G
spark.driver.port   60471
spark.executor.id   driver
spark.executor.memory   14G
spark.executorEnv.LD_LIBRARY_PATH   $LD_LIBRARY_PATH:/usr/lib64/R/lib:/usr/local/lib64:/usr/lib/jvm/jre/lib/amd64/server:/usr/lib/jvm/jre/lib/amd64:/usr/lib/jvm/java/lib/amd64:/usr/java/packages/lib/amd64:/lib:/usr/lib::/usr/lib/hadoop/lib/native
spark.externalBlockStore.folderName spark-b60f685e-c46c-435d-ab1b-c9d1279f630f
spark.fileserver.uri    http://x.x.x.x:50281
spark.home  /datas/spark-1.5.2-bin-hadoop2.6
spark.kryoserializer.buffer.max 2000M
spark.master    yarn-client
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS  CDHPR1.dc.dialog.lk
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES  http://CDHPR1.dc.dialog.lk:8088/proxy/application_1451466100034_0019
spark.scheduler.mode    FIFO
spark.serializer    org.apache.spark.serializer.KryoSerializer
spark.sql.parquet.binaryAsString    true
spark.submit.deployMode client
spark.ui.filters    org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.yarn.dist.archives    file:/datas/spark-1.5.2-bin-hadoop2.6/R/lib/sparkr.zip#sparkr
spark.yarn.dist.files   file:/home/inuser/R/sparkr-pre.R
System Properties

Name    Value
SPARK_SUBMIT    true
SPARK_YARN_MODE true
awt.toolkit sun.awt.X11.XToolkit
file.encoding   UTF-8
file.encoding.pkg   sun.io
file.separator  /
java.awt.graphicsenv    sun.awt.X11GraphicsEnvironment
java.awt.printerjob sun.print.PSPrinterJob
java.class.version  52.0
java.endorsed.dirs  /usr/java/jdk1.8.0_40/jre/lib/endorsed
java.ext.dirs   /usr/java/jdk1.8.0_40/jre/lib/ext:/usr/java/packages/lib/ext
java.home   /usr/java/jdk1.8.0_40/jre
java.io.tmpdir  /tmp
java.library.path   :/usr/lib/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.runtime.name   Java(TM) SE Runtime Environment
java.runtime.version    1.8.0_40-b26
java.specification.name Java Platform API Specification
java.specification.vendor   Oracle Corporation
java.specification.version  1.8
java.vendor Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug http://bugreport.sun.com/bugreport/
java.version    1.8.0_40
java.vm.info    mixed mode
java.vm.name    Java HotSpot(TM) 64-Bit Server VM
java.vm.specification.name  Java Virtual Machine Specification
java.vm.specification.vendor    Oracle Corporation
java.vm.specification.version   1.8
java.vm.vendor  Oracle Corporation
java.vm.version 25.40-b25
line.separator  
os.arch amd64
os.name Linux
os.version  2.6.32-431.el6.x86_64
path.separator  :
sun.arch.data.model 64
sun.boot.class.path /usr/java/jdk1.8.0_40/jre/lib/resources.jar:/usr/java/jdk1.8.0_40/jre/lib/rt.jar:/usr/java/jdk1.8.0_40/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_40/jre/lib/jsse.jar:/usr/java/jdk1.8.0_40/jre/lib/jce.jar:/usr/java/jdk1.8.0_40/jre/lib/charsets.jar:/usr/java/jdk1.8.0_40/jre/lib/jfr.jar:/usr/java/jdk1.8.0_40/jre/classes
sun.boot.library.path   /usr/java/jdk1.8.0_40/jre/lib/amd64
sun.cpu.endian  little
sun.cpu.isalist 
sun.io.unicode.encoding UnicodeLittle
sun.java.command    org.apache.spark.deploy.SparkSubmit sparkr-pre.R
sun.java.launcher   SUN_STANDARD
sun.jnu.encoding    UTF-8
sun.management.compiler HotSpot 64-Bit Tiered Compilers
sun.nio.ch.bugLevel 
sun.os.patch.level  unknown
user.country    US
user.dir    /home/user/R
user.home   /home/user
user.language   en
user.name   inuser
user.timezone   Asia/Colombo
Classpath Entries

Resource    Source
/datas/spark-1.5.2-bin-hadoop2.6/conf/  System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/conf/yarn-conf/    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar  System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar   System Classpath

spark-defaults.conf

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#
spark.master            yarn-client
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.driver.memory             100G
spark.executor.memory           14G 
spark.sql.parquet.binaryAsString true
spark.kryoserializer.buffer.max 2000M
spark.driver.maxResultSize      8G
spark.akka.frameSize            1024
#spark.executor.instances       16

I cannot share the SparkR script publicly; I am really sorry about that. However, the code works perfectly when it takes less than 100 minutes to complete.


person Shanika    schedule 04.01.2016
comment
Have a look at properties such as spark.network.timeout. See all the possible timeouts: spark.apache.org/docs/latest/configuration.html   -  person Ravindra babu    schedule 04.01.2016
comment
In addition to @ravindra's suggestion, are you collecting an RDD to the driver, or opening a large file on the driver? I have only seen the SparkContext shut down when the driver application dies. I see you are submitting in yarn-client mode; you could try submitting in yarn-cluster mode to see whether running the driver on a worker node works. The timeout may be a configuration your admin has set on the server you submit jobs from.   -  person Joe Widen    schedule 05.01.2016
comment
Thanks. I will try your suggestions and comment on what happens.   -  person Shanika    schedule 05.01.2016
comment
See this article for further tuning: blog.cloudera.com/blog/2015/03/   -  person Ravindra babu    schedule 05.01.2016
comment
Cluster mode gave me a configuration-related error; I should be able to solve it. I tried local mode and it also exits after 100 minutes. I then upgraded Spark to 1.6, which was released a few days ago, but the problem remains the same. I am using Cloudera 5.4 for Hadoop. Spark 1.6 was installed and configured manually.   -  person Shanika    schedule 06.01.2016
comment
Some more log info: 16/01/06 09:41:05 INFO ContextCleaner: Cleaned accumulator 1 Error in if (returnStatus != 0) { : argument is of length zero Calls: write.df -> write.df -> .local -> callJMethod -> invokeJava Execution halted 16/01/06 10:51:40 INFO SparkContext: Invoking stop() from shutdown hook 16/01/06 10:51:41 INFO SparkUI: Stopped Spark web UI at ip:4040 16/01/06 10:51:41 INFO DAGScheduler: Job 2 failed: save at NativeMethodAccessorImpl.java:-2, took 6001.098219 s 16/01/06 10:51:41 INFO DAGScheduler: ResultStage 3 (save at NativeMethodAccessorImpl.java:-2) failed in 6000.237 s   -  person Shanika    schedule 06.01.2016
comment
Did we find a solution for this? We are also getting it after about 500 minutes.   -  person SudhirJ    schedule 19.01.2016
comment
Not yet. In the end I processed the data batch by batch using several Spark jobs.   -  person Shanika    schedule 20.01.2016


Answers (2)


This is a known bug in Spark 1.6.0, see: https://issues.apache.org/jira/browse/SPARK-12609. A quick look at the SparkR code also shows that the bug has actually been present since Spark 1.4.0.
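As a rough check of whether a failure matches this timeout, the elapsed time can be compared with 6000 seconds (100 minutes), which is assumed here to be the default timeout in connectBackend. A minimal sketch in base R, using the two timestamps from the log in the question; the variable names are illustrative only:

```r
# Timestamps copied from the question's log (first WARN line and the abort line).
# The time zone is set only to make the subtraction unambiguous.
started <- as.POSIXct("2015-12-30 18:04:27", tz = "UTC")
aborted <- as.POSIXct("2015-12-30 19:44:52", tz = "UTC")

elapsed_secs <- as.numeric(difftime(aborted, started, units = "secs"))

# Assumed default SparkR backend timeout: 6000 seconds (100 minutes).
default_timeout_secs <- 6000
overshoot_secs <- elapsed_secs - default_timeout_secs

print(elapsed_secs)    # 6025
print(overshoot_secs)  # 25
```

An overshoot of a few seconds is what one would expect if the clock starts when the long-running write call is issued rather than when the script starts; the log posted in the comments reports the stage failing in 6000.237 s.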

Until a fix is released, a quick and dirty workaround is to increase the timeout. As mentioned in the issue, the problematic function is connectBackend. It can be patched at runtime using assignInNamespace.

The following takes the original function and wraps it in a second function whose default timeout we raise to 48 hours. The original function is then replaced by the wrapper.

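# Fetch the original connectBackend from the SparkR namespace.
connectBackend.orig <- getFromNamespace('connectBackend', pos='package:SparkR')
# Wrapper with the same arguments, but a default timeout of 48 hours (in seconds).
connectBackend.patched <- function(hostname, port, timeout = 3600*48) {
   connectBackend.orig(hostname, port, timeout)
}
# Replace the function inside the SparkR namespace with the wrapper.
assignInNamespace("connectBackend", value=connectBackend.patched, pos='package:SparkR')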

Put this code after loading the SparkR package.
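The wrapping step itself can be tried without a Spark installation. The sketch below uses a stand-in function, fake_connect (hypothetical, not part of SparkR), with the same argument list as the wrapper, and a small helper make_patched (also hypothetical). It shows that the wrapper changes only the default timeout and still passes explicit values through.

```r
# Build a wrapper around `orig` whose default `timeout` is `timeout_secs`.
make_patched <- function(orig, timeout_secs) {
  function(hostname, port, timeout = timeout_secs) {
    orig(hostname, port, timeout)
  }
}

# Hypothetical stand-in for connectBackend: it just returns the timeout it received.
fake_connect <- function(hostname, port, timeout = 6000) {
  timeout
}

patched <- make_patched(fake_connect, 3600 * 48)

default_result  <- patched("localhost", 12345)       # new default is used
explicit_result <- patched("localhost", 12345, 10)   # explicit value is kept

print(default_result)   # 172800
print(explicit_result)  # 10
```

Because the caller inside SparkR does not pass a timeout explicitly (an assumption based on the workaround relying on the default), changing the default is enough for the patch to take effect.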

Another solution is to change the timeout in the SparkR code and recompile. For compilation instructions, see: https://github.com/apache/spark/blob/branch-1.6/R/install-dev.sh

person CmdNtrf    schedule 16.02.2016
comment
Thank you very much. I will test it and see.   -  person Shanika    schedule 17.02.2016

Since Spark 2.1, there is an environment variable named SPARKR_BACKEND_CONNECTION_TIMEOUT that controls the timeout. However, its default is still 100 minutes, so you need to set e.g. SPARKR_BACKEND_CONNECTION_TIMEOUT=1209600 on the driver to be able to run longer jobs.

I expected that setting --conf spark.yarn.appMasterEnv.SPARKR_BACKEND_CONNECTION_TIMEOUT=1209600 in spark-submit would work, but apparently that does not set the variable correctly either. So my current workaround is to put this into the R script being run:

Sys.setenv("SPARKR_BACKEND_CONNECTION_TIMEOUT" = 1209600)
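A slightly more defensive variant, as a sketch: Sys.setenv coerces its values to character, so passing an integer avoids any risk of the number being written in scientific notation, and reading the variable back confirms what the backend will see. It assumes the variable is read when the SparkR session is started, so the call should come before the session is created; that ordering is an assumption, not something verified here.

```r
# 14 days expressed in seconds; as.integer() keeps the string form plain ("1209600").
timeout_secs <- as.integer(14 * 24 * 60 * 60)

# Set the variable in the driver's R process before starting the SparkR session.
Sys.setenv(SPARKR_BACKEND_CONNECTION_TIMEOUT = timeout_secs)

# Read it back to confirm the value that will be visible to SparkR.
current <- Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT")
print(current)
```

The value 1209600 is simply 14 days in seconds; any value comfortably longer than the expected job duration should serve the same purpose.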
person GreatEmerald    schedule 27.06.2019