AWS Big Data Blog

Giới thiệu khả năng hỗ trợ của Amazon MWAA cho Apache Airflow phiên bản 2.8.1

by Mansi Bhutada and Hernan Garcia | on 28 FEB 2024 | in Amazon Managed Workflows for Apache Airflow (Amazon MWAA), Announcements | Permalink |  Comments |  Share

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) là dịch vụ tự động hóa quản lý cho Apache Airflow, giúp thiết lập và vận hành các đường ống (pipeline) xử lý dữ liệu từ đầu đến cuối trên đám mây (cloud) một cách đơn giản. 

Các tổ chức sử dụng Amazon MWAA để cải thiện quy trình kinh doanh của họ. Ví dụ: C2i Genomics sử dụng Amazon MWAA trong nền tảng dữ liệu (data platform) của họ để điều phối việc kiểm định các thuật toán xử lý dữ liệu di truyền học ung thư trong hàng tỷ bản ghi. Twitch, một nền tảng phát sóng trực tiếp, quản lý và điều phối việc đào tạo cũng như triển khai các mô hình đề xuất (recommendation models) cho hơn 140 triệu người dùng đang hoạt động. Họ sử dụng Amazon MWAA để mở rộng quy mô, đồng thời cải thiện đáng kể tính bảo mật và giảm chi phí quản lý cơ sở hạ tầng.

Hôm nay, chúng tôi công bố môi trường Apache Airflow phiên bản 2.8.1 đã có sẵn trên Amazon MWAA. Trong bài đăng này, chúng tôi sẽ hướng dẫn bạn một số tính năng và khả năng mới của Airflow hiện có trên Amazon MWAA cũng như cách bạn có thể thiết lập hoặc nâng cấp môi trường Amazon MWAA của mình lên phiên bản 2.8.1.

Object storage – Lưu trữ đối tượng

Khi quy mô đường ống dữ liệu – data pipelinepipepine (quy trình/chuỗi xử lý dữ liệu) mở rộng, các kỹ sư gặp khó khăn trong việc quản lý lưu trữ trên nhiều hệ thống bằng APIs, phương thức xác thực và quy ước truy cập dữ liệu riêng biệt, đòi hỏi logic tùy chỉnh và các toán tử cụ thể cho việc lưu trữ. Airflow giờ đây cung cấp một lớp trừu tượng lưu trữ đối tượng thống nhất giúp xử lý những việc này, cho phép các kỹ sư tập trung vào data pipeline của họ. Lưu trữ đối tượng trong Airflow sử dụng fsspec để cho phép mã truy cập dữ liệu nhất quán trên các hệ thống lưu trữ đối tượng khác nhau, từ đó đơn giản hóa sự phức tạp của cơ sở hạ tầng.

Sau đây là một số lợi ích chính của tính năng này:

  • Portable workflows –  Bạn có thể chuyển đổi dịch vụ lưu trữ với những thay đổi tối thiểu trong Directed Acyclic Graphs (DAGs) của bạn
  • Truyền dữ liệu hiệu quả – Bạn có thể truyền dữ liệu thay vì tải vào bộ nhớ
  • Giảm bảo trì – Bạn không cần các toán tử riêng biệt, làm cho data pipeline của bạn dễ dàng bảo trì
  • Trải nghiệm lập trình quen thuộc – Bạn có thể sử dụng các mô-đun Python, như shutil, để thao tác với tệp

Để sử dụng lưu trữ đối tượng với Amazon Simple Storage Service (Amazon S3), bạn cần cài đặt thêm gói s3fs với nhà cung cấp Amazon (apache-airflow-providers-amazon[s3fs]==x.x.x).

Trong code mẫu bên dưới, bạn có thể xem cách chuyển dữ liệu trực tiếp từ Google Cloud Storage sang Amazon S3. Vì bộ lưu trữ đối tượng của Airflow sử dụng shutil.copyfileobj nên dữ liệu đối tượng được đọc theo từng đoạn (chunks) từ gcs_data_source và được truyền trực tuyến đến amazon_s3_data_target.

gcs_data_source = ObjectStoragePath(“gcs://source-bucket/prefix/”, conn_id=”google_cloud_default”)
amazon_s3_data_target = ObjectStoragePath(“s3://target-bucket/prefix/”, conn_id=”aws_default “)
with DAG(    dag_id=”copy_from_gcs_to_amazon_s3″,    start_date=datetime(2024, 2, 26),    schedule=”0 0 * * *”,    catchup=False,        tags=[“2.8”, “ObjectStorage”],) as dag:
    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:        objects = [f for f in path.iterdir() if f.is_file()]        return objects
    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):            object.copy(dst=path)
    objects_list = list_objects(path=gcs_data_source)    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

Để biết thêm thông tin về lưu trữ đối tượng trong Airflow, tham khảo thêm Object Storage.

XCom UI

XCom (giao tiếp chéo) cho phép truyền dữ liệu giữa các tác vụ (tasks), tạo điều kiện thuận lợi cho việc liên lạc và phối hợp giữa các tasks. Trước đây, các nhà phát triển phải chuyển sang chế độ xem khác để xem các XCom liên quan đến một task. Với Airflow 2.8, các khóa-giá trị ( key-values) XCom được hiển thị trực tiếp trên một tab trong chế độ xem Airflow Grid, như được hiển thị trong ảnh chụp màn hình sau.

Tab XCom mới mang lại các lợi ích sau:

  • Cải thiện khả năng hiển thị XCom – Một tab riêng trong giao diện người dùng cung cấp một cách thuận tiện và thân thiện để xem tất cả XCom liên kết với một DAG hoặc task cụ thể.
  • Cải thiện khả năng gỡ rối – Việc có thể thấy giá trị XCom trực tiếp trong giao diện người dùng rất hữu ích cho việc gỡ lỗi các DAG. Bạn có thể nhanh chóng xem đầu ra của các task ngược dòng mà không cần phải kéo và kiểm tra chúng theo cách thủ công bằng mã Python.

Task context logger – Nhật ký bối cảnh tác vụ

Quản lý vòng đời tác vụ (task lifecycles) là rất quan trọng để data pipeline trong Airflow vận hành trơn tru. Tuy nhiên, vẫn tồn tại một số thách thức nhất định, đặc biệt là trong các tình huống mà tasks bị dừng đột ngột. Điều này có thể xảy ra do nhiều lý do, như hết thời gian chờ của bộ lập lịch (scheduler timeouts), tác vụ  “zombie” (nhiệm vụ vẫn trong trạng thái chạy mà không gửi tín hiệu) hay trường hợp máy chủ hết bộ nhớ.

Thường thì với những lỗi như vậy, đặc biệt là những lỗi do các thành phần Airflow cốt lõi như bộ lập lịch (scheduler) hoặc bộ thực thi (executor) gây ra, không được ghi lại trong nhật ký tác vụ (task logs). Hạn chế này yêu cầu người dùng phải khắc phục sự cố bên ngoài Airflow UI, làm quá trình xác định và giải quyết vấn đề trở nên phức tạp.

Airflow 2.8 đã giới thiệu một cải tiến quan trọng nhằm giải quyết vấn đề này. Các thành phần của Airflow, bao gồm bộ lập lịch (scheduler) và bộ thực thi (executor), giờ đây có thể sử dụng TaskContextLogger mới để chuyển tiếp thông báo lỗi trực tiếp đến nhật ký tác vụ (task logs). Tính năng này cho phép bạn xem tất cả các thông báo lỗi liên quan đến việc chạy một tác vụ trong một nơi. Điều này giúp đơn giản hóa quá trình tìm hiểu nguyên nhân tác vụ thất bại, cung cấp một cái nhìn đầy đủ về những gì đã xảy ra trong một giao diện nhật ký duy nhất.

Ảnh chụp màn hình sau đây cho thấy cách tác vụ được phát hiện là zombie và nhật ký bộ lập lịch được đưa vào như một phần của nhật ký tác vụ.

The following screenshot shows how the task is detected as zombie, and the scheduler log is being included as part of the task log.

Bạn cần đặt tham số cấu hình môi trường enable_task_context_logger thành True để bật tính năng này. Sau khi được bật, Airflow có thể gửi nhật ký từ trình lập lịch (scheduler), trình thực thi (executor) hoặc context  chạy lệnh callback  đến nhật ký tác vụ và trực quan với Airflow UI.

Listener hooks for datasets

Datasets were introduced in Airflow 2.4 as a logical grouping of data sources to create data-aware scheduling and dependencies between DAGs. For example, you can schedule a consumer DAG to run when a producer DAG updates a dataset. Listeners enable Airflow users to create subscriptions to certain events happening in the environment. In Airflow 2.8, listeners are added for two datasets events: on_dataset_created and on_dataset_changed, effectively allowing Airflow users to write custom code to react to dataset management operations. For example, you can trigger an external system, or send a notification.

Datasets là một khái niệm được giới thiệu trong Airflow 2.4 như một nhóm logic các nguồn dữ liệu để tạo lập kế hoạch nhận biết dữ liệu và các mối phụ thuộc giữa các DAG.. Ví dụ: bạn có thể lên lịch cho một DAG consumer chạy khi một DAG producer cập nhật dataset. Listeners cho phép người dùng Airflow tạo đăng ký cho các sự kiện nhất định xảy ra trong môi trường. Trong Airflow 2.8, các listener đã được thêm vào cho hai sự kiện dataset:  on_dataset_createdon_dataset_changed, cho phép người dùng Airflow viết code tùy chỉnh để phản ứng với các hoạt động quản lý dataset một cách hiệu quả. Ví dụ: bạn có thể kích hoạt hệ thống bên ngoài hoặc gửi thông báo.

Việc sử dụng listener hooks cho datasets rất đơn giản. Hoàn thành các bước sau để tạo listener cho on_dataset_changed:

  1. Tạo listener (dataset_listener.py):
from airflow import Datasetfrom airflow.listeners import hookimpl
@hookimpldef on_dataset_changed(dataset: Dataset):    “””Following custom code is executed when a dataset is changed.”””    print(“Invoking external endpoint”)
    “””Validating a specific dataset”””    if dataset.uri == “s3://bucket-prefix/object-key.ext”:        print (“Execute specific/different action for this dataset”)
  1. Tạo một plugin để đăng ký listener trong môi trường Airflow của bạn (dataset_listener_plugin.py):
from airflow.plugins_manager import AirflowPluginfrom plugins import listener_code
class DatasetListenerPlugin(AirflowPlugin):    name = “dataset_listener_plugin”    listeners = [dataset_listener]

Để biết thêm thông tin về cách cài đặt plugin trong Amazon MWAA, hãy tham khảo Installing custom plugins.

Thiết lập môi trường Airflow 2.8.1 mới trong Amazon MWAA

Bạn có thể bắt đầu thiết lập trong tài khoản và Khu vực (Region) ưa thích của mình bằng cách sử dụng AWS Management Console, API hoặc AWS Command Line Interface (AWS CLI). Nếu đang sử dụng cơ sở hạ tầng dưới dạng mã – infrastructure as code (IaC), bạn có thể tự động hóa quá trình thiết lập bằng cách sử dụng AWS CloudFormation, AWS Cloud Development Kit (AWS CDK) hoặc Terraform scripts.

Sau khi tạo thành công môi trường Airflow phiên bản 2.8.1 trong Amazon MWAA, một số packages nhất định sẽ được cài đặt tự động trên scheduler và worker nodes. Để biết danh sách đầy đủ các packages đã cài đặt và phiên bản của chúng, hãy tham khảo Apache Airflow provider packages installed on Amazon MWAA environments. Bạn có thể cài đặt các packages bổ sung bằng tệp yêu cầu.

Nâng cấp từ các phiên bản cũ hơn của Airflow lên phiên bản 2.8.1

Bạn có thể tận dụng những khả năng mới nhất này bằng cách nâng cấp môi trường dựa trên Airflow phiên bản 2.x cũ lên phiên bản 2.8.1 bằng cách sử dụng các bản nâng cấp phiên bản in-place. Để tìm hiểu thêm về nâng cấp phiên bản tại chỗ, hãy tham khảo Upgrading the Apache Airflow version hoặc Introducing in-place version upgrades with Amazon MWAA.

You can take advantage of these latest capabilities by upgrading your older Airflow version 2.x-based environments to version 2.8.1 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.

Tổng kết

Trong bài đăng này, chúng tôi đã thảo luận về một số tính năng quan trọng được giới thiệu trong Airflow phiên bản 2.8, chẳng hạn như lưu trữ đối tượng, tab XCom mới được thêm vào chế độ xem lưới (grid view), ghi nhật ký ngữ cảnh tác vụ – task context logging, listener hooks cho bộ dữ liệu và cách bạn có thể bắt đầu sử dụng chúng. Chúng tôi cũng cung cấp một số mã mẫu để hiển thị quá trình triển khai trong Amazon MWAA. Để biết danh sách đầy đủ các thay đổi, hãy tham khảo Airflow’s release notes.

Để biết thêm thông tin chi tiết và ví dụ về mã trên Amazon MWAA, hãy truy cập Amazon MWAA User GuideAmazon MWAA examples GitHub repo.

Apache, Apache Airflow và Airflow là các nhãn hiệu hoặc nhãn hiệu đã đăng ký của Apache Software Foundation tại Hoa Kỳ và/hoặc các quốc gia khác.


Về tác giả

Mansi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well-architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work, she enjoys experimenting with food, playing pickleball, and diving into fun board games.
Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.

Link blog tiếng Anh:
Introducing Amazon MWAA support for Apache Airflow version 2.8.1 | AWS Big Data Blog