Импортировать файл json в DynamoDB через конвейер данных AWS

Я пытаюсь понять, как создать конвейер данных AWS, который может взять файл json из S3 и импортировать его в таблицу DynamoDB. Я могу создать некоторый java-код, который этого достигнет, но я хочу сделать это через конвейер данных. Я вижу, что есть шаблоны для экспорта из DynamoDB в S3 и импорта резервной копии, но я изо всех сил пытаюсь понять, как импортировать простой файл json.


person Sutty1000    schedule 20.04.2016    source источник


Ответы (1)


В документации вы найдете пример импорта и экспорта данных из DynamoDb (http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb.html)

Вот как это описано в документации:

Для создания конвейера

Откройте консоль AWS Data Pipeline по адресу

Первый экран, который вы увидите, зависит от того, создали ли вы конвейер в текущем регионе.

Если вы не создали конвейер в этой области, на консоли отобразится вводный экран. Выберите Начать сейчас.

Если вы уже создали конвейер в этом регионе, на консоли отобразится страница, на которой перечислены ваши конвейеры для этого региона. Выберите Создать новый конвейер.

В поле «Имя» введите имя для вашего конвейера.

(Необязательно) В поле «Описание» введите описание конвейера.

В качестве источника выберите «Создать с использованием шаблона», а затем выберите следующий шаблон: Импортировать данные резервного копирования DynamoDB из S3.

В разделе «Параметры» установите для папки Input S3 значение s3: // elasticmapreduce / samples / Store / ProductCatalog, которое является примером источника данных, и задайте имя таблицы DynamoDB для имени вашей таблицы.

В разделе «Расписание» выберите активацию конвейера.

В разделе «Конфигурация конвейера» оставьте ведение журнала включенным. Выберите значок папки в разделе «Расположение S3 для журналов», выберите одну из корзин или папок, а затем нажмите «Выбрать».

При желании вы можете вместо этого отключить ведение журнала.

В разделе «Безопасность / доступ» оставьте для ролей IAM значение «По умолчанию».

Щелкните "Редактировать" в Architect.

Затем настройте действия уведомления Amazon SNS, которые выполняет AWS Data Pipeline в зависимости от результата действия.

Чтобы настроить успешные и неудачные действия

На правой панели щелкните Действия.

В разделе «Добавить необязательное поле» выберите «При успешном завершении».

В только что добавленном On Success выберите Create new: Action.

В разделе «Добавить необязательное поле» выберите «При сбое».

В только что добавленном On Fail выберите Create new: Action.

На правой панели щелкните Другие.

Для DefaultAction1 сделайте следующее:

Измените имя на SuccessSnsAlarm.

В поле Тип выберите SnsAlarm.

В поле Topic Arn введите ARN созданной вами темы.

Введите тему и сообщение.

Для DefaultAction2 сделайте следующее:

Измените имя на FailureSnsAlarm.

В поле Тип выберите SnsAlarm.

В поле Topic Arn введите ARN созданной вами темы.

Введите тему и сообщение.

На общедоступном сайте github есть несколько примеров для работы с DynamoDB (https://github.com/awslabs/data-pipeline-samples). Вот пример определения конвейера:

{
    "objects": [
        {
            "occurrences": "1",
            "period": "1 Day",
            "name": "RunOnce",
            "id": "DefaultSchedule",
            "type": "Schedule",
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
        "maxActiveInstances" : "1"
        },
        {
            "failureAndRerunMode": "CASCADE",
            "schedule": {
                "ref": "DefaultSchedule"
            },
            "resourceRole": "DataPipelineDefaultResourceRole",
            "role": "DataPipelineDefaultRole",
            "pipelineLogUri": "s3://",
            "scheduleType": "cron",
            "name": "Default",
            "id": "Default"
        },
        {
            "maximumRetries": "2",
            "name": "TableBackupActivity",
            "step": "s3://dynamodb-emr-us-east-1/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')},#{myDDBTableName},#{myDDBReadThroughputRatio}",
            "id": "TableBackupActivity",
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "type": "EmrActivity"
        },
        {
            "bootstrapAction": "s3://elasticmapreduce/bootstrap-actions/configure-hadoop, --yarn-key-value, yarn.nodemanager.resource.memory-mb=12800,--yarn-key-value,yarn.scheduler.minimum-allocation-mb=256,--mapred-key-value,mapreduce.map.memory.mb=500,--mapred-key-value,mapreduce.map.java.opts=-Xmx400M,--mapred-key-value,mapreduce.job.reduce.slowstart.completedmaps=1,--mapred-key-value,mapreduce.map.speculative=false",
            "name": "EmrClusterForBackup",
            "amiVersion": "3.8.0",
            "id": "EmrClusterForBackup",
            "type": "EmrCluster",
            "masterInstanceType": "m1.medium",
            "coreInstanceType": "#{myInstanceType}",
            "coreInstanceCount": "#{myInstanceCount}",
        "terminateAfter" : "12 hours"
        }
    ],
    "parameters": [
        {
            "description": "OutputS3folder",
            "id": "myOutputS3Loc",
            "type": "AWS::S3::ObjectKey"
        },
        {
            "default": "0.2",
            "watermark": "Valuebetween0.1-1.0",
            "description": "DynamoDB Read Throughput Ratio",
            "id": "myDDBReadThroughputRatio",
            "type": "Double"
        },
        {
            "description": "DynamoDB Table Name",
            "id": "myDDBTableName",
            "type": "String"
        },
        {
            "description": "Instance Type",
            "id": "myInstanceType",
            "watermark" : "Use m1.medium if Read Capacity Units for the job <= 900. Else use m3.xlarge",
            "type": "String",
            "default": "m3.xlarge"
        },
        {
            "description": "Instance Count",
        "watermark" : "(Read Capacity Units / 300) for m1.medium if RCU <= 900. Else (RCU / 1500) for m3.xlarge",
            "id": "myInstanceCount",
            "type": "Integer",
            "default": "1"
        },
    {
        "description" : "Burst IOPs",
        "watermark"   : "Add IOPS to the DDB table by this percent for the duration of the export job",
            "id"          : "myBurstIOPS",
            "type"     :    "Double",
            "default"     : "0.0"
    }
    ]
}
person Supratik    schedule 25.04.2016