I am trying to connect to a kafka (0.9.0) stream through pyspark for one of my applications, and I am facing the following issue:
Steps taken
Started kafka using the following commands
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Using the kafka-python library, I started a kafka-producer. There are no issues with that; I am able to consume the messages back through python.
Now, I try to consume the same topic through pyspark (1.5.2), as shown in the following code:
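For context, the plain-Python round trip described above could look roughly like the sketch below. It is not the code that was actually run: it assumes a kafka-python release that provides `KafkaProducer` and `KafkaConsumer`, a broker on the default port 9092, and the topic name `test` used later in this post. The helper names `to_bytes`, `produce`, and `consume` are hypothetical.

```python
TOPIC = "test"              # topic name used later in this post
BROKERS = "localhost:9092"  # assumption: broker listening on the default port


def to_bytes(line):
    """Encode a text line as UTF-8; without a serializer the producer expects bytes."""
    return line.encode("utf-8")


def produce(lines, topic=TOPIC, brokers=BROKERS):
    """Send each line to the topic (hypothetical helper)."""
    from kafka import KafkaProducer  # imported lazily so the module loads without kafka-python

    producer = KafkaProducer(bootstrap_servers=brokers)
    for line in lines:
        producer.send(topic, to_bytes(line))
    producer.flush()  # block until buffered messages are sent
    producer.close()


def consume(topic=TOPIC, brokers=BROKERS, timeout_ms=5000):
    """Read the topic from the beginning and return the decoded message values."""
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=brokers,
        auto_offset_reset="earliest",    # start from the oldest available message
        consumer_timeout_ms=timeout_ms,  # stop iterating after this long without messages
    )
    try:
        return [record.value.decode("utf-8") for record in consumer]
    finally:
        consumer.close()
```

With a broker running, calling `produce(["hello world"])` and then `consume()` should hand the same lines back, which is the behaviour described above.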
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext

sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 3)
zkQuorum, topic = 'localhost:9092', 'test'
kvs = KafkaUtils.createStream(ssc, zkQuorum,"my_group", {topic: 3})
lines = kvs.map(lambda x: x.value)
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a+b)
         )
counts.pprint()
ssc.start()
ssc.awaitTermination()
I ran the above code using the following command
spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py
I get the following error:
15/12/17 15:37:20 INFO ClientCnxn: Socket connection established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:9092, initiating session
15/12/17 15:37:20 INFO PythonRunner: Times: total = 157, boot = 156, init = 1, finish = 0
15/12/17 15:37:20 INFO Executor: Finished task 3.0 in stage 4.0 (TID 5). 1213 bytes result sent to driver
15/12/17 15:37:20 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 5) in 958 ms on localhost (1/4)
15/12/17 15:37:20 INFO PythonRunner: Times: total = 305, boot = 304, init = 1, finish = 0
15/12/17 15:37:20 INFO Executor: Finished task 0.0 in stage 4.0 (TID 2). 1213 bytes result sent to driver
15/12/17 15:37:20 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 1115 ms on localhost (2/4)
15/12/17 15:37:20 INFO PythonRunner: Times: total = 457, boot = 456, init = 1, finish = 0
15/12/17 15:37:20 INFO Executor: Finished task 1.0 in stage 4.0 (TID 3). 1213 bytes result sent to driver
15/12/17 15:37:20 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 3) in 1266 ms on localhost (3/4)
15/12/17 15:37:20 INFO PythonRunner: Times: total = 306, boot = 304, init = 2, finish = 0
15/12/17 15:37:20 INFO Executor: Finished task 2.0 in stage 4.0 (TID 4). 1213 bytes result sent to driver
15/12/17 15:37:20 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 4) in 1268 ms on localhost (4/4)
15/12/17 15:37:20 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:393) finished in 1.272 s
15/12/17 15:37:20 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/12/17 15:37:20 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:393, took 1.297262 s
15/12/17 15:37:21 INFO JobScheduler: Added jobs for time 1450346841000 ms
15/12/17 15:37:21 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
15/12/17 15:37:21 INFO DAGScheduler: Got job 3 (runJob at PythonRDD.scala:393) with 3 output partitions
15/12/17 15:37:21 INFO DAGScheduler: Final stage: ResultStage 6(runJob at PythonRDD.scala:393)
15/12/17 15:37:21 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
15/12/17 15:37:21 INFO DAGScheduler: Missing parents: List()
15/12/17 15:37:21 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43), which has no missing parents
15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(5576) called with curMem=100677, maxMem=556038881
15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 5.4 KB, free 530.2 MB)
15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(3326) called with curMem=106253, maxMem=556038881
15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.2 KB, free 530.2 MB)
15/12/17 15:37:21 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:61820 (size: 3.2 KB, free: 530.3 MB)
15/12/17 15:37:21 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:861
15/12/17 15:37:21 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43)
15/12/17 15:37:21 INFO TaskSchedulerImpl: Adding task set 6.0 with 3 tasks
15/12/17 15:37:21 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 6, localhost, PROCESS_LOCAL, 2024 bytes)
15/12/17 15:37:21 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 7, localhost, PROCESS_LOCAL, 2024 bytes)
15/12/17 15:37:21 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 8, localhost, PROCESS_LOCAL, 2024 bytes)
15/12/17 15:37:21 INFO Executor: Running task 0.0 in stage 6.0 (TID 6)
15/12/17 15:37:21 INFO Executor: Running task 2.0 in stage 6.0 (TID 8)
15/12/17 15:37:21 INFO Executor: Running task 1.0 in stage 6.0 (TID 7)
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
15/12/17 15:37:21 INFO PythonRunner: Times: total = 158, boot = 154, init = 1, finish = 3
C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
15/12/17 15:37:22 INFO PythonRunner: Times: total = 298, boot = 294, init = 1, finish = 3
C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
15/12/17 15:37:22 INFO PythonRunner: Times: total = 448, boot = 444, init = 1, finish = 3
15/12/17 15:37:22 INFO PythonRunner: Times: total = 152, boot = 151, init = 1, finish = 0
15/12/17 15:37:22 INFO Executor: Finished task 0.0 in stage 6.0 (TID 6). 1213 bytes result sent to driver
15/12/17 15:37:22 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 6) in 784 ms on localhost (1/3)
15/12/17 15:37:22 INFO PythonRunner: Times: total = 320, boot = 318, init = 2, finish = 0
15/12/17 15:37:22 INFO Executor: Finished task 2.0 in stage 6.0 (TID 8). 1213 bytes result sent to driver
15/12/17 15:37:22 INFO TaskSetManager: Finished task 2.0 in stage 6.0 (TID 8) in 952 ms on localhost (2/3)
15/12/17 15:37:22 INFO PythonRunner: Times: total = 172, boot = 171, init = 1, finish = 0
15/12/17 15:37:22 INFO Executor: Finished task 1.0 in stage 6.0 (TID 7). 1213 bytes result sent to driver
15/12/17 15:37:22 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID 7) in 957 ms on localhost (3/3)
15/12/17 15:37:22 INFO DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:393) finished in 0.959 s
15/12/17 15:37:22 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
15/12/17 15:37:22 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:393, took 0.987050 s
15/12/17 15:37:23 INFO ClientCnxn: Client session timed out, have not heard from server in 3000ms for sessionid 0x0, closing socket connection and attempting reconnect
-------------------------------------------
Time: 2015-12-17 15:37:18
-------------------------------------------
15/12/17 15:37:23 INFO JobScheduler: Finished job streaming job 1450346838000 ms.0 from job set of time 1450346838000 ms
15/12/17 15:37:23 INFO JobScheduler: Total delay: 5.780 s for time 1450346838000 ms (execution: 5.725 s)
15/12/17 15:37:23 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/12/17 15:37:23 INFO JobScheduler: Starting job streaming job 1450346841000 ms.0 from job set of time 1450346841000 ms
The "Time" sections still keep appearing in the output, though.
There is no problem with pyspark or kafka themselves; each works perfectly fine. Can anyone help with this issue?
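For reference, the per-batch word count that test.py is meant to print under each "Time" header can be sketched in plain Python, without Spark or Kafka. This is only an illustration under one assumption: each record is treated as a `(key, value)` pair, which is how the Spark Python examples read the elements of the stream returned by `KafkaUtils.createStream`. The function name `word_counts` and the sample batch are made up for this sketch.

```python
from collections import defaultdict


def word_counts(batch):
    """Count space-separated words across one batch of (key, value) records."""
    counts = defaultdict(int)
    for record in batch:
        line = record[1]  # assumption: the message text is the second element of the pair
        for word in line.split(" "):
            counts[word] += 1
    return dict(counts)


# Hypothetical batch: two messages with no key.
sample_batch = [(None, "hello world"), (None, "hello kafka")]
print(word_counts(sample_batch))
```

A batch that actually received messages would be expected to show pairs like these below its "Time" header.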