Điều phối tải lên tệp phụ thuộc bằng AWS Step Functions

by Benjamin Smith | on 02 NOV 2023 | in AWS Step Functions, Serverless | Permalink |  Share

Bài đăng này được viết bởi Nelson Assis, Trưởng nhóm Hỗ trợ Doanh nghiệp, Serverless và Jevon Liburd, Quản lý khách hàng kỹ thuật, Serverless

Amazon S3 là dịch vụ lưu trữ đối tượng được nhiều khách hàng sử dụng để lưu trữ tệp. Với việc sử dụng Thông báo sự kiện của Amazon S3 hoặc Amazon EventBridge, khách hàng có thể tạo khối lượng công việc với event-driven architecture kiến trúc theo sự kiện (EDA). Kiến trúc này phản hồi các sự kiện được tạo ra khi có thay đổi đối với các đối tượng trong vùng lưu trữ S3 buckets.

EDA liên quan đến giao tiếp không đồng bộ giữa các thành phần hệ thống. Điều này phục vụ để tách rời các thành phần cho phép mỗi thành phần được tự chủ.

Một số kịch bản có thể giới thiệu khớp nối (coupling) trong kiến trúc do sự phụ thuộc giữa các sự kiện. Bài đăng trên blog này trình bày một ví dụ phổ biến về khớp nối này và cách xử lý nó bằng AWS Step Functions.

Tổng quan

Trong ví dụ này, một tổ chức có hai nhóm  phân quyền tự động tự trị phân tán, nhóm Bán hàng (Sales) và nhóm Kho (Warehouse) . Mỗi nhóm chịu trách nhiệm tải tệp dữ liệu hàng tháng lên vùng lưu trữ S3 bucket để có thể xử lý.

Các tệp tạo ra các sự kiện khi chúng được tải lên, bắt đầu các quy trình downstream xuôi dòng. Việc xử lý tệp Warehouse  Kho sẽ làm sạch dữ liệu và kết hợp với dữ liệu từ nhóm Vận chuyển (Shipping). Việc xử lý tệp Sales Doanh số sẽ tương quan dữ liệu với dữ liệu kết hợp của Warehouse  Kho và ShippingVận chuyển kết hợp. Điều này cho phép các nhà phân tích thực hiện dự báo và thu thập các phân tích khác.

Để mối tương quan này xảy ra, tệp Warehouse Kho phải được xử lý trước tệp SalesDoanh số. Do hai nhóm hoạt động độc lập nên không có sự phối hợp giữa các nhóm. Điều này có nghĩa là các tệp có thể được tải lên bất kỳ lúc nào mà không có gì đảm bảo rằng tệp Warehouse Kho được xử lý trước tệp SalesBán hàng.

Đối với các tình huống như thế này,  Aggregator pattern  mẫu Bộ tổng hợp có thể được sử dụng. Mẫu thu thập và lưu trữ các sự kiện, đồng thời kích hoạt sự kiện mới dựa trên các sự kiện kết hợp. Trong kịch bản được mô tả, các sự kiện kết hợp là tệp WarehouseKho được xử lý và tệp Sales Doanh số đã được tải lên.

Các yêu cầu của mô hình Aggregator  là:

  1. Tương quan (Correlation) – Một cách để nhóm các sự kiện liên quan. Điều này được thực hiện bởi một mã định danh duy nhất trong tên tệp.
  2. Trình Công cụ tổng hợp sự kiện (Event Aggregator) – Một kho lưu trữ cửa hàng trạng thái cho các sự kiện
  3.  Kiểm tra hoàn thành và kích hoạt (Completion check and trigger) – Điều kiện khi các sự kiện kết hợp đã được nhận và cách để public kết quả của sự kiện. Một điều kiện khi nhận các sự kiện kết hợp đã được nhận và một cách để publish xuất bản sự kiện kết quả.

Tổng quan về kiến trúc

Kiến trúc sử dụng các dịch vụ AWS sau:

  1. Tải tệp lên: Nhóm Sales Bán hàng và Warehouse Kho tải tệp tương ứng của họ lên S3.
  2. EventBridge: Sự  kiện ObjectCreated được gửi đến EventBridge nơi có một quy tắc với mục tiêu của quy trình làm việc chính.
  3. Máy trạng thái chính (Main state machine): Máy trạng thái này điều phối các hoạt động tổng hợp và xử lý các tệp. Nó đóng gói các quy trình công việc cho mỗi tệp để tách logic tổng hợp khỏi logic quy trình làm việc của tệp.
  4. Trình phân tích cú pháp và tương quan tệp (File parser and correlation): Logic nghiệp vụ để xác định tệp và loại tệp được chạy trong hàm Lambda này.
  5. Lưu trữ trạng thái (Stateful store): Bảng DynamoDB lưu trữ thông tin về tệp như tên, loại và trạng thái xử lý. State machine đọc và ghi vào bảng DynamoDB. Mã thông báo nhiệm vụ cũng được lưu trữ trong bảng này.
  6. Xử lý tệp: Tùy thuộc vào loại tệp và bất kỳ điều kiện tiên quyết nào, các state machine  tương ứng với loại tệp sẽ được chạy. Các máy trạng thái này chứa logic để xử lý tệp cụ thể.
  7. Task Token &; Callback: Token  tác vụ được tạo khi tệp phụ thuộc cố gắng được xử lý trước tệp độc lập cố xử lý tệp phụ thuộc trước tệp không phụ thuộc. Mô hình Step Functions “Wait for a Callback” tiếp tục thực thi tệp phụ thuộc sau khi tệp độc lập không phụ thuộc được xử lý.

Hướng dẫn

Bạn cần các điều kiện tiên quyết sau:

  • Đã cài đặt AWS CLI AWS SAM CLI
  • Tài khoản AWS.
  • Đủ quyền để quản lý tài nguyên AWS.
  • Git đã cài đặt.

Để triển khai ví dụ, hãy làm theo hướng dẫn trong repo GitHub.

Hướng dẫn này cho thấy điều gì sẽ xảy ra nếu tệp phụ thuộc (tệp Sales ) được tải lên trước tệp độc lập (Tệp Warehouse ).

  1. Quy trình làm việc bắt đầu bằng việc tải tệp Sales lên vùng lưu trữ Sales S3 bucket chuyên dụng cho Sale. Ví dụ này sử dụng các vùng lưu trữ S3 buckets riêng biệt cho hai tệp vì ví dụ này giả định rằng các nhóm Sales Bán hàng và Warehouse Kho hoạt động phân tán và độc lập được phân phối và tự chủ. Bạn có thể tìm thấy  các tệp mẫu trong kho lưu trữ mã.
  1. Việc tải tệp lên S3 sẽ gửi một sự kiện đến EventBridge mà máy trạng thái tổng hợp hoạt động (aggregator state machine). Mẫu sự kiện được sử dụng trong quy tắc EventBridge là:
{  “detail-type”: [“Object Created”],  “source”: [“aws.s3”],  “detail”: {    “bucket”: {      “name”: [“sales-mfu-eda-09092023”, “warehouse-mfu-eda-09092023”]    },    “reason”: [“PutObject”]  }}
  1. Máy trạng thái tổng hợp bắt đầu bằng cách gọi hàm phân tích cú pháp tệp Lambda. Hàm này phân tích cú pháp loại tệp và sử dụng mã định danh để tương quan các tệp. Trong ví dụ này, tên của tệp chứa loại tệp và mã định danh tương quan (year_month). Để sử dụng các cách khác nhau để biểu diễn loại tệp và mã định danh tương quan, bạn có thể sửa đổi hàm này để phân tích cú pháp thông tin đó.
  1. Bước tiếp theo trong máy trạng thái sẽ chèn bản ghi cho sự kiện trong bảng tổng hợp sự kiện DynamoDB. Bảng có khóa chính tổng hợp với mã định danh tương quan là khóa phân vùng và loại tệp làm khóa sắp xếp. Trạng thái xử lý của tệp được theo dõi để đưa ra phản hồi về trạng thái của quy trình làm việc.
  1. Dựa trên loại tệp, máy trạng thái xác định nhánh nào sẽ theo dõi. Trong ví dụ, nhánh Sales Bán hàng đang chạy. Máy trạng thái cố gắng lấy trạng thái của tệp Warehouse Store Kho (phụ thuộc) từ DynamoDB bằng mã định danh tương quan. Sử dụng kết quả của truy vấn này, máy trạng thái xác định xem tệp Warehouse tương ứng đã được xử lý chưa.
  2. Vì tệp Warehouse chưa được xử lý,  mẫu tích hợp waitForTaskToken được sử dụng. Máy trạng thái đợi ở bước này và tạo token  tác vụ mà các dịch vụ bên ngoài sử dụng để kích hoạt máy trạng thái tiếp tục thực thi. Bản ghi Sales Doanh số trong bảng DynamoDB được cập nhật bằng Task Token Mã tác vụ.
  1. Điều hướng đến bảng điều khiển S3 và tải tệp Warehouse mẫu lên vùng lưu trữ Warehouse S3 bucket. Điều này kích hoạt gọi một phiên bản mới của quy trình công việc Step Functions, chạy  qua nhánh khác sau bước chọn loại tệp. Trong nhánh này, máy trạng thái Warehouse kho được chạy và trạng thái xử lý của tệp được cập nhật trong DynamoDB.

Khi trạng thái của tệp Kho (Warehouse) được thay đổi thành “Completed“, máy trạng thái Warehouse kho sẽ kiểm tra DynamoDB để tìm tệp Doanh số đang chờ xử lý. Nếu có, nó sẽ truy xuất token  tác vụ và gọi phương thức SendTaskSuccess. Điều này kích hoạt máy trạng thái SalesBán hàng, đang ở trạng thái chờ để tiếp tục. Máy trạng thái Bán hàng được khởi động và trạng thái xử lý được cập nhật.

Kết thúc

Bài đăng trên blog này cho thấy cách xử lý các phụ thuộc tệp trong kiến trúc hướng sự kiện. Bạn có thể tùy chỉnh mẫu được cung cấp trong kho lưu trữ mã cho trường hợp sử dụng của riêng bạn.

Giải pháp này dành riêng cho các phụ thuộc tệp trong kiến trúc hướng sự kiện. Để biết thêm thông tin về cách giải quyết các thành phần phụ thuộc và trình tổng hợp sự kiện, hãy đọc bài đăng trên blog: Chuyển sang kiến trúc theo sự kiện với trình tổng hợp sự kiện phi máy chủ.

Để tìm hiểu thêm về kiến trúc theo sự kiện, hãy truy cập phần kiến trúc theo sự kiện trên Serverless Land.

Tham khảo