Я пытаюсь понять, как создать конвейер данных AWS, который может взять файл json из S3 и импортировать его в таблицу DynamoDB. Я могу создать некоторый java-код, который этого достигнет, но я хочу сделать это через конвейер данных. Я вижу, что есть шаблоны для экспорта из DynamoDB в S3 и импорта резервной копии, но я изо всех сил пытаюсь понять, как импортировать простой файл json.
Импортировать файл json в DynamoDB через конвейер данных AWS
Ответы (1)
В документации вы найдете пример импорта и экспорта данных из DynamoDb (http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb.html)
Вот как это описано в документации:
Для создания конвейера
Откройте консоль AWS Data Pipeline по адресу
Первый экран, который вы увидите, зависит от того, создали ли вы конвейер в текущем регионе.
Если вы не создали конвейер в этой области, на консоли отобразится вводный экран. Выберите Начать сейчас.
Если вы уже создали конвейер в этом регионе, на консоли отобразится страница, на которой перечислены ваши конвейеры для этого региона. Выберите Создать новый конвейер.
В поле «Имя» введите имя для вашего конвейера.
(Необязательно) В поле «Описание» введите описание конвейера.
В качестве источника выберите «Создать с использованием шаблона», а затем выберите следующий шаблон: Импортировать данные резервного копирования DynamoDB из S3.
В разделе «Параметры» установите для папки Input S3 значение s3: // elasticmapreduce / samples / Store / ProductCatalog, которое является примером источника данных, и задайте имя таблицы DynamoDB для имени вашей таблицы.
В разделе «Расписание» выберите активацию конвейера.
В разделе «Конфигурация конвейера» оставьте ведение журнала включенным. Выберите значок папки в разделе «Расположение S3 для журналов», выберите одну из корзин или папок, а затем нажмите «Выбрать».
При желании вы можете вместо этого отключить ведение журнала.
В разделе «Безопасность / доступ» оставьте для ролей IAM значение «По умолчанию».
Щелкните "Редактировать" в Architect.
Затем настройте действия уведомления Amazon SNS, которые выполняет AWS Data Pipeline в зависимости от результата действия.
Чтобы настроить успешные и неудачные действия
На правой панели щелкните Действия.
В разделе «Добавить необязательное поле» выберите «При успешном завершении».
В только что добавленном On Success выберите Create new: Action.
В разделе «Добавить необязательное поле» выберите «При сбое».
В только что добавленном On Fail выберите Create new: Action.
На правой панели щелкните Другие.
Для DefaultAction1 сделайте следующее:
Измените имя на SuccessSnsAlarm.
В поле Тип выберите SnsAlarm.
В поле Topic Arn введите ARN созданной вами темы.
Введите тему и сообщение.
Для DefaultAction2 сделайте следующее:
Измените имя на FailureSnsAlarm.
В поле Тип выберите SnsAlarm.
В поле Topic Arn введите ARN созданной вами темы.
Введите тему и сообщение.
На общедоступном сайте github есть несколько примеров для работы с DynamoDB (https://github.com/awslabs/data-pipeline-samples). Вот пример определения конвейера:
{
"objects": [
{
"occurrences": "1",
"period": "1 Day",
"name": "RunOnce",
"id": "DefaultSchedule",
"type": "Schedule",
"startAt": "FIRST_ACTIVATION_DATE_TIME",
"maxActiveInstances" : "1"
},
{
"failureAndRerunMode": "CASCADE",
"schedule": {
"ref": "DefaultSchedule"
},
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "s3://",
"scheduleType": "cron",
"name": "Default",
"id": "Default"
},
{
"maximumRetries": "2",
"name": "TableBackupActivity",
"step": "s3://dynamodb-emr-us-east-1/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')},#{myDDBTableName},#{myDDBReadThroughputRatio}",
"id": "TableBackupActivity",
"runsOn": {
"ref": "EmrClusterForBackup"
},
"type": "EmrActivity"
},
{
"bootstrapAction": "s3://elasticmapreduce/bootstrap-actions/configure-hadoop, --yarn-key-value, yarn.nodemanager.resource.memory-mb=12800,--yarn-key-value,yarn.scheduler.minimum-allocation-mb=256,--mapred-key-value,mapreduce.map.memory.mb=500,--mapred-key-value,mapreduce.map.java.opts=-Xmx400M,--mapred-key-value,mapreduce.job.reduce.slowstart.completedmaps=1,--mapred-key-value,mapreduce.map.speculative=false",
"name": "EmrClusterForBackup",
"amiVersion": "3.8.0",
"id": "EmrClusterForBackup",
"type": "EmrCluster",
"masterInstanceType": "m1.medium",
"coreInstanceType": "#{myInstanceType}",
"coreInstanceCount": "#{myInstanceCount}",
"terminateAfter" : "12 hours"
}
],
"parameters": [
{
"description": "OutputS3folder",
"id": "myOutputS3Loc",
"type": "AWS::S3::ObjectKey"
},
{
"default": "0.2",
"watermark": "Valuebetween0.1-1.0",
"description": "DynamoDB Read Throughput Ratio",
"id": "myDDBReadThroughputRatio",
"type": "Double"
},
{
"description": "DynamoDB Table Name",
"id": "myDDBTableName",
"type": "String"
},
{
"description": "Instance Type",
"id": "myInstanceType",
"watermark" : "Use m1.medium if Read Capacity Units for the job <= 900. Else use m3.xlarge",
"type": "String",
"default": "m3.xlarge"
},
{
"description": "Instance Count",
"watermark" : "(Read Capacity Units / 300) for m1.medium if RCU <= 900. Else (RCU / 1500) for m3.xlarge",
"id": "myInstanceCount",
"type": "Integer",
"default": "1"
},
{
"description" : "Burst IOPs",
"watermark" : "Add IOPS to the DDB table by this percent for the duration of the export job",
"id" : "myBurstIOPS",
"type" : "Double",
"default" : "0.0"
}
]
}