Sắp xếp các file phụ thuộc và tải lên bằng AWS Step Functions

Bài đăng này được viết bởi Nelson Assis, Enterprise Support Lead, Serverless và Jevon Liburd, Technical Account Manager, Serverless

Amazon S3 là dịch vụ lưu trữ đối tượng được nhiều khách hàng sử dụng để lưu trữ tệp. Với việc sử dụng Amazon S3 Event Notifications hoặc Amazon EventBridge, khách hàng có thể tạo workloads bằng event-driven architecture (EDA). Kiến trúc này phản hồi các sự kiện được tạo ra khi có thay đổi xảy ra với các đối tượng trong S3 bucket.

EDA liên quan đến giao tiếp không đồng bộ giữa các thành phần hệ thống. Điều này nhằm mục đích tách rời các thành phần cho phép mỗi thành phần hoạt động độc lập.

Một số kịch bản có thể đưa ra sự ghép nối trong kiến ​​trúc do sự phụ thuộc giữa các sự kiện. Bài đăng trên blog này trình bày một ví dụ phổ biến về việc ghép nối và cách xử lý nó bằng cách sử dụng AWS Step Functions.

Tổng quan

Trong ví dụ này, một tổ chức có hai team làm việc độc lập, Sales team và Warehouse team. Mỗi team chịu trách nhiệm các tải tệp dữ liệu hàng tháng lên S3 bucket để có thể xử lý.

Các tệp sẽ tạo ra các sự kiện khi chúng được tải lên, bắt đầu các quá trình tiếp theo. Quá trình xử lý Warehouse file sẽ làm sạch dữ liệu và kết hợp dữ liệu đó với dữ liệu từ Shipping team. Việc xử lý Sales file sẽ tương quan dữ liệu với dữ liệu được kết hợp từ Warehouse và Shipping. Điều này cho phép các nhà phân tích thực hiện dự báo và thu thập thông tin khác.

Để mối tương quan này xảy ra, Warehouse file phải được xử lý trước Sales file. Vì hai teams làm việc độc lập nên không có sự phối hợp giữa các teams. Điều này có nghĩa là các tệp có thể được tải lên bất kỳ lúc nào mà không có sự đảm bảo rằng Warehouse file được xử lý trước Sales file.

Đối với các tình huống như thế này, có thể sử dụng Aggregator pattern. Mẫu thu thập và lưu trữ các event, đồng thời kích hoạt một event mới dựa trên các combined events. Trong trường hợp được mô tả, các combined events là Warehouse file đã xử lý và tệp Sales file được tải lên.

Các yêu cầu của mẫu tổng hợp là:

  1. Correlation: Một cách để nhóm các sự kiện liên quan. Điều này được thực hiện bằng một mã định danh duy nhất trong tên file.
  2. Event aggregator: Một kho lưu trữ trạng thái cho các sự kiện.
  3. Completion check and trigger: Một điều kiện khi các combined events đã được nhận và cách để xuất bản resulting event.

Tổng quan về kiến ​​trúc

Kiến trúc này sẽ sử dụng các dịch vụ AWS sau:

Image link

  1. File upload: Sales team và Warehouse team tải các tệp tương ứng của họ lên S3.
  2. EventBridge: ObjectCreated event được gửi đến EventBridge nơi có rules với target của main workflow.
  3. Main state machine: State machine này điều phối các hoạt động tổng hợp và xử lý file. Nó đóng gói các workflow cho mỗi file để tách logic tổng hợp khỏi file workflow logic.
  4. File parser and correlation: Business logic để xác định file và loại file được chạy trong hàm Lambda này.
  5. Stateful store: DynamoDB table lưu trữ thông tin về file như tên, loại và trạng thái xử lý. State machine đọc và ghi vào DynamoDB table. Task tokens cũng được lưu trữ trong bảng này.
  6. File processing: Tùy thuộc vào loại tệp và mọi điều kiện trước, các state machine tương ứng với loại file sẽ được chạy. Các state machine này chứa logic để xử lý file cụ thể.
  7. Task Token & Callback: Task token tác vụ được tạo khi file phụ thuộc cố gắng được xử lý trước file độc lập. Mẫu Step Functions “Wait for a Callback” tiếp tục thực thi file phụ thuộc sau khi file độc lập được xử lý.

Hướng dẫn

Bạn cần có những điều kiện tiên quyết sau:

  • Đã cài đặt AWS CLIAWS SAM CLI.
  • Tài khoản AWS.
  • Có đủ quyền để quản lý tài nguyên AWS.
  • Đã cài đặt Git.

Để triển khai ví dụ, hãy làm theo hướng dẫn trong kho lưu trữ GitHub.

Hướng dẫn này cho biết điều gì sẽ xảy ra nếu file phụ thuộc (Sales file) được tải lên trước file độc lập (Warehouse file).

  1. Workflow bắt đầu bằng việc tải tệp Sales file lên Sales S3 bucket chuyên dụng. Trong ví dụ này sẽ sử dụng các nhóm S3 riêng biệt cho hai tệp vì nó giả định rằng Sales team và Warehouse team được phân phối và độc lập. Bạn có thể tìm thấy các file mẫu trong code repository này.
  1. Việc tải file lên S3 sẽ gửi một sự kiện tới EventBridge, sự kiện mà aggregator state machine sẽ thực hiện. Mẫu sự kiện được sử dụng trong quy tắc EventBridge là:

“`JSON

{

  “detail-type”: [“Object Created”],

  “source”: [“aws.s3”],

  “detail”: {

    “bucket”: {

      “name”: [“sales-mfu-eda-09092023”, “warehouse-mfu-eda-09092023”]

    },

    “reason”: [“PutObject”]

  }

}

“`

  1. Aggregator state machine bắt đầu bằng cách gọi hàm Lambda phân tích file. Hàm này phân tích loại file và sử dụng mã định danh để liên kết giữa các file. Trong ví dụ này, tên file chứa loại file (file type) và mã định danh tương quan (correlation identifier) như (year_month). Sử dụng các cách khác để biểu thị file type và correlation identifier, bạn có thể sửa đổi hàm này để phân tích thông tin đó.
  1. Bước tiếp theo trong state machine sẽ chèn bản ghi cho sự kiện vào bảng DynamoDB tổng hợp sự kiện. Bảng có một khóa chính kết hợp với correlation identifier thành khóa phân vùng (partition key) và loại file sẽ thành khóa sắp xếp (sort key). Trạng thái xử lý của file được theo dõi để đưa ra phản hồi về trạng thái của workflow.
  1. Dựa trên file type, state machine sẽ xác định nhánh nào sẽ theo sau. Trong ví dụ này, nhánh Sales được chạy. State machine cố gắng lấy trạng thái của file Warehouse (phụ thuộc) từ DynamoDB bằng cách sử dụng correlation identifier. Kết quả của truy vấn sẽ được state machine  xác định xem Warehouse file tương ứng đã được xử lý chưa.
  1. Vì Warehouse file chưa được xử lý nên mẫu tích hợp waitForTaskToken sẽ được sử dụng. State machine đợi ở bước này và tạo task token mà các dịch vụ bên ngoài sử dụng để kích hoạt state machine tiếp tục thực thi. Sales record trong bảng DynamoDB được cập nhật bằng Task Token.

Image link

  1. Điều hướng đến bảng điều khiển S3 và tải Warehouse file mẫu lên bộ chứa Warehouse S3. Thao tác này gọi một phiên bản mới của quy trình làm việc Step Functions, luồng công việc này sẽ chảy qua nhánh khác sau bước chọn loại file. Trong nhánh này,Warehouse state machine đang chạy và trạng thái xử lý của tệp được cập nhật trong DynamoDB.

Image link

Khi trạng thái của Warehouse file được thay đổi thành “Completed”, Warehouse state machine sẽ kiểm tra DynamoDB để tìm Sales file đang chờ xử lý. Nếu có, nó sẽ truy xuất task token và gọi phương thức SendTaskSuccess. Điều này kích hoạt Sales state machine đang ở trạng thái chờ để tiếp tục. Sales state machine được khởi động và trạng thái xử lý được cập nhật.

Phần kết luận

Bài đăng trên blog này cho thấy cách xử lý các phần phụ thuộc của file trong kiến ​​trúc hướng sự kiện (event driven architectures). Bạn có thể tùy chỉnh mẫu được cung cấp trong kho mã cho trường hợp sử dụng của riêng bạn.

Giải pháp này dành riêng cho các phần phụ thuộc file trong kiến ​​trúc hướng sự kiện (event driven architectures). Để biết thêm thông tin về cách giải quyết các mối quan hệ phụ thuộc và trình tổng hợp sự kiện, hãy đọc bài đăng trên blog: Moving to event-driven architectures with serverless event aggregators.

Để tìm hiểu thêm về kiến ​​trúc hướng sự kiện, hãy truy cập event driven architecture section on Serverless Land.

TAGS: contributed, serverless

Link bài viết gốc: link

Leave a comment