Nhập dữ liệu CSV vào Amazon DynamoDB bằng AWS Lambda

bởi Mike Wells and Bill Pfeiffer vào ngày 12 tháng 5 năm 2025 trong Advanced (300), Amazon DynamoDB, AWS Lambda, Technical How-to

Trong bài viết này, chúng tôi khám phá một giải pháp được tối ưu hóa bằng  AWS Lambda và Python để đọc và nhập dữ liệu CSV vào bảng Amazon DynamoDB hiện có. Phương pháp này tuân thủ các hạn chế bảo mật của tổ chức, hỗ trợ cơ sở hạ tầng dưới dạng mã (Infrastructure as Code – IaC) để quản lý bảng, và cung cấp quy trình điều khiển theo sự kiện để nhập các tập dữ liệu CSV vào DynamoDB.

Trong môi trường tuân thủ nghiêm ngặt, việc duy trì bản ghi kiểm toán chi tiết là bắt buộc để đáp ứng quy định. Giải pháp này giải quyết nhu cầu đó bằng cách tự động ghi lại cả các giao dịch thành công và các bản ghi thất bại, cung cấp tính minh bạch cần thiết cho việc xác thực và đối chiếu theo quy định. Bằng cách tạo ra các kết quả đầu ra riêng biệt cho các mục đã xử lý và chưa xử lý, hệ thống cung cấp bằng chứng cần thiết để đáp ứng yêu cầu của các kiểm toán viên trong các ngành dịch vụ tài chính, chăm sóc sức khỏe và các ngành công nghiệp được quản lý chặt chẽ khác, nơi tài liệu xử lý dữ liệu có thể được yêu cầu.

Các yêu cầu chính mà giải pháp này giải quyết bao gồm:

  • Nhập dữ liệu CSV vào DynamoDB theo chương trình sử dụng pipeline trích xuất, biến đổi và tải (ETL)
  • Liên tục thêm dữ liệu vào bảng DynamoDB hiện có
  • Mở rộng giải pháp cho môi trường tại chỗ (on-premises), không chỉ AWS
  • Sử dụng phương pháp tiếp cận theo sự kiện khi có dữ liệu mới sẵn sàng để nhập
  • Giảm bớt sự phụ thuộc vào AWS Management Console  hoặc các quy trình thủ công
  • Sử dụng các bản ghi kiểm toán cho ảnh chụp dữ liệu theo thời điểm (point-in-time snapshot) của dữ liệu đã được chuyển đổi

Giải pháp này lý tưởng cho các tập dữ liệu có quy mô nhỏ đến trung bình (1k-1M+ hàng mỗi tệp). Nếu yêu cầu của bạn bao gồm khả năng tiếp tục các tác vụ nhập bị gián đoạn, khả năng nhập các tập dữ liệu lớn và thời gian tính toán trên mỗi lần thực thi dài hơn 15 phút, hãy cân nhắc sử dụng AWS Glue ETL job thay vì AWS Lambda.

Tổng quan giải pháp

Quy trình nhập dữ liệu diễn ra như sau:

  1. Sử dụng AWS Command Line Interface (AWS CLI) hoặc lên lịch chuyển tệp để tải dữ liệu CSV lên Amazon Simple Storage Service (Amazon S3) bucket.
  2. Amazon S3 Event Notifications kích hoạt hàm Lambda.
  3. Hàm Lambda đọc tệp CSV từ bucket S3, thêm dữ liệu vào bảng DynamoDB hiện có và lưu trữ dữ liệu đã chuyển đổi thành đối tượng JSON trong bucket gốc.

Sơ đồ sau đây cho thấy quy trình nhập dữ liệu từ đầu đến cuối.

Để thiết lập giải pháp, sử dụng các bước tổng quan sau:

  1. Tạo IAM role AWS Identity and Access Management.
  2. Tạo hàm Lambda và tải lên mã Python.
  3. Tạo bucket S3 và thông báo sự kiện để kích hoạt hàm Lambda.
  4. Tạo bảng DynamoDB.
  5. Tạo tệp CSV mẫu.
  6. Tải lên bucket S3 để nhập tệp CSV vào bảng DynamoDB.
  7. Khám phá các mục trong bảng DynamoDB.

Trong ví dụ này, ví dụ này sử dụng kích thước tài nguyên nhỏ. Khi bạn mở rộng quy mô tập dữ liệu, hãy xem xét các điều sau:

  • Tăng bộ nhớ Lambda lên tối đa 10GB. Điều này cũng sẽ tăng cường phân bổ CPU và băng thông mạng.
  • Đặt thời gian chờ Lambda tối đa 15 phút (900 giây).
  • Tăng bộ nhớ tạm thời Lambda lên 10GB nếu xử lý các tệp CSV lớn tạm thời.

*Trong quá trình thử nghiệm, giải pháp này có thể nhập tệp CSV chứa 1 triệu hàng trong 9 phút với 5120MB bộ nhớ được cấu hình.

Nếu bạn yêu cầu xử lý các tệp lớn hơn trong thời gian ít hơn 15 phút, có các biến đổi phức tạp để thực hiện, hoặc cần tiếp tục các công việc bị gián đoạn, hãy sử dụng AWS Glue ETL job.

Điều kiện tiên quyết

Để triển khai giải pháp này, bạn có thể sử dụng máy tính hoặc AWS CloudShell. Các yêu cầu sau phải được cài đặt:

  • Tối thiểu AWS CLI 2.23.12. Để biết hướng dẫn, xem Getting started with the AWS CLI.
  • Tối thiểu Python 3.13.1.
  • Hướng dẫn này sử dụng cú pháp và lệnh của hệ điều hành Linux. Bạn sẽ cần chuyển đổi các lệnh sang PowerShell/Microsoft Windows.
  • Đặt AWS Region mặc định trong AWS CLI:

Bạn sẽ cần một IAM role với các quyền thích hợp để cấu hình Lambda, Amazon S3, IAM và DynamoDB.

Tạo IAM role

Để tạo thư mục dự án trên máy tính phát triển của bạn, chạy mã sau:

mkdir csv-to-ddb 

cd csv-to-ddb

Để tạo IAM role, thực hiện các bước sau:

  1. Thu thập và thiết lập các biến môi trường. Hãy chắc chắn thay thế tên bảng DynamoDB và tên hàm Lambda mong muốn của bạn:

ACCOUNT=$(aws sts get-caller-identity –query Account –output text)

aws configure set region us-east-1

export REGION=$(aws configure get region)

export DDB_TABLE=<your-dynamodb-table-name>

export LAMBDA_FUNCTION=<your-lambda-function>

export S3_BUCKET=<your-s3-bucket>

export IAM_ROLE=<iam-role-name>

  1. Tạo trust policy cho role thực thi Lambda:

cat << ‘EOF’ > trust-policy.json

{

“Version”: “2012-10-17”,

“Statement”: [

{

“Effect”: “Allow”,

“Principal”: {

“Service”: “lambda.amazonaws.com”

},

“Action”: “sts:AssumeRole”

}

]

}

EOF

  1. Tạo IAM role sử dụng trust policy:

aws iam create-role –role-name $IAM_ROLE –assume-role-policy-document file://trust-policy.json

  1. Chạy các lệnh sau để tạo và gắn chính sách đặc quyền tối thiểu vào role thực thi Lambda. Chính sách này được giới hạn phạm vi chỉ với các quyền cần thiết để đọc từ Amazon S3 và ghi vào DynamoDB.

cat <<EOF > csv-to-ddb-policy.json

{

“Version”: “2012-10-17”,

“Statement”: [

{

“Effect”: “Allow”,

“Action”: [

“dynamodb:BatchWriteItem”

],

“Resource”: “arn:aws:dynamodb:$REGION:$ACCOUNT:table/$DDB_TABLE”

},

{

“Effect”: “Allow”,

“Action”: [

“s3:GetObject”

],

“Resource”: “arn:aws:s3:::$S3_BUCKET/*”

},

{

“Effect”: “Allow”,

“Action”: [

“s3:PutObject”

],

“Resource”: [

“arn:aws:s3:::$S3_BUCKET/json-copy/*”,

“arn:aws:s3:::$S3_BUCKET/unprocessed/*”

]

}

]

}

EOF

aws iam create-policy \

–policy-name DynamoDBWriteS3ReadPolicy \

–policy-document file://csv-to-ddb-policy.json

aws iam attach-role-policy \

–role-name $IAM_ROLE \

–policy-arn arn:aws:iam::$ACCOUNT:policy/DynamoDBWriteS3ReadPolicy

aws iam attach-role-policy \

–role-name $IAM_ROLE \

–policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

Tạo hàm Lambda và tải lên mã Python

Để tạo hàm Lambda và tải lên mã Python, thực hiện các bước sau:

  1. Định nghĩa các biến môi trường với Amazon Resource Name (ARN) của IAM role mới được tạo. Điều này sẽ được sử dụng khi tạo hàm Lambda của bạn.

export LAMBDA_EXECUTION_ROLE_ARN=$(aws iam get-role –role-name $IAM_ROLE –query ‘Role.Arn’ –output text)

  1. Sử dụng máy tính phát triển của bạn, tạo thư mục dự án và chạy đoạn mã sau để tạo script Python. Mã Python này sẽ được gọi bởi hàm Lambda được thiết kế để xử lý tệp CSV được tải lên S3 bucket, chuyển đổi nó thành danh sách các dictionary, ghi dữ liệu vào bảng DynamoDB, và sau đó tải biểu diễn JSON của dữ liệu trở lại S3 bucket. Hàm được kích hoạt bởi sự kiện S3 và hoạt động theo phương thức hướng sự kiện:

cat << ‘EOF’ > import.py

import logging

import boto3

import os

import csv

import json

import time

from io import StringIO

logger = logging.getLogger()

logger.setLevel(logging.DEBUG)

region = os.getenv(‘AWS_REGION’)

table_name = os.getenv(‘DYNAMO_DB_TABLE_NAME’)

dynamo_endpoint_url = f”https://dynamodb.{region}.amazonaws.com”

ddbClient = boto3.resource(‘dynamodb’, endpoint_url=dynamo_endpoint_url)

s3client = boto3.client(‘s3’)

def lambda_handler(event, context):

    logger.info(“Received Event: %s”, event)

    ddbTable = ddbClient.Table(table_name)

    # Get the object from the event

    bucket = event[‘Records’][0][‘s3’][‘bucket’][‘name’]

    key = event[‘Records’][0][‘s3’][‘object’][‘key’]

    csv_filename = os.path.basename(key)

    logger.info(“Bucket Name extracted from the event: %s”, bucket)

    logger.info(“Object Key extracted from the event: %s”, key)

    try:

        # Get the CSV object from S3

        csv_object = s3client.get_object(Bucket=bucket, Key=key)

        # Read and parse CSV data

        try:

            csv_data = csv_object[‘Body’].read().decode(‘utf-8-sig’)

            csv_reader = csv.DictReader(StringIO(csv_data))

            # Convert CSV to list of dictionaries

            rows = []

            for row in csv_reader:

                # Remove empty strings and clean up the data

                cleaned_row = {k: v for k, v in row.items() if v is not None and v != ”}

                if cleaned_row:  # Only append if the row has data

                    rows.append(cleaned_row)

            logger.info(f”Successfully parsed {len(rows)} rows from CSV”)

        except Exception as e:

            error_msg = f”Error parsing CSV: {str(e)}”

            logger.error(error_msg)

            return {

                “statusCode”: 400,

                “body”: json.dumps({“error”: error_msg})

            }

        # Write to DynamoDB with retry logic for unprocessed items

        batch_size = 25

        unprocessed_items = []

        for idx in range(0, len(rows), batch_size):

            batch = rows[idx:idx + batch_size]

            request_items = {table_name: [{‘PutRequest’: {‘Item’: item}} for item in batch]}

            retries = 0

            # Retry logic with exponential backoff

            while retries <= 3:

                resp = ddbClient.meta.client.batch_write_item(RequestItems=request_items)

                unp = resp.get(‘UnprocessedItems’, {}).get(table_name, [])

                if not unp:

                    break

                request_items = {table_name: unp}

                retries += 1

                time.sleep(2 ** retries)

                logger.warning(f”Retry {retries} for {len(unp)} unprocessed items”)

            # Handle any remaining unprocessed items

            if unp:

                items = [r[‘PutRequest’][‘Item’] for r in unp]

                save_unprocessed(bucket, csv_filename, items)

                unprocessed_items.extend(items)

        logger.info(“Data written to DynamoDB table successfully.”)

        # Create JSON object with array (excluding unprocessed items)

        processed_items = []

        unprocessed_item_set = {json.dumps(item, sort_keys=True) for item in unprocessed_items}

        for item in rows:

            if json.dumps(item, sort_keys=True) not in unprocessed_item_set:

                processed_items.append(item)

        json_object = {“data”: processed_items}

        json_data = json.dumps(json_object, indent=2)

        # Create the JSON key with just the filename

        json_key = f”json-copy/{csv_filename.replace(‘.csv’, ‘.json’)}”

        s3client.put_object(Body=json_data, Bucket=bucket, Key=json_key)

        logger.info(f”JSON data uploaded to {bucket}/{json_key}”)

        return {

            “statusCode”: 200,

            “body”: json.dumps({

                “processed_rows”: len(processed_items),

                “unprocessed_rows”: len(unprocessed_items),

                “unprocessed_file”: f”unprocessed/{csv_filename}.unprocessed.json” if unprocessed_items else None,

                “json_copy”: json_key

            })

        }

    except Exception as e:

        error_msg = f”Error processing file: {str(e)}”

        logger.error(error_msg)

        return {

            “statusCode”: 500,

            “body”: json.dumps({“error”: error_msg})

        }

def save_unprocessed(bucket, fname, items):

    “””Save items that couldn’t be written to DynamoDB”””

    key = f”unprocessed/{fname}.unprocessed.json”

    try:

        existing = json.loads(s3client.get_object(Bucket=bucket, Key=key)[‘Body’].read())

    except s3client.exceptions.NoSuchKey:

        existing = []

    except Exception as e:

        logger.warning(f”Error reading existing unprocessed items: {str(e)}”)

        existing = []

    existing.extend(items)

    s3client.put_object(Bucket=bucket, Key=key, Body=json.dumps(existing))

    logger.info(f”Saved {len(items)} unprocessed items to {key}”)

EOF

Hàm Lambda này triển khai pipeline ETL được kích hoạt bởi việc tải lên tệp CSV vào Amazon S3. Hàm trích xuất dữ liệu từ tệp CSV kích hoạt sử dụng client Amazon S3, và thực hiện biến đổi dữ liệu bằng cách phân tích nội dung CSV với lớp DictReader và làm sạch thông qua việc loại bỏ các giá trị rỗng và trường null. Sau đó nó thực thi thao tác tải song song khi lặp qua các hàng của tệp CSV và ghi chúng theo lô 25 mỗi lần vào bảng của bạn. Nếu bất kỳ thao tác ghi nào thất bại, chúng sẽ được thử lại tối đa 3 lần nữa sử dụng chiến lược backoff theo cấp số nhân trước khi cuối cùng được ghi vào S3 để ghi log tất cả các mục chưa được xử lý. Việc triển khai sử dụng các thao tác batch của AWS SDK cho ghi DynamoDB để tối ưu hóa thông lượng, kết hợp xử lý lỗi với logging, và duy trì mẫu kiến trúc serverless hoàn toàn bằng cách sử dụng các biến môi trường cho quản lý cấu hình. Mã này thể hiện các mẫu tích hợp serverless bằng cách nối chuỗi các dịch vụ AWS (Amazon S3 đến Lambda đến DynamoDB đến Amazon S3) và triển khai pipeline dữ liệu đích kép.

  1. Nén mã Python của bạn để có thể tạo hàm Lambda với nó:

zip import.zip import.py

  1. Tạo hàm của bạn sử dụng tệp zip chứa mã Python:

aws lambda create-function \

–function-name $LAMBDA_FUNCTION \

–runtime python3.13 \

–zip-file fileb://import.zip \

–handler import.lambda_handler \

–role $LAMBDA_EXECUTION_ROLE_ARN \

–timeout 180 \

–memory-size 2048 \

–tracing-config Mode=Active \

–architectures arm64

  1. Thêm các biến môi trường vào hàm Lambda của bạn:

aws lambda update-function-configuration \

–function-name $LAMBDA_FUNCTION \

–environment “Variables={DYNAMO_DB_TABLE_NAME=$DDB_TABLE}”

Tạo bucket S3 và thông báo sự kiện để kích hoạt hàm Lambda

Để tạo bucket S3 và thông báo sự kiện để kích hoạt hàm Lambda, thực hiện các bước sau:

  1. Tạo S3 bucket để nhập dữ liệu CSV của bạn:

aws s3 mb s3://$S3_BUCKET –region $REGION

  1. Thêm quyền cho hàm Lambda để được gọi bởi ARN của S3 bucket:

aws lambda add-permission \

–function-name $LAMBDA_FUNCTION \

–statement-id s3-lambda-permission \

–action “lambda:InvokeFunction” \

–principal s3.amazonaws.com \

–source-arn “arn:aws:s3:::$S3_BUCKET” \

–source-account $ACCOUNT

  1. Tạo cấu hình thông báo S3 bucket:

echo ‘{

“LambdaFunctionConfigurations”: [

{

“LambdaFunctionArn”: “arn:aws:lambda:’$REGION’:’$ACCOUNT’:function:’$LAMBDA_FUNCTION’”,

“Events”: [“s3:ObjectCreated:*”]

}

]

}’ > notify.json

aws s3api put-bucket-notification-configuration \

–bucket $S3_BUCKET \

–notification-configuration file://notify.json

Tạo bảng DynamoDB

Để tạo bảng DynamoDB, sử dụng máy tính phát triển của bạn với các điều kiện tiên quyết cần thiết đã được cài đặt để chạy lệnh sau:

aws dynamodb create-table \

–table-name $DDB_TABLE \

–attribute-definitions \

AttributeName=account,AttributeType=S \

AttributeName=offer_id,AttributeType=S \

–key-schema \

AttributeName=account,KeyType=HASH \

AttributeName=offer_id,KeyType=RANGE \

–billing-mode PAY_PER_REQUEST

Lệnh này sẽ hoạt động trên hệ điều hành Linux. Đối với Windows, thay thế dấu gạch chéo ngược bằng `. Lệnh này tạo một bảng on-demand với partition key có tên là account và sort key có tên là offer_id. Primary key trong DynamoDB có thể là đơn giản (chỉ có partition key) hoặc composite (partition key và sort key). Điều quan trọng là primary key phải duy nhất cho mỗi bản ghi, cũng như cung cấp hoạt động đồng đều trên tất cả các partition key trong bảng. Đối với trường hợp sử dụng của chúng ta, chúng ta sẽ sử dụng số tài khoản và ID offer, vì chúng có thể được kết hợp để cung cấp cho mỗi item một định danh duy nhất có thể được truy vấn.

Vì đây là trường hợp sử dụng batch, bảng on-demand sẽ cho phép bạn chỉ trả tiền cho các request mà bạn thực hiện trong quá trình xử lý. Các bảng on-demand tự động mở rộng để phù hợp với mức độ traffic mà bạn cần, vì vậy bạn không phải lo lắng về việc quản lý các chính sách scaling hoặc lập kế hoạch capacity. Theo mặc định, các bảng on-demand được cung cấp để hỗ trợ 4,000 Write Capacity Units (WCU) và 12,000 Read Capacity Units (RCU). Nếu bạn dự định vượt quá các giá trị này ngay từ đầu, bạn có thể khám phá việc sử dụng warm throughput như một giải pháp.

Tạo tệp CSV mẫu

Để tạo tệp  data.csv mẫu, thực hiện các bước sau:

  1. Sử dụng máy tính phát triển của bạn, tạo script Python sau:

cat << ‘EOF’ > make-csv.py

import csv

import random

import string

import datetime

account_type_codes = [”.join(random.choices(string.ascii_uppercase, k=2)) for _ in range(26)]

offer_type_ids = [”.join(random.choices(string.digits, k=8)) for _ in range(100)]

risk_levels = [‘high’, ‘medium’, ‘low’]

data_rows = []

for _ in range(1500):

    account = ”.join(random.choices(string.digits, k=8))

    offer_id = ”.join(random.choices(string.digits, k=12))

    catalog_id = ”.join(random.choices(string.digits, k=18))

    account_type_code = random.choice(account_type_codes)

    offer_type_id = random.choice(offer_type_ids)

    created = datetime.datetime.now(datetime.timezone.utc) – datetime.timedelta(days=random.randint(0, 365))

    expire = created + datetime.timedelta(days=random.randint(7, 365))

    risk = random.choice(risk_levels)

    data_row = [

        account,

        offer_id,

        catalog_id,

        account_type_code,

        offer_type_id,

        created.isoformat(),

        expire.isoformat(),

        risk

    ]

    data_rows.append(data_row)

with open(‘data.csv’, mode=’w’, newline=”) as file:

    writer = csv.writer(file)

    writer.writerow([

        ‘account’,

        ‘offer_id’,

        ‘catalog_id’,

        ‘account_type_code’,

        ‘offer_type_id’,

        ‘created’,

        ‘expire’,

        ‘risk’

    ])

    writer.writerows(data_rows)

print(“CSV file generated successfully.”)

EOF

  1. Chạy script Python sau:

python3 make-csv.py

Một tệp có tên  data.csv với 1,500 hàng sẽ được tạo trong thư mục dự án của bạn. Mã sau đây cho thấy một ví dụ:

account,offer_id,catalog_id,account_type_code,offer_type_id,created,expire,risk

43469653,626503640435,649413151048141733,GE,07052721,2024-07-22T16:23:06.771968+00:00,2025-04-15T16:23:06.771968+00:00,high

Tải lên S3 bucket để nhập tệp CSV vào bảng DynamoDB

Để tải lên S3 bucket để nhập tệp CSV vào bảng DynamoDB, chạy lệnh sau:

aws s3 cp data.csv s3://$S3_BUCKET/

Tệp được tải lên bucket của bạn, tạo ra thông báo sự kiện S3 kích hoạt hàm Lambda. Hàm Lambda Python xử lý các tệp CSV được tải lên Amazon S3, nhập dữ liệu vào DynamoDB để truy cập vận hành, và đồng thời lưu trữ phiên bản JSON trong Amazon S3 cho mục đích phân tích và kiểm toán. Khám phá các item trong bảng

Để khám phá các item trong bảng, sử dụng hai lệnh CLI này. Chúng sẽ trả về số lượng tất cả các item được chèn cũng như 10 item đầu tiên từ bảng của bạn:

aws dynamodb scan \

–table-name $DDB_TABLE \

–select COUNT

aws dynamodb scan \

–table-name $DDB_TABLE \

–limit 10

Các ảnh chụp màn hình sau đây cho thấy một ví dụ về các truy vấn bảng.

Bạn vừa xây dựng một pipeline ETL serverless tự động nhập dữ liệu CSV vào DynamoDB. Giải pháp sử dụng Lambda và trigger sự kiện Amazon S3 để tạo ra quy trình nhập dữ liệu không cần bảo trì.

Giám sát và Tối ưu hóa

Sau khi bạn đã cung cấp tất cả các tài nguyên, điều quan trọng là phải giám sát cả hiệu suất bảng DynamoDB cũng như ETL Lambda của bạn. Sử dụng CloudWatch Metrics để giám sát các metric cấp bảng như RCU và WCU đã tiêu thụ, Độ trễ và các request bị throttle, trong khi sử dụng CloudWatch Logs để debug function của bạn thông qua error logging được lưu trữ. Điều này sẽ cung cấp thông tin chi tiết về hiệu suất của bảng DynamoDB cũng như hàm Lambda khi quá trình ETL đang chạy.

Khi kích thước tệp đầu vào của bạn tăng lên, điều quan trọng là cũng phải mở rộng memory và ephemeral storage được phân bổ cho hàm Lambda của bạn tương ứng. Điều này đảm bảo rằng hàm Lambda sẽ chạy nhất quán và hiệu quả. Khi sizing hàm Lambda của bạn, bạn có thể sử dụng công cụ AWS Lambda Power Tuning để thử nghiệm các cấu hình memory khác nhau để tối ưu hóa cả hiệu suất và chi phí.

Dọn dẹp

Khi bạn đã hoàn thành, hãy dọn dẹp các tài nguyên liên quan đến việc triển khai ví dụ để tránh phát sinh các khoản phí không mong muốn:

aws lambda delete-function –function-name $LAMBDA_FUNCTION

aws s3 rm “s3://$S3_BUCKET” –recursive

aws s3api delete-bucket –bucket “$S3_BUCKET”

aws iam detach-role-policy \

–role-name $IAM_ROLE \

–policy-arn arn:aws:iam::$ACCOUNT:policy/DynamoDBWriteS3ReadPolicy

aws iam detach-role-policy \

–role-name $IAM_ROLE \

–policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

aws iam delete-role –role-name $IAM_ROLE

aws iam delete-policy –policy-arn arn:aws:iam::$ACCOUNT:policy/DynamoDBWriteS3ReadPolicy

aws dynamodb delete-table –table-name $DDB_TABLE

cd ..

rm -r csv-to-ddb 

Kết luận

Trong bài viết này, chúng tôi đã trình bày một giải pháp kết hợp sức mạnh của Python để thao tác dữ liệu và Lambda để tương tác với DynamoDB, cho phép nhập dữ liệu từ các nguồn CSV vào bảng DynamoDB. Chúng tôi cũng đã trình bày cách nhập và lưu trữ dữ liệu có cấu trúc trong DynamoDB một cách lập trình.

Bạn có thể sử dụng giải pháp này để tích hợp các pipeline dữ liệu từ on-premises vào môi trường AWS cho mục đích migration hoặc continuous delivery.

Về tác giả

Bill Pfeiffer là Solutions Architect cấp cao tại AWS, tập trung vào việc giúp đỡ khách hàng thiết kế, triển khai và phát triển cơ sở hạ tầng cloud an toàn và tối ưu chi phí. Anh ấy đam mê giải quyết các thách thức kinh doanh bằng các giải pháp kỹ thuật. Ngoài công việc, Bill thích đi du lịch khắp Hoa Kỳ cùng gia đình bằng xe RV và khám phá thiên nhiên.

Mike Wells là Solutions Architect tại AWS, làm việc với các khách hàng trong lĩnh vực dịch vụ tài chính. Anh ấy giúp các tổ chức thiết kế các giải pháp cloud có khả năng phục hồi, bảo mật đáp ứng các yêu cầu quy định nghiêm ngặt trong khi tối đa hóa lợi ích từ AWS. Khi không thiết kế các giải pháp cloud, Mike là một người đam mê chạy bộ và thích dành thời gian với gia đình và bạn bè..