นำเข้าไฟล์ json ไปยัง DynamoDB ผ่านไปป์ไลน์ AWS Data

ฉันกำลังพยายามหาวิธีสร้างไปป์ไลน์ข้อมูล AWS ที่สามารถนำไฟล์ json จาก S3 และนำเข้าลงในตาราง DynamoDB ได้อย่างไร ฉันสามารถสร้างโค้ด Java บางตัวที่ทำสิ่งนี้ได้ แต่ฉันต้องการทำผ่าน Data Pipeline ฉันเห็นว่ามีเทมเพลตสำหรับส่งออกจาก DynamoDB ไปยัง S3 และการนำเข้าข้อมูลสำรอง แต่ฉันกำลังดิ้นรนหาวิธีนำเข้าไฟล์ json ธรรมดา




คำตอบ (1)


ในเอกสารประกอบ คุณจะพบตัวอย่างการนำเข้าและส่งออกข้อมูลจาก DynamoDb (http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb.html)

ต่อไปนี้เป็นวิธีอธิบายไว้ในเอกสารประกอบ:

เพื่อสร้างไปป์ไลน์

เปิดคอนโซล AWS Data Pipeline ที่

หน้าจอแรกที่คุณเห็นจะขึ้นอยู่กับว่าคุณได้สร้างไปป์ไลน์ในภูมิภาคปัจจุบันหรือไม่

หากคุณยังไม่ได้สร้างไปป์ไลน์ในภูมิภาคนี้ คอนโซลจะแสดงหน้าจอแนะนำ เลือกเริ่มต้นทันที

หากคุณได้สร้างไปป์ไลน์ในภูมิภาคนี้แล้ว คอนโซลจะแสดงหน้าที่แสดงรายการไปป์ไลน์ของคุณสำหรับภูมิภาคนั้น เลือกสร้างไปป์ไลน์ใหม่

ในชื่อ ให้ป้อนชื่อสำหรับไปป์ไลน์ของคุณ

(ไม่บังคับ) ในคำอธิบาย ให้ป้อนคำอธิบายสำหรับไปป์ไลน์ของคุณ

สำหรับแหล่งที่มา ให้เลือกสร้างโดยใช้เทมเพลต จากนั้นเลือกเทมเพลตต่อไปนี้: นำเข้าข้อมูลสำรอง DynamoDB จาก S3

ใต้พารามิเตอร์ ให้ตั้งค่าโฟลเดอร์อินพุต S3 เป็น s3://elasticmapreduce/samples/Store/ProductCatalog ซึ่งเป็นแหล่งข้อมูลตัวอย่าง และตั้งชื่อตาราง DynamoDB เป็นชื่อตารางของคุณ

ภายใต้กำหนดการ เลือกการเปิดใช้งานไปป์ไลน์

ภายใต้การกำหนดค่าไปป์ไลน์ ให้เปิดใช้งานการบันทึกทิ้งไว้ เลือกไอคอนโฟลเดอร์ใต้ตำแหน่ง S3 สำหรับบันทึก เลือกหนึ่งในบัคเก็ตหรือโฟลเดอร์ของคุณ จากนั้นเลือกเลือก

หากต้องการ คุณสามารถปิดใช้งานการบันทึกแทนได้

ภายใต้ความปลอดภัย/การเข้าถึง ให้ปล่อยให้บทบาท IAM ตั้งค่าเป็นค่าเริ่มต้น

คลิกแก้ไขในสถาปนิก

ถัดไป กำหนดค่าการดำเนินการแจ้งเตือนของ Amazon SNS ที่ AWS Data Pipeline ดำเนินการ โดยขึ้นอยู่กับผลลัพธ์ของกิจกรรม

เพื่อกำหนดค่าการดำเนินการสำเร็จและล้มเหลว

ในบานหน้าต่างด้านขวา คลิกกิจกรรม

จาก เพิ่มฟิลด์เสริม เลือก เมื่อสำเร็จ

จาก On Success ที่เพิ่มเข้ามาใหม่ เลือก Create new: Action

จาก เพิ่มฟิลด์เสริม ให้เลือก เมื่อล้มเหลว

จาก On Fail ที่เพิ่มเข้ามาใหม่ ให้เลือกสร้างใหม่: การดำเนินการ

ในบานหน้าต่างด้านขวา คลิกอื่นๆ

สำหรับ DefaultAction1 ให้ทำดังต่อไปนี้:

เปลี่ยนชื่อเป็น SuccessSnsAlarm

จากประเภท ให้เลือก SnsAlarm

ในหัวข้อ Arn ให้ป้อน ARN ของหัวข้อที่คุณสร้างขึ้น

กรอกหัวเรื่องและข้อความ

สำหรับ DefaultAction2 ให้ทำดังต่อไปนี้:

เปลี่ยนชื่อเป็น FailureSnsAlarm

จากประเภท ให้เลือก SnsAlarm

ในหัวข้อ Arn ให้ป้อน ARN ของหัวข้อที่คุณสร้างขึ้น

กรอกหัวเรื่องและข้อความ

ไซต์ GitHub สาธารณะมีตัวอย่างบางส่วนสำหรับการทำงานกับ DynamoDB (https://github.com/awslabs/data-pipeline-samples) ต่อไปนี้เป็นตัวอย่างคำจำกัดความไปป์ไลน์:

{
    "objects": [
        {
            "occurrences": "1",
            "period": "1 Day",
            "name": "RunOnce",
            "id": "DefaultSchedule",
            "type": "Schedule",
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
        "maxActiveInstances" : "1"
        },
        {
            "failureAndRerunMode": "CASCADE",
            "schedule": {
                "ref": "DefaultSchedule"
            },
            "resourceRole": "DataPipelineDefaultResourceRole",
            "role": "DataPipelineDefaultRole",
            "pipelineLogUri": "s3://",
            "scheduleType": "cron",
            "name": "Default",
            "id": "Default"
        },
        {
            "maximumRetries": "2",
            "name": "TableBackupActivity",
            "step": "s3://dynamodb-emr-us-east-1/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')},#{myDDBTableName},#{myDDBReadThroughputRatio}",
            "id": "TableBackupActivity",
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "type": "EmrActivity"
        },
        {
            "bootstrapAction": "s3://elasticmapreduce/bootstrap-actions/configure-hadoop, --yarn-key-value, yarn.nodemanager.resource.memory-mb=12800,--yarn-key-value,yarn.scheduler.minimum-allocation-mb=256,--mapred-key-value,mapreduce.map.memory.mb=500,--mapred-key-value,mapreduce.map.java.opts=-Xmx400M,--mapred-key-value,mapreduce.job.reduce.slowstart.completedmaps=1,--mapred-key-value,mapreduce.map.speculative=false",
            "name": "EmrClusterForBackup",
            "amiVersion": "3.8.0",
            "id": "EmrClusterForBackup",
            "type": "EmrCluster",
            "masterInstanceType": "m1.medium",
            "coreInstanceType": "#{myInstanceType}",
            "coreInstanceCount": "#{myInstanceCount}",
        "terminateAfter" : "12 hours"
        }
    ],
    "parameters": [
        {
            "description": "OutputS3folder",
            "id": "myOutputS3Loc",
            "type": "AWS::S3::ObjectKey"
        },
        {
            "default": "0.2",
            "watermark": "Valuebetween0.1-1.0",
            "description": "DynamoDB Read Throughput Ratio",
            "id": "myDDBReadThroughputRatio",
            "type": "Double"
        },
        {
            "description": "DynamoDB Table Name",
            "id": "myDDBTableName",
            "type": "String"
        },
        {
            "description": "Instance Type",
            "id": "myInstanceType",
            "watermark" : "Use m1.medium if Read Capacity Units for the job <= 900. Else use m3.xlarge",
            "type": "String",
            "default": "m3.xlarge"
        },
        {
            "description": "Instance Count",
        "watermark" : "(Read Capacity Units / 300) for m1.medium if RCU <= 900. Else (RCU / 1500) for m3.xlarge",
            "id": "myInstanceCount",
            "type": "Integer",
            "default": "1"
        },
    {
        "description" : "Burst IOPs",
        "watermark"   : "Add IOPS to the DDB table by this percent for the duration of the export job",
            "id"          : "myBurstIOPS",
            "type"     :    "Double",
            "default"     : "0.0"
    }
    ]
}
person Supratik    schedule 25.04.2016