Ini tampak seperti bug bagi saya. Dari melihat-lihat kode, sepertinya Kinesis sepenuhnya mengabaikan konfigurasi spark.streaming.receiver.maxRate
.
Jika Anda melihat ke dalam KinesisReceiver.onStart
, Anda melihat:
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
Konstruktor ini akhirnya memanggil konstruktor lain yang memiliki banyak nilai default untuk konfigurasinya:
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
Yang Anda pedulikan adalah DEFAULT_MAX_RECORDS
yang terus-menerus disetel ke 10.000 catatan. Ada metode di KinesisClientLibConfiguration
bernama withMaxRecords
yang Anda panggil untuk menyetel jumlah rekaman sebenarnya. Ini seharusnya menjadi perbaikan yang mudah.
Namun untuk saat ini, sepertinya receiver Kinesis tidak mematuhi parameter tersebut.
person
Yuval Itzchakov
schedule
29.11.2016