Cải thiện khả năng mở rộng quy mô khi xử lý Apache Kafka bằng AWS Lambda

Bởi Julian Wood

AWS Lambda đang cải thiện hành vi thay đổi quy mô tự động khi xử lý dữ liệu từ các nguồn sự kiện của  Apache Kafka. Lambda đang tăng số lượng người dùng mặc định ban đầu, cải thiện tốc độ mở rộng quy mô của người dùng và giúp đảm bảo rằng người dùng không giảm quy mô quá nhanh. Bạn không phải thực hiện thêm hành động nào và không phải trả thêm chi phí.

Chạy Kafka trên AWS

Apache Kafka là một nền tảng nguồn mở phổ biến để xây dựng các ứng dụng và data pipelines theo thời gian thực. Bạn có thể triển khai và quản lý giải pháp Kafka của riêng mình ở on-premises hoặc trên Amazon EC2.

Amazon Managed Streaming for Apache Kafka (MSK) là một dịch vụ được quản lý hoàn toàn giúp xây dựng và chạy các ứng dụng sử dụng Kafka để xử lý dữ liệu truyền phát dễ dàng hơn. MSK Serverless là loại cluster dành cho Amazon MSK, cho phép bạn chạy Kafka mà không cần phải quản lý và thay đổi quy mô hiệu suât của cluster. Nó tự động cung cấp và chia tỷ lệ dung lượng trong khi quản lý các partitions, do đó bạn có thể truyền dữ liệu mà không cần suy nghĩ về việc điều chỉnh kích thước hoặc chia tỷ lệ các cluster phù hợp. MSK Serverless cung cấp mô hình định giá dựa trên thông lượng, do đó bạn chỉ trả tiền cho những gì bạn sử dụng. Để biết thêm thông tin, hãy xem Sử dụng Kafka để xây dựng ứng dụng phát trực tuyến của bạn.

Sử dụng Lambda để sử dụng các records từ Kafka

Việc xử lý dữ liệu trực tuyến có thể phức tạp trong các kiến trúc dựa trên máy chủ, đặc biệt nếu bạn phải xử lý trong thời gian thực. Nhiều tổ chức dành thời gian và chi phí đáng kể để quản lý và mở rộng nền tảng phát trực tuyến của họ. Để xử lý nhanh, họ phải cung cấp công suất tối đa, điều này làm tăng thêm độ phức tạp.

Kiến trúc Lambda và serverless loại bỏ đáng kể những vấn đề khi xử lý luồng Kafka. Bạn không cần phải quản lý cơ sở hạ tầng, có thể giảm chi phí vận hành, giảm chi phí và mở rộng quy mô theo yêu cầu. Điều này giúp bạn tập trung hơn vào việc xây dựng các ứng dụng phát trực tuyến. Bạn có thể viết hàm Lambda bằng một số ngôn ngữ lập trình để mang lại sự linh hoạt khi xử lý dữ liệu truyền trực tuyến.

Lambda event source mapping

Lambda có thể tích hợp nguyên bản với môi trường Kafka của bạn với tư cách là người dùng để xử lý dữ liệu ngay khi nó được tạo.

Để sử dụng dữ liệu trực tuyến từ Kafka, bạn định cấu hình event source mapping (ESM) trên các hàm Lambda của mình. Đây là tài nguyên do Lambda quản lý, tách biệt với chức năng của bạn. Nó liên tục thăm dò các bản ghi từ các topics trong cụm Kafka. ESM tùy chọn lọc các bản ghi và gộp chúng thành một tải trọng. Sau đó, nó gọi Lambda gọi API để phân phối tải trọng đến hàm Lambda của bạn một cách đồng bộ để xử lý.

Khi Lambda quản lý pollers, bạn không cần phải quản lý một nhóm người dùng trên nhiều nhóm. Mỗi nhóm có thể tạo và định cấu hình ESM của riêng mình nhờ Lambda xử lý polling.

AWS Lambda event source mapping

Mở rộng quy mô và thông lượng

Kafka sử dụng các partitions để tăng thông lượng và phân bổ bản ghi tới tất cả các broker trong một cluster.

Lambda event source mapping resource bao gồm các pollers và processors. Pollers có người dùng đọc bản ghi từ partitions Kafka. Poller assigners gửi chúng đến bộ xử lý để xử lý các bản ghi và gọi hàm của bạn.

Khi bạn tạo Kafka event source mapping, Lambda sẽ phân bổ người dùng xử lý tất cả các partitions trong Kafka topic. Trước đây, Lambda đã phân bổ tối thiểu một bộ xử lý cho người dùng.

Lambda previous initial scaling

Với những cải tiến về quy mô, Lambda phân bổ nhiều bộ xử lý để cải thiện khả năng xử lý. Điều này làm giảm khả năng một lệnh gọi làm chậm toàn bộ luồng xử lý.

Lambda updated initial scaling

Mỗi người dùng gửi bản ghi đến nhiều bộ xử lý chạy song song để xử lý khối lượng công việc tăng lên. Các bản ghi trong mỗi partitions chỉ được gán cho một bộ xử lý duy nhất để duy trì trật tự.

Lambda tự động tăng hoặc giảm số lượng người dùng và bộ xử lý dựa trên khối lượng công việc. Lambda lấy mẫu offset lag của người dùng từ tất cả các partitions trong topic mỗi phút. Nếu độ trễ ngày càng tăng, điều này có nghĩa là Lambda không thể theo kịp việc xử lý các bản ghi từ partitions.

Thuật toán tính tỷ lệ offset lag hiện tại và cả tỷ lệ tin nhắn mới được thêm vào topic. Lambda có thể tiếp cận số lượng người dùng tối đa trong vòng ba phút để giảm offset lag nhanh nhất có thể. Lambda cũng đang giảm scale down để đảm bảo các bản ghi được xử lý nhanh hơn và giảm độ trễ, đặc biệt đối với khối lượng công việc đột biến.

Tổng số bộ xử lý cho tất cả poller chỉ có thể mở rộng tối đa tổng số partitions trong topic.

Sau khi gọi thành công, poller cam kết bù đắp cho các brokers tương ứng.

Lambda further scaling

Bạn có thể theo dõi thông lượng của Kafka topic bằng cách sử dụng số liệu người dùng Consumer_lag và Consumer_offset.

Để kiểm tra xem có bao nhiêu lệnh gọi hàm xảy ra song song, bạn cũng có thể theo dõi số liệu concurrency  cho hàm của mình. Concurrency bằng tổng số processors trên tất cả các pollers, tùy thuộc vào hoạt động của processor. Ví dụ: nếu ba pollers có năm processors đang chạy cho một ESM nhất định, thì khả năng concurrency của hàm sẽ xấp xỉ 15 (5 + 5 + 5).

Khả năng mở rộng quy mô được cải thiện trong thực tế

Có một số Serverless Patterns mà bạn có thể sử dụng để xử lý luồng Kafka bằng Lambda. Để thiết lập Amazon MSK Serverless, hãy làm theo hướng dẫn trong GitHub repo:

  1. Tạo một ví dụ về Amazon MSK Serverless topic với 1000 partitions.

./kafka-topics.sh --create --bootstrap-server "{bootstrap-server}" --command-config client.properties --replication-factor 3 --partitions 1000 --topic msk-1000p
  1. Thêm bản ghi vào topic bằng cách sử dụng UUID làm khóa để phân phối đồng đều các bản ghi trên các partitions. Ví dụ này thêm 13 triệu bản ghi.
for x in {1..13000000}; do echo $(uuidgen -r),message_$x; done | ./kafka-console-producer.sh --broker-list "{bootstrap-server}" --topic msk-1000p --producer.config client.properties --property parse.key=true --property key.separator=, --producer-property acks=all
  1. Tạo một hàm Python dựa theo mẫu này để ghi lại các bản ghi đã xử lý.
  2. Sửa đổi function code để chèn độ trễ 0,1 giây nhằm mô phỏng quá trình xử lý bản ghi.
import json

import base64

import time

def lambda_handler(event, context):

    # Define a variable to keep track of the number of the message in the batch of messages

    i=1

    # Looping through the map for each key (combination of topic and partition)

    for record in event['records']:

        for messages in event['records'][record]:

            print("********************")

            print("Record number: " + str(i))

            print("Topic: " + str(messages['topic']))

            print("Partition: " + str(messages['partition']))

            print("Offset: " + str(messages['offset']))

            print("Timestamp: " + str(messages['timestamp']))

            print("TimestampType: " + str(messages['timestampType']))

            if None is not messages.get('key'):

                b64decodedKey=base64.b64decode(messages['key'])

                decodedKey=b64decodedKey.decode('ascii')

            else:

                decodedKey="null"

            if None is not messages.get('value'):

                b64decodedValue=base64.b64decode(messages['value'])

                decodedValue=b64decodedValue.decode('ascii')

            else:

                decodedValue="null"

            print("Key = " + str(decodedKey))

            print("Value = " + str(decodedValue))

            i=i+1

            time.sleep(0.1)

    return {

        'statusCode': 200,

    }
  1. Cấu hình ESM để trỏ đến cluster và topic đã tạo trước đó.
  2. Sử dụng kích thước batch mặc định là 100. Đặt StartingPosition to TRIM_HORIZON để xử lý từ đầu luồng.
  3. Triển khai function, đồng thời bổ sung và cấu hình ESM.
  4. Xem số liệu Amazon CloudWatch ConcurrentExecutions và OffsetLag để xem quá trình xử lý.

Với những cải tiến về quy mô, khi ESM được cấu hình, ESM và function sẽ mở rộng quy mô để xử lý số lượng partitions.

Lambda automatic scaling improvement graph

Tăng thông lượng xử lý dữ liệu

Điều quan trọng là function của bạn có thể theo kịp tốc độ lưu lượng truy cập. Offset lag ngày càng tăng có nghĩa là quá trình xử lý hàm không thể theo kịp. Nếu Thời gian đã trôi qua kể từ khi bản ghi được thêm vào luồng cao so với thời gian lưu giữ của luồng, bạn có thể mất dữ liệu khỏi luồng khi các bản ghi hết hạn.

Giá trị này thường không vượt quá 50% thời gian lưu giữ của luồng. Khi giá trị đạt 100% thời gian lưu giữ luồng, dữ liệu sẽ bị mất. Một giải pháp tạm thời là tăng thời gian lưu của luồng. Điều này giúp bạn có thêm thời gian để giải quyết vấn đề trước khi mất dữ liệu.

Tránh xử lý các bản ghi không cần thiết bằng cách sử dụng tính năng lọc nội dung để kiểm soát những bản ghi Lambda gửi đến hàm của bạn. Điều này giúp giảm lưu lượng truy cập vào chức năng của bạn, đơn giản hóa mã và giảm chi phí tổng thể.

Có một số cách để cải thiện việc xử lý thông lượng:

  1. Tránh xử lý các bản ghi không cần thiết bằng cách sử dụng tính năng lọc nội dung để kiểm soát những bản ghi được Lambda gửi đến function của bạn. Điều này giúp giảm lưu lượng truy cập vào function của bạn, đơn giản hóa code và giảm chi phí.
  2. Lambda phân bổ processors trên tất cả các pollers dựa trên số lượng partitions để tối đa một concurrency Lambda function trên mỗi partitions. Bạn có thể tăng số lượng hàm Lambda xử lý bằng cách tăng số lượng partitions.
  3. Đối với các chức năng điện toán chuyên sâu, bạn có thể tăng bộ nhớ được phân bổ cho function của mình, điều này cũng làm tăng lượng CPU ảo khả dụng. Điều này có thể giúp giảm thời lượng xử lý của 1 function.
  4. Lambda polls Kafka với cấu hình kích thước batch của record. Bạn có thể tăng kích thước batch để xử lý nhiều bản ghi hơn trong một lần gọi. Điều này có thể cải thiện thời gian xử lý và giảm chi phí, đặc biệt nếu function của bạn có thời gian khởi tạo tăng lên. Kích thước batch lớn hơn sẽ làm tăng độ trễ để xử lý bản ghi đầu tiên trong batch, nhưng có khả năng làm giảm độ trễ để xử lý bản ghi cuối cùng trong batch. Có sự cân bằng giữa chi phí và độ trễ khi tối ưu hóa dung lượng của partitions  và quyết định tùy thuộc vào nhu cầu khối lượng công việc của bạn.
  5. Đảm bảo rằng producers của bạn phân phối đồng đều các bản ghi trên các partitions  bằng cách sử dụng partition key  hiệu quả. Khối lượng công việc sẽ không cân bằng khi một key duy nhất lấn át các key khác, tạo ra hot partition, ảnh hưởng đến thông lượng.

Xem thêm  Increasing data processing throughput để biết thêm hướng dẫn.

Kết luận

Hiện nay, AWS Lambda đang cải thiện hành vi tự động thay đổi quy mô khi xử lý dữ liệu từ các nguồn sự kiện Apache Kafka. Lambda đang tăng số lượng người dùng mặc định ban đầu, cải thiện tốc độ mở rộng quy mô và đảm bảo họ không giảm quy mô quá nhanh. Bạn không phải thực hiện thêm hành động nào và không phải trả thêm chi phí.

Bạn có thể tìm hiểu các cải tiến về quy mô với khối lượng công việc hiện có của mình hoặc triển khai cụm Amazon MSK và thử một trong các mẫu để đo thời gian xử lý.

Tìm hiểu cách sử dụng Lambda để xử lý luồng Kafka, hãy xem hướng dẫn tại đây.

Để biết thêm về serverless, hãy truy cập Serverless Land.