Impor file json ke DynamoDB melalui pipa AWS Data

Saya mencoba mencari cara untuk membuat saluran data AWS yang dapat mengambil file json dari S3 dan mengimpornya ke tabel DynamoDB. Saya dapat membuat beberapa kode Java yang mencapai hal ini tetapi saya ingin melakukannya melalui pipa Data. Saya dapat melihat ada templat untuk mengekspor dari DynamoDB ke S3 dan mengimpor cadangan, tetapi saya kesulitan mencari cara untuk mengimpor file json biasa.


person Sutty1000    schedule 20.04.2016    source sumber


Jawaban (1)


Dalam dokumentasi Anda akan menemukan contoh mengimpor dan mengekspor data dari DynamoDb (http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb.html)

Begini penjelasannya dalam dokumentasi:

Untuk membuat saluran pipa

Buka konsol AWS Data Pipeline di

Layar pertama yang Anda lihat bergantung pada apakah Anda telah membuat alur di wilayah saat ini.

Jika Anda belum membuat alur di wilayah ini, konsol akan menampilkan layar pengantar. Pilih Mulai sekarang.

Jika Anda sudah membuat alur di wilayah ini, konsol akan menampilkan halaman yang mencantumkan alur Anda untuk wilayah tersebut. Pilih Buat alur baru.

Di Nama, masukkan nama untuk alur Anda.

(Opsional) DiDeskripsi, masukkan deskripsi untuk alur Anda.

Untuk Sumber, pilih Bangun menggunakan templat, lalu pilih templat berikut: Impor data cadangan DynamoDB dari S3.

Di bawah Parameter, atur folder Input S3 ke s3://elasticmapreduce/samples/Store/ProductCatalog, yang merupakan sumber data sampel, dan atur nama tabel DynamoDB ke nama tabel Anda.

Di bawah Jadwal, pilih aktivasi alur.

Di bawah Konfigurasi Alur, biarkan logging diaktifkan. Pilih ikon folder di bawah Lokasi S3 untuk log, pilih salah satu bucket atau folder Anda, lalu pilih Pilih.

Jika mau, Anda dapat menonaktifkan logging.

Di bagian Keamanan/Akses, biarkan IAM role diatur ke Default.

Klik Edit di Arsitek.

Selanjutnya, konfigurasikan tindakan notifikasi Amazon SNS yang dilakukan AWS Data Pipeline bergantung pada hasil aktivitas.

Untuk mengonfigurasi tindakan sukses dan gagal

Di panel kanan, klik Aktivitas.

Dari Tambahkan bidang opsional, pilih Saat Berhasil.

Dari On Success yang baru ditambahkan, pilih Buat baru: Tindakan.

Dari Tambahkan bidang opsional, pilih Saat Gagal.

Dari On Fail yang baru ditambahkan, pilih Buat baru: Tindakan.

Di panel kanan, klik Lainnya.

Untuk DefaultAction1, lakukan hal berikut:

Ubah namanya menjadi SuccessSnsAlarm.

Dari Jenis, pilih SnsAlarm.

Di Topik Arn, masukkan ARN topik yang Anda buat.

Masukkan subjek dan pesan.

Untuk DefaultAction2, lakukan hal berikut:

Ubah namanya menjadi FailureSnsAlarm.

Dari Jenis, pilih SnsAlarm.

Di Topik Arn, masukkan ARN topik yang Anda buat.

Masukkan subjek dan pesan.

Situs github publik memiliki beberapa contoh untuk bekerja dengan DynamoDB (https://github.com/awslabs/data-pipeline-samples). Berikut ini contoh definisi pipeline:

{
    "objects": [
        {
            "occurrences": "1",
            "period": "1 Day",
            "name": "RunOnce",
            "id": "DefaultSchedule",
            "type": "Schedule",
            "startAt": "FIRST_ACTIVATION_DATE_TIME",
        "maxActiveInstances" : "1"
        },
        {
            "failureAndRerunMode": "CASCADE",
            "schedule": {
                "ref": "DefaultSchedule"
            },
            "resourceRole": "DataPipelineDefaultResourceRole",
            "role": "DataPipelineDefaultRole",
            "pipelineLogUri": "s3://",
            "scheduleType": "cron",
            "name": "Default",
            "id": "Default"
        },
        {
            "maximumRetries": "2",
            "name": "TableBackupActivity",
            "step": "s3://dynamodb-emr-us-east-1/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')},#{myDDBTableName},#{myDDBReadThroughputRatio}",
            "id": "TableBackupActivity",
            "runsOn": {
                "ref": "EmrClusterForBackup"
            },
            "type": "EmrActivity"
        },
        {
            "bootstrapAction": "s3://elasticmapreduce/bootstrap-actions/configure-hadoop, --yarn-key-value, yarn.nodemanager.resource.memory-mb=12800,--yarn-key-value,yarn.scheduler.minimum-allocation-mb=256,--mapred-key-value,mapreduce.map.memory.mb=500,--mapred-key-value,mapreduce.map.java.opts=-Xmx400M,--mapred-key-value,mapreduce.job.reduce.slowstart.completedmaps=1,--mapred-key-value,mapreduce.map.speculative=false",
            "name": "EmrClusterForBackup",
            "amiVersion": "3.8.0",
            "id": "EmrClusterForBackup",
            "type": "EmrCluster",
            "masterInstanceType": "m1.medium",
            "coreInstanceType": "#{myInstanceType}",
            "coreInstanceCount": "#{myInstanceCount}",
        "terminateAfter" : "12 hours"
        }
    ],
    "parameters": [
        {
            "description": "OutputS3folder",
            "id": "myOutputS3Loc",
            "type": "AWS::S3::ObjectKey"
        },
        {
            "default": "0.2",
            "watermark": "Valuebetween0.1-1.0",
            "description": "DynamoDB Read Throughput Ratio",
            "id": "myDDBReadThroughputRatio",
            "type": "Double"
        },
        {
            "description": "DynamoDB Table Name",
            "id": "myDDBTableName",
            "type": "String"
        },
        {
            "description": "Instance Type",
            "id": "myInstanceType",
            "watermark" : "Use m1.medium if Read Capacity Units for the job <= 900. Else use m3.xlarge",
            "type": "String",
            "default": "m3.xlarge"
        },
        {
            "description": "Instance Count",
        "watermark" : "(Read Capacity Units / 300) for m1.medium if RCU <= 900. Else (RCU / 1500) for m3.xlarge",
            "id": "myInstanceCount",
            "type": "Integer",
            "default": "1"
        },
    {
        "description" : "Burst IOPs",
        "watermark"   : "Add IOPS to the DDB table by this percent for the duration of the export job",
            "id"          : "myBurstIOPS",
            "type"     :    "Double",
            "default"     : "0.0"
    }
    ]
}
person Supratik    schedule 25.04.2016