Spark Streaming + Kinesis : Penerima MaxRate dilanggar

Saya memanggil spark-submit dengan meneruskan maxRate, saya memiliki satu penerima kinesis, dan kumpulan 1 detik

spark-submit --conf spark.streaming.receiver.maxRate=10 ....

namun satu batch bisa jauh melebihi maxRate yang ditetapkan. yaitu: Saya mendapatkan 300 catatan.

Apakah saya melewatkan pengaturan apa pun?


person David Przybilla    schedule 29.11.2016    source sumber


Jawaban (2)


Ini tampak seperti bug bagi saya. Dari melihat-lihat kode, sepertinya Kinesis sepenuhnya mengabaikan konfigurasi spark.streaming.receiver.maxRate.

Jika Anda melihat ke dalam KinesisReceiver.onStart, Anda melihat:

val kinesisClientLibConfiguration =
  new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
  .withKinesisEndpoint(endpointUrl)
  .withInitialPositionInStream(initialPositionInStream)
  .withTaskBackoffTimeMillis(500)
  .withRegionName(regionName)

Konstruktor ini akhirnya memanggil konstruktor lain yang memiliki banyak nilai default untuk konfigurasinya:

public KinesisClientLibConfiguration(String applicationName,
        String streamName,
        AWSCredentialsProvider kinesisCredentialsProvider,
        AWSCredentialsProvider dynamoDBCredentialsProvider,
        AWSCredentialsProvider cloudWatchCredentialsProvider,
        String workerId) {
    this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
            dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
            DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
            DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
            DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
            new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
            DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
            DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}

Yang Anda pedulikan adalah DEFAULT_MAX_RECORDS yang terus-menerus disetel ke 10.000 catatan. Ada metode di KinesisClientLibConfiguration bernama withMaxRecords yang Anda panggil untuk menyetel jumlah rekaman sebenarnya. Ini seharusnya menjadi perbaikan yang mudah.

Namun untuk saat ini, sepertinya receiver Kinesis tidak mematuhi parameter tersebut.

person Yuval Itzchakov    schedule 29.11.2016

Untuk referensi di masa mendatang.

Ini adalah bug umum yang telah diperbaiki pada rilis Spark 2.2.0

person ssedano    schedule 09.08.2017