Apache Spark (v1.6.1) เริ่มต้นเป็นบริการบนเครื่อง Ubuntu (10.10.0.102) โดยใช้ ./start-all.sh
ตอนนี้จำเป็นต้องส่งงานไปยังเซิร์ฟเวอร์นี้จากระยะไกลโดยใช้ Java API
ต่อไปนี้เป็นโค้ดไคลเอ็นต์ Java ที่ทำงานจากเครื่องอื่น (10.10.0.95)
String mySqlConnectionUrl = "jdbc:mysql://localhost:3306/demo?user=sec&password=sec";
String jars[] = new String[] {"/home/.m2/repository/com/databricks/spark-csv_2.10/1.4.0/spark-csv_2.10-1.4.0.jar",
"/home/.m2/repository/org/apache/commons/commons-csv/1.1/commons-csv-1.1.jar",
"/home/.m2/repository/mysql/mysql-connector-java/6.0.2/mysql-connector-java-6.0.2.jar"};
SparkConf sparkConf = new SparkConf()
.setAppName("sparkCSVWriter")
.setMaster("spark://10.10.0.102:7077")
.setJars(jars);
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(javaSparkContext);
Map<String, String> options = new HashMap<>();
options.put("driver", "com.mysql.jdbc.Driver");
options.put("url", mySqlConnectionUrl);
options.put("dbtable", "(select p.FIRST_NAME from person p) as firstName");
DataFrame dataFrame = sqlContext.read().format("jdbc").options(options).load();
dataFrame.write()
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", "|")
.option("quote", "\"")
.option("quoteMode", QuoteMode.NON_NUMERIC.toString())
.option("escape", "\\")
.save("persons.csv");
Configuration hadoopConfiguration = javaSparkContext.hadoopConfiguration();
FileSystem hdfs = FileSystem.get(hadoopConfiguration);
FileUtil.copyMerge(hdfs, new Path("persons.csv"), hdfs, new Path("\home\persons1.csv"), true, hadoopConfiguration, new String());
ตามโค้ดจำเป็นต้องแปลงข้อมูล RDBMS เป็น csv/json โดยใช้ Spark แต่เมื่อฉันรันแอปพลิเคชันไคลเอ็นต์นี้ สามารถเชื่อมต่อกับเซิร์ฟเวอร์ Spark ระยะไกลได้ แต่ในคอนโซลได้รับข้อความเตือนต่อไปนี้อย่างต่อเนื่อง
WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
และที่ฝั่งเซิร์ฟเวอร์บน Spark UI ในการรันแอปพลิเคชัน> สรุปตัวดำเนินการ> บันทึก stderr ได้รับข้อผิดพลาดดังต่อไปนี้
Exception in thread "main" java.io.IOException: Failed to connect to /192.168.56.1:53112
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:200)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:183)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /192.168.56.1:53112
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
แต่ไม่มีที่อยู่ IP ใด ๆ ที่กำหนดค่าเป็น 192.168.56.1
ดังนั้นจึงไม่มีการกำหนดค่าใด ๆ