I am trying to connect to a Kafka (0.9.0) stream through PySpark for one of my applications, and I am running into the following issue.
Steps taken:
Started Kafka using the following commands:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties

Using the kafka-python library, I started a Kafka producer. There are no issues with that; I am able to consume the messages back through Python.
Now I am consuming the same topic through PySpark (1.5.2), as shown in the following code:

    import sys
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    from pyspark import SparkContext

    sc = SparkContext(appName="PythonStreamingKafka")
    ssc = StreamingContext(sc, 3)

    zkQuorum, topic = 'localhost:9092', 'test'
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "my_group", {topic: 3})
    lines = kvs.map(lambda x: x.value)
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

I execute the above code using the following command:

    spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py

I get the following output:

    15/12/17 15:37:20 INFO ClientCnxn: Socket connection established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:9092, initiating session
    15/12/17 15:37:20 INFO PythonRunner: Times: total = 157, boot = 156, init = 1, finish = 0
    15/12/17 15:37:20 INFO Executor: Finished task 3.0 in stage 4.0 (TID 5). 1213 bytes result sent to driver
    15/12/17 15:37:20 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 5) in 958 ms on localhost (1/4)
    15/12/17 15:37:20 INFO PythonRunner: Times: total = 305, boot = 304, init = 1, finish = 0
    15/12/17 15:37:20 INFO Executor: Finished task 0.0 in stage 4.0 (TID 2). 1213 bytes result sent to driver
    15/12/17 15:37:20 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 1115 ms on localhost (2/4)
    15/12/17 15:37:20 INFO PythonRunner: Times: total = 457, boot = 456, init = 1, finish = 0
    15/12/17 15:37:20 INFO Executor: Finished task 1.0 in stage 4.0 (TID 3). 1213 bytes result sent to driver
    15/12/17 15:37:20 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 3) in 1266 ms on localhost (3/4)
    15/12/17 15:37:20 INFO PythonRunner: Times: total = 306, boot = 304, init = 2, finish = 0
    15/12/17 15:37:20 INFO Executor: Finished task 2.0 in stage 4.0 (TID 4). 1213 bytes result sent to driver
    15/12/17 15:37:20 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 4) in 1268 ms on localhost (4/4)
    15/12/17 15:37:20 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:393) finished in 1.272 s
    15/12/17 15:37:20 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
    15/12/17 15:37:20 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:393, took 1.297262 s
    15/12/17 15:37:21 INFO JobScheduler: Added jobs for time 1450346841000 ms
    15/12/17 15:37:21 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
    15/12/17 15:37:21 INFO DAGScheduler: Got job 3 (runJob at PythonRDD.scala:393) with 3 output partitions
    15/12/17 15:37:21 INFO DAGScheduler: Final stage: ResultStage 6(runJob at PythonRDD.scala:393)
    15/12/17 15:37:21 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
    15/12/17 15:37:21 INFO DAGScheduler: Missing parents: List()
    15/12/17 15:37:21 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43), which has no missing parents
    15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(5576) called with curMem=100677, maxMem=556038881
    15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 5.4 KB, free 530.2 MB)
    15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(3326) called with curMem=106253, maxMem=556038881
    15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.2 KB, free 530.2 MB)
    15/12/17 15:37:21 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:61820 (size: 3.2 KB, free: 530.3 MB)
    15/12/17 15:37:21 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:861
    15/12/17 15:37:21 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43)
    15/12/17 15:37:21 INFO TaskSchedulerImpl: Adding task set 6.0 with 3 tasks
    15/12/17 15:37:21 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 6, localhost, PROCESS_LOCAL, 2024 bytes)
    15/12/17 15:37:21 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 7, localhost, PROCESS_LOCAL, 2024 bytes)
    15/12/17 15:37:21 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 8, localhost, PROCESS_LOCAL, 2024 bytes)
    15/12/17 15:37:21 INFO Executor: Running task 0.0 in stage 6.0 (TID 6)
    15/12/17 15:37:21 INFO Executor: Running task 2.0 in stage 6.0 (TID 8)
    15/12/17 15:37:21 INFO Executor: Running task 1.0 in stage 6.0 (TID 7)
    15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
    15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
    15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
    15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
    15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
    15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
    C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
    15/12/17 15:37:21 INFO PythonRunner: Times: total = 158, boot = 154, init = 1, finish = 3
    C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
    15/12/17 15:37:22 INFO PythonRunner: Times: total = 298, boot = 294, init = 1, finish = 3
    C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
    15/12/17 15:37:22 INFO PythonRunner: Times: total = 448, boot = 444, init = 1, finish = 3
    15/12/17 15:37:22 INFO PythonRunner: Times: total = 152, boot = 151, init = 1, finish = 0
    15/12/17 15:37:22 INFO Executor: Finished task 0.0 in stage 6.0 (TID 6). 1213 bytes result sent to driver
    15/12/17 15:37:22 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 6) in 784 ms on localhost (1/3)
    15/12/17 15:37:22 INFO PythonRunner: Times: total = 320, boot = 318, init = 2, finish = 0
    15/12/17 15:37:22 INFO Executor: Finished task 2.0 in stage 6.0 (TID 8). 1213 bytes result sent to driver
    15/12/17 15:37:22 INFO TaskSetManager: Finished task 2.0 in stage 6.0 (TID 8) in 952 ms on localhost (2/3)
    15/12/17 15:37:22 INFO PythonRunner: Times: total = 172, boot = 171, init = 1, finish = 0
    15/12/17 15:37:22 INFO Executor: Finished task 1.0 in stage 6.0 (TID 7). 1213 bytes result sent to driver
    15/12/17 15:37:22 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID 7) in 957 ms on localhost (3/3)
    15/12/17 15:37:22 INFO DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:393) finished in 0.959 s
    15/12/17 15:37:22 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
    15/12/17 15:37:22 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:393, took 0.987050 s
    15/12/17 15:37:23 INFO ClientCnxn: Client session timed out, have not heard from server in 3000ms for sessionid 0x0, closing socket connection and attempting reconnect
    -------------------------------------------
    Time: 2015-12-17 15:37:18
    -------------------------------------------

    15/12/17 15:37:23 INFO JobScheduler: Finished job streaming job 1450346838000 ms.0 from job set of time 1450346838000 ms
    15/12/17 15:37:23 INFO JobScheduler: Total delay: 5.780 s for time 1450346838000 ms (execution: 5.725 s)
    15/12/17 15:37:23 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
    15/12/17 15:37:23 INFO JobScheduler: Starting job streaming job 1450346841000 ms.0 from job set of time 1450346841000 ms

The "Time" sections keep appearing, but no word counts are printed under them.
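Most of the output above is routine scheduler logging, which makes the ZooKeeper client (`ClientCnxn`) lines easy to miss. Below is a minimal sketch for pulling out only the entries from one logger in a saved copy of the log. `lines_from_logger` and `LOG_LINE` are names made up for illustration, and the pattern assumes the `yy/MM/dd HH:mm:ss LEVEL Logger: message` layout seen in this output.

```python
import re

# Assumed layout, taken from the output above:
#   "15/12/17 15:37:23 INFO ClientCnxn: <message>"
LOG_LINE = re.compile(
    r"^(?P<ts>\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) "
    r"(?P<level>[A-Z]+) (?P<logger>\w+): (?P<msg>.*)$"
)


def lines_from_logger(log_text, logger_name):
    """Return (timestamp, message) pairs emitted by `logger_name`."""
    hits = []
    for raw in log_text.splitlines():
        match = LOG_LINE.match(raw.strip())
        if match and match.group("logger") == logger_name:
            hits.append((match.group("ts"), match.group("msg")))
    return hits


# Three lines copied from the output in this post.
sample = """\
15/12/17 15:37:20 INFO ClientCnxn: Socket connection established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:9092, initiating session
15/12/17 15:37:22 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:393, took 0.987050 s
15/12/17 15:37:23 INFO ClientCnxn: Client session timed out, have not heard from server in 3000ms for sessionid 0x0, closing socket connection and attempting reconnect
"""

for ts, msg in lines_from_logger(sample, "ClientCnxn"):
    print(ts, msg)
```

Filtered this way, the log shows a ZooKeeper session being opened against port 9092 and timing out three seconds later with session id `0x0`.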
There are no issues with PySpark or Kafka on their own; each works fine. Can anyone help me resolve this issue?
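One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: `KafkaUtils.createStream` expects a ZooKeeper quorum as its second argument, while 9092 is the default Kafka broker port. The sketch below assumes ZooKeeper is listening on 2181, the default `clientPort` in `config/zookeeper.properties`; adjust it if your configuration differs. It also reads each record as a `(key, value)` tuple, which is what the receiver-based stream yields in PySpark. The helper names `message_value`, `split_words`, `build_word_counts`, and `main` are made up for illustration.

```python
# Assumption: ZooKeeper uses its default client port 2181;
# 9092 is the default port of the Kafka broker, not of ZooKeeper.
ZK_QUORUM = "localhost:2181"
TOPIC = "test"
GROUP_ID = "my_group"


def message_value(record):
    """createStream yields (key, value) tuples; keep only the value."""
    return record[1]


def split_words(line):
    """Split a message into words on single spaces, as in the question."""
    return line.split(" ")


def build_word_counts(ssc):
    # Imported here so the helpers above can be tried without Spark installed.
    from pyspark.streaming.kafka import KafkaUtils

    kvs = KafkaUtils.createStream(ssc, ZK_QUORUM, GROUP_ID, {TOPIC: 3})
    return (kvs.map(message_value)
               .flatMap(split_words)
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))


def main():
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="PythonStreamingKafka")
    ssc = StreamingContext(sc, 3)  # 3-second batches, as in the question
    build_word_counts(ssc).pprint()
    ssc.start()
    ssc.awaitTermination()
```

To try it, add a call to `main()` as the last line of the script and submit it with the same `spark-submit --jars ...` command as before.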