Tạo một serverless Apache Kafka publisher sử dụng AWS Lambda

by James Beswick | on 15 JUL 2022 | in Amazon Managed Streaming for Apache Kafka (Amazon MSK), AWS Lambda, Serverless | Permalink |  Share

Bài viết này được viết bởi Philipp Klose, Global Solution Architect, and Daniel Wessendorf, Global Solution Architect.

Dữ liệu phát trực tuyến và kiến ​​trúc event-driven đang trở nên phổ biến hơn đối với nhiều hệ thống hiện đại. Phạm vi các trường hợp sử dụng bao gồm theo dõi web và các log khác, IoT công nghiệp, hoạt động của người chơi trong trò chơi và việc tiếp nhận dữ liệu cho kiến trúc phân tích hiện đại.

Một trong những công nghệ phổ biến nhất trong lĩnh vực này là Apache Kafka. Đây là một nền tảng phân phối event streaming mã nguồn mở được sử dụng bởi nhiều khách hàng cho các đường ống dữ liệu hiệu suất cao, phân tích luồng, tích hợp dữ liệu và các ứng dụng quan trọng cho nhiệm vụ.

Kafka được xây dựng dựa trên một mô hình đơn giản nhưng mạnh mẽ.  Kafka Cluster chính là một trình giữ thông điệp có khả năng sẵn có cao, nhận các thông điệp từ nhiều producers khác nhau. Các thông điệp nhận được được lưu trữ trong các chủ đề, đây là sự trừu tượng lưu trữ chính.

Các thư viện Producer và consumer cho Kafka có sẵn trong nhiều ngôn ngữ lập trình và công nghệ khác nhau. Bài đăng trên blog này tập trung vào việc sử dụng các công nghệ không có máy chủ và đám mây cho phía nhà sản xuất.

Tổng quan

Ví dụ này hướng dẫn bạn cách xây dựng một ứng dụng producer luồng thời gian thực không có máy chủ bằng cách sử dụng Amazon API Gateway và  AWS Lambda.

Để kiểm tra, blog này bao gồm một ứng dụng mẫu AWS Cloud Development Kit (CDK). Điều này tạo ra một môi trường minh họa, bao gồm một Amazon Managed Streaming for Apache Kafka (MSK) cluster và một máy chủ bastion để quan sát các thông điệp được sản xuất trên cụm.

For testing, this blog includes a sample AWS Cloud Development Kit (CDK) application. This creates a demo environment, including an Amazon Managed Streaming for Apache Kafka (MSK) cluster and a bastion host for observing the produced messages on the cluster.

Bản vẽ dưới đây mô tả kiến trúc của một ứng dụng đẩy các yêu cầu API vào một chủ đề Kafka trong thời gian thực, mà bạn xây dựng trong bài đăng trên blog này:

  1. Một ứng dụng bên ngoài gọi một endpoint của Amazon API Gateway
  2. Amazon API Gateway chuyển tiếp yêu cầu đến một hàm Lambda
  3. Hàm Lambda của AWS hoạt động như một nhà sản xuất Kafka và đẩy thông điệp vào một chủ đề Kafka
  4. Một “console consumer” Kafka trên máy chủ bastion sau đó đọc thông điệp

Bản demo cho thấy cách sử dụng Lambda Powertools for Java để tối ưu hóa việc ghi nhật ký và theo dõi, và một bộ xác thực IAM để đơn giản hóa quá trình xác thực cluster. Các phần sau sẽ hướng dẫn bạn qua các bước triển khai, kiểm tra và quan sát ứng dụng mẫu.

Chuẩn bị

Ví dụ này yêu cầu chuẩn bị như sau:

  • Một tài khoản AWS. Để đăng ký:
    •     Tạo một tài khoản. Để biết hướng dẫn, xem Sign Up For AWS.
    •     Tạo một người dùng IAM trên AWS. Để biết hướng dẫn, xem “Tạo người dùng IAM”.

The example has the following prerequisites:

Hướng dẫn ví dụ chi tiết  

  1. Clone dự án từ kho lưu trữ GitHub. Chuyển đến thư mục con serverless-kafka-iac.
git clone https://github.com/aws-samples/serverless-kafka-producer
cd serverless-kafka-iac
  1. Cấu hình các biến môi trường
export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity –query ‘Account’ –output text)
export CDK_DEFAULT_REGION=$(aws configure get region)
  1. Chuẩn bị môi trường Python ảo:
python3 -m venv .venv
source .venv/bin/activate
pip3 install -r requirements.txt
  1. Khởi tạo tài khoản của bạn để sử dụng CDK:
cdk bootstrap aws://$CDK_DEFAULT_ACCOUNT/$CDK_DEFAULT_REGION
  1. Chạy lệnh ‘cdk synth’ để xây dựng mã và kiểm tra các yêu cầu:
cdk synth
  1. Chạy lệnh ‘cdk deploy’ để triển khai mã vào tài khoản AWS của bạn:
cdk deploy –all

Kiểm thử ví dụ

Để kiểm thử ví dụ, đăng nhập vào máy chủ bastion và bắt đầu một console tiêu thụ để quan sát các thông điệp được thêm vào chủ đề. Bạn tạo ra các thông điệp cho các chủ đề Kafka bằng cách gửi các cuộc gọi qua API Gateway từ máy phát triển của bạn hoặc môi trường AWS Cloud9.

  1. Sử dụng AWS System Manager để đăng nhập vào máy chủ bastion. Sử dụng KafkaDemoBackendStack.bastionhostbastion Output-Parameter để kết nối hoặc thông qua system manager.
aws ssm start-session –target <Bastion Host Instance Id>
sudo su ec2-user
cd /home/ec2-user/kafka_2.13-2.6.3/bin/
  1. Tạo một chủ đề có tên là “messages” trên cụm MSK:
./kafka-topics.sh –bootstrap-server $ZKcommand-config client.properties –create –replication-factor 3 –partitions 3 –topic messages
  1. Mở một console tiêu thụ Kafka trên máy chủ bastion để quan sát các thông điệp đến.
./kafka-console-consumer.sh –bootstrap-server $ZK –topic messages –consumer.config client.properties
  1. Mở một cửa sổ terminal khác trên máy phát triển của bạn để tạo các yêu cầu kiểm thử bằng cách sử dụng tham số đầu ra “ServerlessKafkaProducerStack.kafkaproxyapiEndpoint” của ngăn xếp CDK. Thêm “/event” vào URL cuối cùng. Sử dụng lệnh curl để gửi yêu cầu API:
curl -X POST -d “Hello World” <ServerlessKafkaProducerStack.messagesapiendpointEndpoint>
  1. Đối với việc kiểm thử tải ứng dụng, điều quan trọng là điều chỉnh các tham số. Bạn có thể sử dụng một công cụ như Artillery để mô phỏng các tải công việc. Bạn có thể tìm thấy một kịch bản mẫu của Artillery trong thư mục/load-testing từ bước 1.
  2. Quan sát các yêu cầu đến trong cửa sổ terminal của máy chủ bastion.

Tất cả các thành phần trong ví dụ này tích hợp với AWS X-Ray. Với AWS X-Ray, bạn có thể theo dõi toàn bộ ứng dụng, điều này hữu ích để xác định các chỗ trì trệ khi kiểm thử tải. Bạn cũng có thể theo dõi việc thực thi phương thức ở cấp độ phương thức Java.

Lambda Powertools for java cho Java cho phép bạn tăng tốc quá trình này bằng cách thêm chú thích @Trace để xem các dấu vết ở cấp độ phương thức trong X-Ray.

Để theo dõi một yêu cầu từ đầu đến cuối:

1. Điều hướng đến bảng điều khiển CloudWatch.

2. Mở Service map.

3. Chọn một thành phần để điều tra (ví dụ: Lambda function mà bạn triển khai nhà sản xuất Kafka). Chọn View traces.

4. Chọn một lời gọi phương thức Lambda duy nhất và điều tra sâu hơn ở cấp độ phương thức Java.

Dọn dẹp

Trong thư mục con “serverless-kafka-iac”, xóa cơ sở hạ tầng kiểm thử:

cdk destroy -all

Thực hiện một nhà sản xuất Kafka trong Lambda

Kafka hỗ trợ Java theo cách tự nhiên. Để duy trì tính mở, tính cloud native và không có phụ thuộc của bên thứ ba, nhà sản xuất được viết bằng ngôn ngữ này. Hiện tại, bộ xác thực IAM chỉ có sẵn cho Java. Trong ví dụ này, trình xử lý Lambda nhận một thông điệp từ nguồn Amazon API Gateway và đẩy thông điệp này vào một chủ đề MSK được gọi là “messages”.

Thường thì, nhà sản xuất Kafka là các quá trình sống lâu dài và việc đẩy một thông điệp vào một chủ đề Kafka là một quy trình không đồng bộ. Do Lambda là tạm thời, bạn phải buộc việc làm một lần đầy đủ của một thông điệp đã được gửi cho đến khi hàm Lambda kết thúc, bằng cách gọi producer.flush().

@Override
    @Tracing
    @Logging(logEvent = true)
    public APIGatewayProxyResponseEvent
    handleRequest(APIGatewayProxyRequestEvent input, Context context) {
        APIGatewayProxyResponseEvent response = createEmptyResponse();
        try {

            String message = getMessageBody(input);

            KafkaProducer<String, String> producer = createProducer();

            ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME, context.getAwsRequestId(), message);

            Future<RecordMetadata> send = producer.send(record);
            producer.flush();

            RecordMetadata metadata = send.get();
            log.info(String.format(“Send message was send to partition %s”, metadata.partition()));

            log.info(String.format(“Message was send to partition %s”, metadata.partition()));

            return response.withStatusCode(200).withBody(“Message successfully pushed to kafka”);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return response.withBody(e.getMessage()).withStatusCode(500);
        }
    }

    @Tracing
    private KafkaProducer<String, String> createProducer() {
        if (producer == null) {
            log.info(“Connecting to kafka cluster”);
            producer = new KafkaProducer<String, String>(kafkaProducerProperties.getProducerProperties());
        }
        return producer;
    }

Kết nối vào Amazon MSK bằng cách sử dụng Xác thực IAM

Ví dụ này sử dụng xác thực IAM để kết nối vào các cụm Kafka tương ứng. Xem tài liệu tại đây. mô tả cách cấu hình nhà sản xuất để kết nối.

Vì bạn cấu hình cụm thông qua IAM, hãy cấp quyền “Connect” và “WriteData” cho nhà sản xuất, để nó có thể đẩy các thông điệp vào Kafka.

{
    “Version”: “2012-10-17”,
    “Statement”: [
        {           
            “Effect”: “Allow”,
            “Action”: [
                “kafka-cluster:Connect”
            ],
            “Resource”: “arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid “
        }
    ]
}


{
    “Version”: “2012-10-17”,
    “Statement”: [
        {           
            “Effect”: “Allow”,
            “Action”: [
                “kafka-cluster:Connect”,
                “kafka-cluster: DescribeTopic”,
            ],
            “Resource”: “arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name”
        }
    ]
}

Đây là đoạn trích của chính sách IAM cho Kafka, mà phải được áp dụng cho nhà sản xuất Kafka.

Khi sử dụng xác thực IAM, hãy lưu ý các giới hạn hiện tại của xác thực IAM Kafka, ảnh hưởng đến số kết nối đồng thời và các yêu cầu IAM cho một producer. Đọc https://docs.aws.amazon.com/msk/latest/developerguide/limits.html và tuân theo khuyến nghị cho việc tạm dừng xác thực trong producer client.

Map<String, String> configuration = Map.of(
                “key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”,
                “value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”,
                “bootstrap.servers”, getBootstrapServer(),
                “security.protocol”, “SASL_SSL”,
                “sasl.mechanism”, “AWS_MSK_IAM”,
                “sasl.jaas.config”, “software.amazon.msk.auth.iam.IAMLoginModule required;”,
                “sasl.client.callback.handler.class”, “software.amazon.msk.auth.iam.IAMClientCallbackHandler”,
                “connections.max.idle.ms”, “60”,
                “reconnect.backoff.ms”, “1000”
        );

Mở rộng về việc triển khai

Mỗi nút broker Kafka có thể xử lý tối đa 20 yêu cầu xác thực IAM mỗi giây. Thiết lập demo có ba broker, tức là 60 yêu cầu mỗi giây. Do đó, thiết lập broker giới hạn số hàm Lambda đồng thời là 60.

Để giảm yêu cầu xác thực IAM từ nhà sản xuất Kafka, đặt nó bên ngoài của trình xử lý. Đối với các cuộc gọi thường xuyên, có khả năng Lambda sử dụng lại trường hợp lớp đã được tạo trước đó và chỉ thực thi lại trình xử lý.

Đối với các tải công việc cao với một số lượng cao của các yêu cầu API Gateway đồng thời, điều này có thể dẫn đến việc thả các thông điệp. Trong khi đối với một số tải công việc, điều này có thể được chấp nhận, nhưng đối với những trường hợp khác, điều này có thể không phải là trường hợp.

Trong những trường hợp này, bạn có thể mở rộng kiến ​​trúc với một công nghệ đệm như Amazon SQS hoặc Amazon Kinesis Data Streams giữa API Gateway và Lambda.

Để giảm độ trễ, bạn có thể giảm thời gian khởi động lạnh cho Java bằng cách thay đổi cấp độ biên dịch phân cấp thành “1” như mô tả trong bài đăng trên  blog post. Tính sẵn sàng được cấp trước đảm bảo rằng các hàm Lambda đang chờ đợi trước khi yêu cầu đến.

Kết luận

Trong bài viết này, bạn sẽ tìm hiểu cách tạo một hàm Lambda tích hợp không có máy chủ giữa API Gateway và Apache Managed Streaming for Apache Kafka (MSK). Chúng tôi sẽ hướng dẫn cách triển khai một tích hợp như vậy bằng CDK.

Mô hình chung này phù hợp cho nhiều trường hợp sử dụng cần tích hợp giữa API Gateway và Apache Kafka. Nó có thể mang lại lợi ích về chi phí hơn so với các triển khai chứa trong các trường hợp sử dụng với luồng đầu vào thưa thớt, ít volume và công việc không dự đoán hoặc biến động.

Để tìm hiểu thêm về các tài nguyên học tập về serverless, hãy truy cập Serverless Land. Để biết thêm thông tin, đọc hướng dẫn: Sử dụng Lambda để xử lý luồng Apache Kafka.

Leave a comment