Spark Streaming + Kinesis: нарушена максимальная скорость приемника

Я вызываю spark-submit, передавая maxRate, у меня есть один приемник kinesis и пакеты из 1 с.

spark-submit --conf spark.streaming.receiver.maxRate=10 ....

однако одна партия может значительно превышать установленную максимальную скорость. то есть: я получаю 300 записей.

Я пропустил какую-либо настройку?


person David Przybilla    schedule 29.11.2016    source источник


Ответы (2)


Это похоже на ошибку для меня. Судя по коду, Kinesis полностью игнорирует конфигурацию spark.streaming.receiver.maxRate.

Если вы заглянете внутрь KinesisReceiver.onStart, то увидите:

val kinesisClientLibConfiguration =
  new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
  .withKinesisEndpoint(endpointUrl)
  .withInitialPositionInStream(initialPositionInStream)
  .withTaskBackoffTimeMillis(500)
  .withRegionName(regionName)

Этот конструктор в конечном итоге вызывает другой конструктор, который имеет множество значений по умолчанию для конфигурации:

public KinesisClientLibConfiguration(String applicationName,
        String streamName,
        AWSCredentialsProvider kinesisCredentialsProvider,
        AWSCredentialsProvider dynamoDBCredentialsProvider,
        AWSCredentialsProvider cloudWatchCredentialsProvider,
        String workerId) {
    this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
            dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
            DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
            DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
            DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
            new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
            DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
            DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}

Вас интересует DEFAULT_MAX_RECORDS, для которого постоянно установлено значение 10 000 записей. В KinesisClientLibConfiguration есть метод withMaxRecords, который вы вызываете для установки фактического количества записей. Это должно быть легко исправить.

Но на данный момент кажется, что ресивер Kinesis не учитывает этот параметр.

person Yuval Itzchakov    schedule 29.11.2016

Для дальнейшего использования.

Это известная ошибка, исправленная в Spark 2.2.0 выпуске.

person ssedano    schedule 09.08.2017