PySpark streaming from Kafka issue

I am trying to connect to a Kafka (0.9.0) stream through PySpark for one of my applications, and I am facing the following issue.

Steps taken

  1. Started Kafka using the following commands:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
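    # Assumption (not shown in the question): the 'test' topic may need
    # to be created before producing to it.
    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
        --replication-factor 1 --partitions 1 --topic test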
    
  2. Using the kafka-python library I started a Kafka producer. There is no problem with it; I can consume the messages back through Python.
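
    For reference, a minimal kafka-python producer along these lines (the broker address and topic name are assumptions matching the consumer code below):

      from kafka import KafkaProducer

      # Assumed setup: broker on localhost:9092, topic 'test'.
      producer = KafkaProducer(bootstrap_servers='localhost:9092')
      for i in range(10):
          producer.send('test', ('message %d' % i).encode('utf-8'))
      producer.flush()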

  3. Now I consume the same messages through PySpark (1.5.2), as shown in the following code:

    import sys
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    from pyspark import SparkContext
    
    sc = SparkContext(appName="PythonStreamingKafka")
    ssc = StreamingContext(sc, 3)
    
    zkQuorum, topic = 'localhost:9092', 'test'
    kvs = KafkaUtils.createStream(ssc, zkQuorum,"my_group", {topic: 3})
    
    lines = kvs.map(lambda x: x.value)
    counts = (lines.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))

    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
    

    I execute the above code using the following command:

      spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar test.py
    

    I get the following error:

      15/12/17 15:37:20 INFO ClientCnxn: Socket connection established to 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:9092, initiating session
      15/12/17 15:37:20 INFO PythonRunner: Times: total = 157, boot = 156, init = 1, finish = 0
      15/12/17 15:37:20 INFO Executor: Finished task 3.0 in stage 4.0 (TID 5). 1213 bytes result sent to driver
      15/12/17 15:37:20 INFO TaskSetManager: Finished task 3.0 in stage 4.0 (TID 5) in 958 ms on localhost (1/4)
      15/12/17 15:37:20 INFO PythonRunner: Times: total = 305, boot = 304, init = 1, finish = 0
      15/12/17 15:37:20 INFO Executor: Finished task 0.0 in stage 4.0 (TID 2). 1213 bytes result sent to driver
      15/12/17 15:37:20 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 2) in 1115 ms on localhost (2/4)
      15/12/17 15:37:20 INFO PythonRunner: Times: total = 457, boot = 456, init = 1, finish = 0
      15/12/17 15:37:20 INFO Executor: Finished task 1.0 in stage 4.0 (TID 3). 1213 bytes result sent to driver
      15/12/17 15:37:20 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 3) in 1266 ms on localhost (3/4)
      15/12/17 15:37:20 INFO PythonRunner: Times: total = 306, boot = 304, init = 2, finish = 0
      15/12/17 15:37:20 INFO Executor: Finished task 2.0 in stage 4.0 (TID 4). 1213 bytes result sent to driver
      15/12/17 15:37:20 INFO TaskSetManager: Finished task 2.0 in stage 4.0 (TID 4) in 1268 ms on localhost (4/4)
      15/12/17 15:37:20 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:393) finished in 1.272 s
      15/12/17 15:37:20 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
      15/12/17 15:37:20 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:393, took 1.297262 s
      15/12/17 15:37:21 INFO JobScheduler: Added jobs for time 1450346841000 ms
      15/12/17 15:37:21 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
      15/12/17 15:37:21 INFO DAGScheduler: Got job 3 (runJob at PythonRDD.scala:393) with 3 output partitions
      15/12/17 15:37:21 INFO DAGScheduler: Final stage: ResultStage 6(runJob at PythonRDD.scala:393)
      15/12/17 15:37:21 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
      15/12/17 15:37:21 INFO DAGScheduler: Missing parents: List()
      15/12/17 15:37:21 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43), which has no missing parents
      15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(5576) called with curMem=100677, maxMem=556038881
      15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 5.4 KB, free 530.2 MB)
      15/12/17 15:37:21 INFO MemoryStore: ensureFreeSpace(3326) called with curMem=106253, maxMem=556038881
      15/12/17 15:37:21 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 3.2 KB, free 530.2 MB)
      15/12/17 15:37:21 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:61820 (size: 3.2 KB, free: 530.3 MB)
      15/12/17 15:37:21 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:861
      15/12/17 15:37:21 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 6 (PythonRDD[15] at RDD at PythonRDD.scala:43)
      15/12/17 15:37:21 INFO TaskSchedulerImpl: Adding task set 6.0 with 3 tasks
      15/12/17 15:37:21 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 6, localhost, PROCESS_LOCAL, 2024 bytes)
      15/12/17 15:37:21 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 7, localhost, PROCESS_LOCAL, 2024 bytes)
      15/12/17 15:37:21 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 8, localhost, PROCESS_LOCAL, 2024 bytes)
      15/12/17 15:37:21 INFO Executor: Running task 0.0 in stage 6.0 (TID 6)
      15/12/17 15:37:21 INFO Executor: Running task 2.0 in stage 6.0 (TID 8)
      15/12/17 15:37:21 INFO Executor: Running task 1.0 in stage 6.0 (TID 7)
      15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
      15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
      15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
      15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
      15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
      15/12/17 15:37:21 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
      C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
      15/12/17 15:37:21 INFO PythonRunner: Times: total = 158, boot = 154, init = 1, finish = 3
      C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
      15/12/17 15:37:22 INFO PythonRunner: Times: total = 298, boot = 294, init = 1, finish = 3
      C:\Spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
      15/12/17 15:37:22 INFO PythonRunner: Times: total = 448, boot = 444, init = 1, finish = 3
      15/12/17 15:37:22 INFO PythonRunner: Times: total = 152, boot = 151, init = 1, finish = 0
      15/12/17 15:37:22 INFO Executor: Finished task 0.0 in stage 6.0 (TID 6). 1213 bytes result sent to driver
      15/12/17 15:37:22 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 6) in 784 ms on localhost (1/3)
      15/12/17 15:37:22 INFO PythonRunner: Times: total = 320, boot = 318, init = 2, finish = 0
      15/12/17 15:37:22 INFO Executor: Finished task 2.0 in stage 6.0 (TID 8). 1213 bytes result sent to driver
      15/12/17 15:37:22 INFO TaskSetManager: Finished task 2.0 in stage 6.0 (TID 8) in 952 ms on localhost (2/3)
      15/12/17 15:37:22 INFO PythonRunner: Times: total = 172, boot = 171, init = 1, finish = 0
      15/12/17 15:37:22 INFO Executor: Finished task 1.0 in stage 6.0 (TID 7). 1213 bytes result sent to driver
      15/12/17 15:37:22 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID 7) in 957 ms on localhost (3/3)
      15/12/17 15:37:22 INFO DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:393) finished in 0.959 s
      15/12/17 15:37:22 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
      15/12/17 15:37:22 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:393, took 0.987050 s
      15/12/17 15:37:23 INFO ClientCnxn: Client session timed out, have not heard from server in 3000ms for sessionid 0x0, closing socket connection and attempting reconnect
      -------------------------------------------
      Time: 2015-12-17 15:37:18
      -------------------------------------------
    
      15/12/17 15:37:23 INFO JobScheduler: Finished job streaming job 1450346838000 ms.0 from job set of time 1450346838000 ms
      15/12/17 15:37:23 INFO JobScheduler: Total delay: 5.780 s for time 1450346838000 ms (execution: 5.725 s)
      15/12/17 15:37:23 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
      15/12/17 15:37:23 INFO JobScheduler: Starting job streaming job 1450346841000 ms.0 from job set of time 1450346841000 ms
    

    The "Time" section keeps appearing, though, with nothing printed under it.

    There is no error from PySpark or Kafka as such; everything appears to run fine. Could anyone help me with this issue?


— user2564563, 17.12.2015


Answers (2)


I think there is a mistake in this line:

zkQuorum, topic = 'localhost:9092', 'test'

The ZooKeeper port should be 2181:

zkQuorum, topic = 'localhost:2181', 'test'

Source: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
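
Beyond the port, note that in the Python API createStream yields (key, value) tuples, so the question's mapping would also need x[1] rather than x.value. A minimal sketch with both fixes (topic and consumer group taken from the question):

# ZooKeeper quorum on port 2181, not the broker port.
kvs = KafkaUtils.createStream(ssc, 'localhost:2181', 'my_group', {'test': 3})

# Each record is a (key, value) tuple; keep the message value.
lines = kvs.map(lambda x: x[1])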

— rj087, 29.01.2016

You cannot process each record without iterating over the RDDs; use foreachRDD, since a DStream produces a continuous series of RDDs.

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def handler(rdd):
    # foreachRDD hands each batch to this function as a plain RDD of
    # (key, value) tuples, so ordinary RDD operations apply here.
    counts = (rdd.map(lambda x: x[1])
              .flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
    for word, count in counts.collect():
        print(word, count)


def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    # The direct stream takes the broker list, not the ZooKeeper quorum.
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
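
A launch sketch, assuming the same assembly jar as in the question (the script name here is hypothetical):

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar direct_kafka_wordcount.py localhost:9092 test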

Regards,

Karthikeyan Rasipalayam Durairaj

— Karthikeyan Rasipalay Durairaj, 15.01.2019