Это похоже на ошибку для меня. Судя по коду, Kinesis полностью игнорирует конфигурацию spark.streaming.receiver.maxRate
.
Если вы заглянете внутрь KinesisReceiver.onStart
, то увидите:
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
Этот конструктор в конечном итоге вызывает другой конструктор, который имеет множество значений по умолчанию для конфигурации:
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
Вас интересует DEFAULT_MAX_RECORDS
, для которого постоянно установлено значение 10 000 записей. В KinesisClientLibConfiguration
есть метод withMaxRecords
, который вы вызываете для установки фактического количества записей. Это должно быть легко исправить.
Но на данный момент кажется, что ресивер Kinesis не учитывает этот параметр.
person
Yuval Itzchakov
schedule
29.11.2016