Spark Streaming + Kinesis: ตัวรับ MaxRate ถูกละเมิด

ฉันกำลังเรียก spark-submit ผ่าน maxRate ฉันมีตัวรับ kinesis ตัวเดียวและแบตช์ 1 วินาที

spark-submit --conf spark.streaming.receiver.maxRate=10 ....

อย่างไรก็ตาม ชุดเดียวสามารถเกินอัตราสูงสุดที่กำหนดไว้ได้อย่างมาก เช่น: ฉันได้รับ 300 บันทึก

ฉันพลาดการตั้งค่าใด ๆ หรือไม่?


person David Przybilla    schedule 29.11.2016    source แหล่งที่มา


คำตอบ (2)


นี่ดูเหมือนเป็นแมลงสำหรับฉัน จากการเจาะลึกโค้ด ดูเหมือนว่า Kinesis จะเพิกเฉยต่อการกำหนดค่า spark.streaming.receiver.maxRate โดยสิ้นเชิง

หากคุณมองเข้าไปข้างใน KinesisReceiver.onStart คุณจะเห็น:

val kinesisClientLibConfiguration =
  new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
  .withKinesisEndpoint(endpointUrl)
  .withInitialPositionInStream(initialPositionInStream)
  .withTaskBackoffTimeMillis(500)
  .withRegionName(regionName)

Constructor นี้จบลงด้วยการเรียก Constructor อื่นซึ่งมีค่าเริ่มต้นจำนวนมากสำหรับการกำหนดค่า:

public KinesisClientLibConfiguration(String applicationName,
        String streamName,
        AWSCredentialsProvider kinesisCredentialsProvider,
        AWSCredentialsProvider dynamoDBCredentialsProvider,
        AWSCredentialsProvider cloudWatchCredentialsProvider,
        String workerId) {
    this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
            dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
            DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
            DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
            DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
            new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
            DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
            DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}

รายการที่คุณสนใจคือ DEFAULT_MAX_RECORDS ซึ่งตั้งค่าไว้ที่ 10,000 รายการอย่างต่อเนื่อง มีวิธีการบน KinesisClientLibConfiguration ที่เรียกว่า withMaxRecords ที่คุณเรียกใช้เพื่อตั้งค่าจำนวนบันทึกจริง นี่ควรจะเป็นการแก้ไขที่ง่าย

แต่สำหรับตอนนี้ ดูเหมือนว่าตัวรับ Kinesis จะไม่เคารพพารามิเตอร์นั้น

person Yuval Itzchakov    schedule 29.11.2016

สำหรับการอ้างอิงในอนาคต.

นี่เป็นbug ที่ทราบแล้ว ซึ่งแก้ไขแล้วใน Spark 2.2.0 release

person ssedano    schedule 09.08.2017