Chuyển đổi sự kiện Apache Kafka từ Avro sang JSON bằng EventBridge Pipes

by Pascal Vogel | on 15 NOV 2023 | in Amazon EventBridge, Amazon Managed Streaming for Apache Kafka (Amazon MSK), AWS Glue, Serverless | Permalink |  Share

Bài đăng này được viết bởi Pascal Vogel, Kiến trúc sư giải pháp và Philipp Klose, Kiến trúc sư giải pháp toàn cầu.

Truyền phát sự kiện với Apache Kafka đã trở thành một yếu tố quan trọng của  kiến trúc hướng sự kiện và hướng dữ liệu (EDA), mở khóa các trường hợp sử dụng như phân tích thời gian thực về hành vi người dùng, phát hiện bất thường và gian lận và xử lý sự kiện Internet of Things. Các nhà sản xuất và người tiêu dùng ở luồng trong Kafka thường sử dụng các cơ quan đăng ký lược đồ để đảm bảo rằng tất cả các thành phần tuân theo các cấu trúc sự kiện đã thỏa thuận khi gửi tuần tự hóa (serializing) và xử lý khử tuần tự hóa (deserializing) các sự kiện để tránh lỗi và sự cố ứng dụng.

Một định dạng lược đồ phổ biến trong Kafka là Apache Avro, hỗ trợ cấu trúc dữ liệu phong phú ở định dạng nhị phân nhỏ gọn. Để tích hợp Kafka với các dịch vụ AWS và bên thứ ba khác dễ dàng hơn, AWS cung cấp Amazon EventBridge Pipes, một dịch vụ tích hợp point-to-point ở điểm-điểm phi máy chủ. Tuy nhiên, nhiều dịch vụ xuôi dòng mong đợi các sự kiện được mã hóa JSON, yêu cầu logic chuyển đổi và xác thực lược đồ tùy chỉnh và lặp đi lặp lại từ Avro sang JSON trong mỗi dịch vụ xuôi dòng.

Bài đăng trên blog này trình bày cách sử dụng, xác thực, chuyển đổi và gửi sự kiện Avro từ Kafka đến AWS và các dịch vụ của bên thứ ba một cách đáng tin cậy bằng cách sử dụng EventBridge Pipes, cho phép bạn giảm logic khử tuần tự tùy chỉnh trong các dịch vụ xuôi dòng. Bạn cũng có thể sử dụng bus sự kiện EventBridge làm đích trong Pipes để lọc và phân phối sự kiện từ Pipes đến nhiều đích, bao gồm cả phân phối chéo tài khoản và xuyên liên khu vực.

Blog này mô tả hai tình huống:

  1.  Sử dụng Amazon Managed Streaming cho Apache Kafka (Amazon MSK) AWS Glue Schema Registry.
  2. Sử dụng Confluent Cloud Confluent Schema Registry.

Xem các kho lưu trữ GitHub liên quan cho Glue Schema Registry hoặc Confluent Schema Registry để biết mã nguồn đầy đủ và hướng dẫn triển khai chi tiết.

Phát trực tuyến sự kiện Kafka và xác thực lược đồ trên AWS

Để xây dựng các ứng dụng phát trực tuyến sự kiện với Kafka trên AWS, bạn có thể sử dụng Amazon MSK, các dịch vụ như Confluent Cloud hoặc Kafka tự lưu trữ trên Amazon Elastic Compute Cloud (Amazon EC2)

Để tránh các vấn đề thường gặp trong phát trực tuyến sự kiện và kiến trúc theo sự kiện, chẳng hạn như dữ liệu không nhất quán và không tương thích, bạn nên xác định và chia sẻ sơ đồ sự kiện giữa nhà sản xuất sự kiện và người tiêu dùng. Trong Kafka, các cơ quan đăng ký lược đồ được sử dụng để quản lý, phát triển và thực thi các lược đồ cho các nhà sản xuất sự kiện và người tiêu dùng. AWS Glue Schema Registry cung cấp một vị trí trung tâm để khám phá, quản lý và phát triển sơ đồ. Trong trường hợp của Confluent Cloud, Confluent Schema Registry cũng đóng vai trò tương tự. Cả Glue Schema Registry và Confluent Schema Registry đều hỗ trợ các định dạng lược đồ phổ biến như Avro, Protobuf và JSON.

Để tích hợp Kafka với các dịch vụ AWS, dịch vụ của bên thứ ba và ứng dụng của riêng bạn, bạn có thể sử dụng EventBridge Pipes. EventBridge Pipes giúp bạn tạo tích hợp point-to-point điểm-điểm giữa các nguồn mục tiêu sự kiện với tính năng lọc, chuyển đổi và làm phong phú tùy chọn. EventBridge Pipes giảm số lượng mã tích hợp mà bạn phải viết và duy trì khi xây dựng EDA.

Nhiều dịch vụ AWS và bên thứ ba sử dụng tải trọng (sự kiện) được mã hóa JSON làm đầu vào, nghĩa là chúng không thể trực tiếp sử dụng tải trọng Avro hoặc Protobuf. Để thay thế logic chuyển đổi và xác thực Avro-to-JSON lặp đi lặp lại ở mỗi người tiêu dùng, bạn có thể sử dụng bước làm giàu EventBridge Pipes. Giải pháp này sử dụng  hàm AWS Lambda trong bước làm giàu để khử tuần tự hóa và xác thực các sự kiện Kafka bằng sổ đăng ký lược đồ, bao gồm xử lý lỗi với hàng đợi chữ chết và chuyển đổi sự kiện thành JSON trước khi chuyển chúng sang các dịch vụ xuôi dòng.

Tổng quan về giải pháp

Giải pháp được trình bày trong bài đăng trên blog này bao gồm các yếu tố chính sau:

  1. Nguồn của pipe đường ống là một cụm Kafka được triển khai bằng MSK hoặc Confluent Cloud. EventBridge Pipes đọc các sự kiện từ luồng Kafka theo lô và gửi chúng đến hàm làm giàu (xem tại đây để  biết một sự kiện ví dụ).
  2. Bước bổ sung Bước làm giàu (hàm Lambda) khử tuần tự và xác thực các sự kiện dựa trên sổ đăng ký lược đồ đã định cấu hình (Glue hoặc Confluent), chuyển đổi các sự kiện từ Avro sang JSON với xử lý lỗi tích hợp và trả chúng về pipe.
  3. Mục tiêu của giải pháp ví dụ này là bus sự kiện tùy chỉnh EventBridge  được gọi bởi EventBridge Pipes với các sự kiện được mã hóa JSON do hàm Lambda bổ sung  làm giàu trả về. EventBridge Pipes hỗ trợ nhiều đích khác, bao gồm Lambda, AWS Step Functions, Amazon API Gateway, đích API, v.v., cho phép bạn xây dựng EDA mà không cần viết mã tích hợp.
  4. Trong giải pháp mẫu này, bus sự kiện gửi tất cả sự kiện đến Amazon CloudWatch Logs thông qua quy tắc EventBridge. Bạn có thể mở rộng ví dụ để gọi các mục tiêu EventBridge bổ sung.

Theo tùy chọn, bạn có thể thêm OpenAPI 3 hoặc JSONSchema 4 cho các sự kiện của mình trong sổ đăng ký lược đồ EventBridge bằng cách tạo thủ công từ lược đồ Avro hoặc sử dụng khám phá sơ đồ EventBridge. Điều này cho phép bạn tải xuống các liên kết mã cho các sự kiện được chuyển đổi JSON cho các ngôn ngữ lập trình khác nhau, chẳng hạn như JavaScript, Python và Java, để sử dụng chúng một cách chính xác trong các đích EventBridge của bạn.

Phần còn lại của bài đăng trên blog này mô tả giải pháp này cho các cơ quan đăng ký lược đồ Glue và Confluent với các ví dụ mã.

EventBridge Pipes với Glue Schema Registry

Phần này mô tả cách triển khai xác thực và chuyển đổi lược đồ sự kiện từ Avro sang JSON bằng cách sử dụng EventBridge Pipes và Glue Schema Registry. Bạn có thể tìm thấy mã nguồn và hướng dẫn triển khai chi tiết trên GitHub.

Điều kiện tiên quyết

Bạn cần một  cụm phi máy chủ Amazon MSK đang chạy và sổ đăng ký Glue Schema đã được cấu hình. Ví dụ này bao gồm lược đồ Avro và Sổ đăng ký lược đồ keo. Xem bài đăng blog AWS sau đây để biết giới thiệu về xác thực lược đồ với Glue Schema Registry: Xác thực, phát triển và kiểm soát sơ đồ trong Amazon MSK và Amazon Kinesis Data Streams bằng AWS Glue Schema Registry.

Cấu hình EventBridge Pipes

Sử dụng  mẫu AWS Cloud Development Kit (AWS CDK) được cung cấp trong kho GitHub để triển khai:

  1. Một kênh dẫn EventBridge kết nối với chủ đề Amazon MSK Serverless Kafka hiện có của bạn làm nguồn thông qua AWS Identity and Access Management (IAM) .
  2. EventBridge Pipes đọc các sự kiện từ chủ đề Kafka của bạn bằng cách sử dụng loại nguồn Amazon MSK.
  3. Một hàm Lambda làm giàu trong Java để thực hiện deserialization, xác thực và chuyển đổi sự kiện từ Avro sang JSON.
  4. Hàng đợi Amazon Simple Queue Service (Amazon SQS) để tổ chức các sự kiện mà quá trình khử tuần tự hóa không thành công.
  5. Sự kiện tùy chỉnh EventBridge làm mục tiêu chính. Quy tắc EventBridge gửi tất cả các sự kiện đến vào nhóm nhật ký CloudWatch Logs.
  6. Đối với các nguồn dựa trên MSK, EventBridge hỗ trợ các tham số cấu hình, chẳng hạn như cửa sổ lô, kích thước lô và vị trí bắt đầu mà bạn có thể đặt bằng cách sử dụng các tham số của  lớp CfnPipe trong ngăn xếp CDK mẫu.

Ví dụ về pipe EventBridge tiêu thụ các sự kiện từ Kafka theo lô 10 vì nó đang nhắm mục tiêu bus sự kiện EventBridge, có kích thước lô tối đa là 10. Xem phân lô và tính đồng thời trong Hướng dẫn sử dụng EventBridge Pipes để chọn cấu hình tối ưu cho các đích khác.

EventBridge Pipes với Confluent Schema Registry

Phần này mô tả cách triển khai xác thực và chuyển đổi lược đồ sự kiện từ Avro sang JSON bằng cách sử dụng EventBridge Pipes và Confluent Schema Registry. Bạn có thể tìm thấy mã nguồn và hướng dẫn triển khai chi tiết trên GitHub.

Điều kiện tiên quyết

Để thiết lập giải pháp này, bạn cần một luồng Kafka chạy trên Confluent Cloud cũng như Confluent Schema Registry được thiết lập. Xem hướng dẫn Schema Registry tương ứng cho Confluent Cloud để thiết lập sổ đăng ký lược đồ cho luồng Confluent Kafka của bạn.

Để kết nối với cụm Confluent Cloud Kafka, bạn cần có khóa API cho Confluent Cloud và Confluent Schema Registry. AWS Secrets Manager được sử dụng để lưu trữ bảo mật Confluent của bạn.

Cấu hình EventBridge Pipes

Sử dụng mẫu AWS CDK được cung cấp trong kho lưu trữ GitHub để triển khai:

  1. Kênh dẫn EventBridge kết nối với chủ đề Confluent Kafka hiện có của bạn dưới dạng làm nguồn thông qua bảo bí mật API được lưu trữ trong Trình quản lý bảo bí mật.
  2. EventBridge Pipes đọc các sự kiện từ chủ đề Confluent Kafka của bạn bằng cách sử dụng loại nguồn luồng Apache Kafka tự quản lý, bao gồm tất cả các cụm Kafka không phải MSK.
  3.  Một hàm Lambda làm giàu trong Python để thực hiện khử tuần tự hóa sự kiện, xác thực và chuyển đổi từ Avro sang JSON.
  4. Một hàng đợi thư chết của SQS để tổ chức các sự kiện mà việc khử tuần tự hóa không thành công.
  5. Sự kiện tùy chỉnh EventBridge làm mục tiêu chính. Quy tắc EventBridge ghi tất cả các sự kiện đến vào nhóm nhật ký CloudWatch Logs.

Đối với các nguồn Kafka tự quản lý, EventBridge hỗ trợ các tham số cấu hình, chẳng hạn như cửa sổ lô, kích thước lô và vị trí bắt đầu mà bạn có thể đặt bằng cách sử dụng các tham số của lớp CfnPipe trong ngăn xếp CDK ví dụ.

Ví dụ về pipe EventBridge tiêu thụ các sự kiện từ Kafka theo lô 10 vì nó đang nhắm mục tiêu bus sự kiện EventBridge, có kích thước lô tối đa là 10. Xem phân lô và tính đồng thời trong Hướng dẫn sử dụng EventBridge Pipes để chọn cấu hình tối ưu cho các đích khác.

Bổ sung hàm Lambda

Cả hai giải pháp được mô tả trước đây đều bao gồm bổ sung hàm Lambda để xác thực lược đồ và chuyển đổi từ Avro sang JSON.

Hàm Java Lambda tích hợp với Glue Schema Registry bằng AWS Glue Schema Registry Library. Hàm Python Lambda tích hợp với Confluent Schema Registry bằng thư viện confluent-kafka và sử dụng Powertools for AWS Lambda (Python) để triển khai các biện pháp thực hành tốt nhất về Serverless như ghi nhật ký và truy tìm.

Các hàm Lambda bổ sung làm giàu thực hiện các tác vụ sau:

  1. Trong các sự kiện được thăm dò từ luồng Kafka bằng pipe EventBridge, khóa và giá trị của sự kiện được  mã hóa base64. Do đó, đối với mỗi sự kiện trong lô được truyền đến hàm, khóa và giá trị được giải mã.
  2. Khóa sự kiện được giả định là được nhà sản xuất tuần tự hóa dưới dạng kiểu chuỗi.
  3. Giá trị sự kiện được deserialized bằng cách sử dụng Glue Schema registry  (Java) hoặc confluent-kafka AvroDeserializer (Python).
  4. Sau đó, hàm trả về các sự kiện JSON được chuyển đổi thành công vào pipe EventBridge, sau đó gọi đích cho từng sự kiện.
  5. Các sự kiện mà Avro deserialization thất bại được gửi đến hàng đợi SQS dead letter.

Kết thúc

Bài đăng trên blog này trình bày cách triển khai mức tiêu thụ sự kiện, xác thực lược đồ Avro và chuyển đổi sang JSON bằng Amazon EventBridge Pipes, Glue Schema Registry và Confluent Schema Registry.

Mã nguồn cho ví dụ được trình bày có sẵn trong kho lưu trữ GitHub mẫu AWS cho Glue Schema Registry Confluent Schema Registry. Để biết thêm mẫu, hãy truy cập Bộ sưu tập mẫu phi máy chủ.

Để biết thêm tài nguyên học tập serverless, hãy truy cập Serverless Land.

Tham khảo

Converting Apache Kafka events from Avro to JSON using EventBridge Pipes | AWS Compute Blog (amazon.com)

Leave a comment