นี่ดูเหมือนเป็นแมลงสำหรับฉัน จากการเจาะลึกโค้ด ดูเหมือนว่า Kinesis จะเพิกเฉยต่อการกำหนดค่า spark.streaming.receiver.maxRate
โดยสิ้นเชิง
หากคุณมองเข้าไปข้างใน KinesisReceiver.onStart
คุณจะเห็น:
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
Constructor นี้จบลงด้วยการเรียก Constructor อื่นซึ่งมีค่าเริ่มต้นจำนวนมากสำหรับการกำหนดค่า:
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
รายการที่คุณสนใจคือ DEFAULT_MAX_RECORDS
ซึ่งตั้งค่าไว้ที่ 10,000 รายการอย่างต่อเนื่อง มีวิธีการบน KinesisClientLibConfiguration
ที่เรียกว่า withMaxRecords
ที่คุณเรียกใช้เพื่อตั้งค่าจำนวนบันทึกจริง นี่ควรจะเป็นการแก้ไขที่ง่าย
แต่สำหรับตอนนี้ ดูเหมือนว่าตัวรับ Kinesis จะไม่เคารพพารามิเตอร์นั้น
person
Yuval Itzchakov
schedule
29.11.2016