by James Beswick | on 15 JUL 2022 | in Amazon Managed Streaming for Apache Kafka (Amazon MSK), AWS Lambda, Serverless | Permalink | Share
Bài đăng này được viết bởi Philipp Klose, Kiến trúc sư giải pháp toàn cầu và Daniel Wessendorf, Kiến trúc sư giải pháp toàn cầu.
Phương tiện T truyền dữ liệu trực tiếp và kiến trúc hướng sự kiện (event-driven architectures) đang trở nên phổ biến hơn đối với nhiều hệ thống hiện đại. Phạm vi các trường hợp sử dụng bao gồm theo dõi web và các nhật ký khác, IoT công nghiệp, hoạt động của người chơi trong trò chơi và nhập dữ liệu cho kiến trúc phân tích hiện đại.
Một trong những công nghệ phổ biến nhất trong không gian này là Apache Kafka. Đây là một nền tảng phát trực tuyến sự kiện phân tán mã nguồn mở được nhiều khách hàng sử dụng cho các đường ống dữ liệu hiệu suất cao, phân tích phát trực tuyến, tích hợp dữ liệu và các ứng dụng quan trọng.
Kafka dựa trên một mô hình đơn giản nhưng mạnh mẽ. Nhấn mạnh Bản thân cụm Kafka là một nhà môi giới có tính khả dụng cao nhận tin nhắn từ các nhà sản xuất khác nhau. Các tin nhắn nhận được được lưu trữ trong các chủ đề, là sự trừu tượng lưu trữ chính.
Nhiều người tiêu dùng khác nhau có thể đăng ký một chủ đề Kafka và sử dụng tin nhắn. Trái ngược với các hệ thống xếp hàng cổ điển, người tiêu dùng không xóa thông điệp khỏi chủ đề mà lưu trữ vị trí đọc riêng lẻ về chủ đề. Điều này cho phép nhiều mô hình tiêu thụ khác nhau (ví dụ: nhóm phân xuất hoặc nhóm người tiêu dùng).
Thư viện “Producer “ và “consumer “ cho Kafka có sẵn bằng nhiều ngôn ngữ lập trình và công nghệ khác nhau. Bài đăng trên blog này tập trung vào việc sử dụng các công nghệ serverless và cloud-native cho phía nhà sản xuất.
Tổng quan
Ví dụ này hướng dẫn bạn cách xây dựng ứng dụng tạo luồng thời gian thực phi máy chủ bằng Amazon API Gateway và AWS Lambda.
Để thử nghiệm, blog này bao gồm một ứng dụng AWS Cloud Development Kit (CDK) mẫu . Điều này tạo ra một môi trường demo, bao gồm cụm Amazon Managed Streaming for Apache Kafka (MSK) và một máy chủ pháo đài để quan sát các tin nhắn được tạo trên cụm.
Sơ đồ sau đây cho thấy kiến trúc của một ứng dụng đẩy các yêu cầu API đến một chủ đề Kafka trong thời gian thực, mà bạn xây dựng trong bài đăng blog này:
- Ứng dụng bên ngoài gọi điểm cuối Amazon API Gateway
- Amazon API Gateway chuyển tiếp yêu cầu đến hàm Lambda
- Hàm AWS Lambda hoạt động như một nguồn tạo Kafka và đẩy thông điệp đến một chủ đề Kafka
- Một “người tiêu dùng console” Kafka trên máy chủ pháo đài sau đó đọc tin nhắn
Bản demo trình bày cách sử dụng Lambda Powertools for Java để hợp lý hóa việc ghi nhật ký và truy tìm, cũng như trình xác thực IAM để đơn giản hóa quy trình xác thực cụm. Các phần sau đây sẽ đưa bạn qua các bước để triển khai, kiểm tra và quan sát ứng dụng mẫu.
Điều kiện tiên quyết
Ví dụ có các điều kiện tiên quyết sau:
- Tài khoản AWS. Để đăng ký:
- Tạo một tài khoản. Để biết hướng dẫn, hãy xem Đăng ký AWS.
- Tạo người dùng AWS Identity and Access Management (IAM). Để biết hướng dẫn, hãy xem Tạo người dùng IAM.
- Phần mềm sau được cài đặt trên máy phát triển của bạn hoặc sử dụng môi trường AWS Cloud9, được cài đặt sẵn tất cả các yêu cầu:
- Bộ công cụ phát triển Java 11 trở lên (ví dụ: Amazon Corretto 11, OpenJDK 11)
- Python phiên bản 3.7 trở lên
- Apache Maven phiên bản 3.8.4 hoặc cao hơn
- Docker phiên bản 20.10.12 trở lên
- Postman hoặc công cụ tương tự để kiểm tra API của bạn
- Node.js 16.x trở lên
- AWS CLI 2.4.27 trở lên
- AWS CDK 2.28.1 trở lên
- Plugin CLI của Trình quản lý phiên
- Đảm bảo rằng bạn có thông tin xác thực AWS phù hợp để tương tác với các tài nguyên trong tài khoản AWS của mình
Hướng dẫn ví dụ
- Sao chép kho lưu trữ GitHub của dự án. Thay đổi thư mục thành thư mục con serverless-kafka-iac:
Bash
| git clone https://github.com/aws-samples/serverless-kafka-producer cd serverless-kafka-iac |
Bash
- Định cấu hình biến môi trường:
Bash
| export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity –query ‘Account’ –output text) export CDK_DEFAULT_REGION=$(aws configure get region) |
Bash
- Chuẩn bị môi trường Python ảo:
Bash
| python3 -m venv .venv source .venv/bin/activate pip3 install -r requirements.txt |
- Bootstrap tài khoản của bạn để sử dụng CDK:
Bash
| cdk bootstrap aws://$CDK_DEFAULT_ACCOUNT/$CDK_DEFAULT_REGION |
Bash
- Chạy ‘cdk synth’ để xây dựng mã và kiểm tra các yêu cầu:
Bash
| cdk synth |
Bash
- Chạy ‘cdk deploy’ để triển khai mã cho tài khoản AWS của bạn:
Bash
| cdk deploy –all |
Bash
Thử nghiệm ví dụ
Để kiểm tra ví dụ, hãy đăng nhập vào máy chủ pháo đài và khởi động bảng điều khiển dành cho người tiêu dùng để quan sát các thông báo được thêm vào chủ đề. Bạn tạo tin nhắn cho các chủ đề Kafka bằng cách gửi lệnh gọi qua API Gateway từ máy phát triển hoặc môi trường AWS Cloud9.
- Sử dụng AWS System Manager để đăng nhập vào máy chủ pháo đài. Sử dụng KafkaDemoBackendStack.bastionhostbastion Output-Parameter để kết nối hoặc thông qua bảng điều khiển quản lý hệ thống.
Bash
| aws ssm start-session –target <Bastion Host Instance Id> sudo su ec2-user cd /home/ec2-user/kafka_2.13-2.6.3/bin/ |
Bash
- Tạo chủ đề có tên message trên cụm MSK:
Bash
| ./kafka-topics.sh –bootstrap-server $ZK –command-config client.properties –create –replication-factor 3 –partitions 3 –topic messages |
Bash
- Mở bảng điều khiển “consumer” Kafka trên “bastion host” để quan sát các tin nhắn đến:
Bash
| ./kafka-console-consumer.sh –bootstrap-server $ZK –topic messages –consumer.config client.properties |
Bash
- Mở một terminal khác trên máy phát triển của bạn để tạo request kiểm thử bằng cách sử dụng tham số output “ServerlessKafkaProducerStack.kafkaproxyapiEndpoint” của ngăn xếp CDK. Thêm “/event” cho URL cuối cùng. Sử dụng curl để gửi yêu cầu API:
Bash
| curl -X POST -d “Hello World” <ServerlessKafkaProducerStack.messagesapiendpointEndpoint> |
Bash
- Để kiểm tra tải ứng dụng, điều quan trọng là phải hiệu chỉnh các tham số. Bạn có thể sử dụng một công cụ như Artillery để mô phỏng khối lượng công việc. Bạn có thể tìm thấy một kịch bản pháo binh mẫu trong thư mục /load-testing từ bước 1.
- Quan sát yêu cầu đến trong thiết bị đầu cuối máy chủ pháo đài.
Tất cả các thành phần trong ví dụ này đều tích hợp với AWS X-Ray. Với AWS X-Ray, bạn có thể theo dõi toàn bộ ứng dụng, điều này rất hữu ích để xác định tắc nghẽn khi kiểm tra tải. Bạn cũng có thể theo dõi việc thực thi phương thức ở cấp độ phương thức Java.
Lambda Powertools cho java cho phép bạn đẩy nhanh quá trình này bằng cách thêm chú thích @Trace để xem dấu vết ở cấp độ phương thức trong X-Ray.
Để theo dõi yêu cầu từ đầu đến cuối:
- Điều hướng đến bảng điều khiển CloudWatch.
- Mở bản đồ Dịch vụ.
- Chọn một thành phần để điều tra (ví dụ: hàm Lambda nơi bạn đã triển khai nguồn tạo Kafka). Chọn View traces.
- Chọn một lệnh gọi phương thức Lambda duy nhất và điều tra thêm ở cấp độ phương thức Java.
Dọn dẹp
Trong thư mục con “serverless-kafka-iac”, xóa cơ sở hạ tầng thử nghiệm:
Bash
| cdk destroy -all |
Bash
Triển khai nhà sản xuất Kafka trong Lambda
Kafka nguyên bản hỗ trợ Java. Để luôn mở, gốc đám mây và không có sự phụ thuộc của bên thứ ba, nhà sản xuất được viết bằng ngôn ngữ đó. Hiện tại, trình xác thực IAM chỉ khả dụng cho Java. Trong ví dụ này, trình xử lý Lambda nhận tin nhắn từ nguồn Amazon API Gateway và đẩy thông báo này đến chủ đề MSK có tên là “tin nhắn”.
Thông thường, các nhà sản xuất Kafka sống lâu và đẩy một thông điệp đến một chủ đề Kafka là một quá trình không đồng bộ. Vì Lambda là tạm thời, bạn phải thực thi toàn bộ tin nhắn đã gửi cho đến khi hàm Lambda kết thúc, bằng cách gọi producer.flush().
| @Override @Tracing @Logging(logEvent = true) public APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent input, Context context) { APIGatewayProxyResponseEvent response = createEmptyResponse(); try { String message = getMessageBody(input); KafkaProducer<String, String> producer = createProducer(); ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME, context.getAwsRequestId(), message); Future<RecordMetadata> send = producer.send(record); producer.flush(); RecordMetadata metadata = send.get(); log.info(String.format(“Send message was send to partition %s”, metadata.partition())); log.info(String.format(“Message was send to partition %s”, metadata.partition())); return response.withStatusCode(200).withBody(“Message successfully pushed to kafka”); } catch (Exception e) { log.error(e.getMessage(), e); return response.withBody(e.getMessage()).withStatusCode(500); } } @Tracing private KafkaProducer<String, String> createProducer() { if (producer == null) { log.info(“Connecting to kafka cluster”); producer = new KafkaProducer<String, String> (kafkaProducerProperties.getProducerProperties()); } return producer; } |
Java
Kết nối với Amazon MSK bằng IAM Auth
Ví dụ này sử dụng xác thực IAM để kết nối với cụm Kafka tương ứng. Xem tài liệu tại đây, trong đó trình bày cách định cấu hình nhà sản xuất để kết nối.
Vì bạn cấu hình cụm thông qua IAM, hãy cấp quyền “Kết nối” và “WriteData” cho nhà sản xuất, để nó có thể đẩy tin nhắn đến Kafka.
JSON
| { “Version”: “2012-10-17”, “Statement”: [ { “Effect”: “Allow”, “Action”: [ “kafka-cluster:Connect” ], “Resource”: “arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid “ } ] } { “Version”: “2012-10-17”, “Statement”: [ { “Effect”: “Allow”, “Action”: [ “kafka-cluster:Connect”, “kafka-cluster: DescribeTopic”, ], “Resource”: “arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name” } ] } |
JSON
Điều này cho thấy đoạn trích Kafka của chính sách IAM, phải được áp dụng cho nhà sản xuất Kafka.
Khi sử dụng xác thực IAM, hãy lưu ý các giới hạn hiện tại của xác thực IAM Kafka, ảnh hưởng đến số lượng kết nối đồng thời và yêu cầu IAM cho một nguồn tạo. Đọc https://docs.aws.amazon.com/msk/latest/developerguide/limits.html và làm theo đề xuất để sao lưu xác thực trong ứng dụng khách nhà sản xuất:
Java
| Map<String, String> configuration = Map.of( “key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”, “value.serializer”,”org.apache.kafka.common.serialization.StringSerializer”, “bootstrap.servers”, getBootstrapServer(), “security.protocol”, “SASL_SSL”, “sasl.mechanism”, “AWS_MSK_IAM”, “sasl.jaas.config”, “software.amazon.msk.auth.iam.IAMLoginModule required;”, “sasl.client.callback.handler.class”, “software.amazon.msk.auth.iam.IAMClientCallbackHandler”, “connections.max.idle.ms”, “60”, “reconnect.backoff.ms”, “1000” ); |
Java
Xây dựng kế hoạch thực hiện
Mỗi nút broker Kafka có thể xử lý tối đa 20 yêu cầu xác thực IAM mỗi giây. Thiết lập demo có ba nhà môi giới, dẫn đến 60 yêu cầu mỗi giây. Do đó, thiết lập trình trung chuyển giới hạn số lượng hàm Lambda đồng thời là 60.
Để giảm yêu cầu xác thực IAM từ nhà sản xuất Kafka, hãy đặt nó bên ngoài trình xử lý. Đối với các lệnh gọi thường xuyên, có khả năng Lambda sử dụng lại phiên bản lớp đã tạo trước đó và chỉ thực thi lại trình xử lý.
Đối với khối lượng công việc tăng vọt với số lượng yêu cầu API Gateway đồng thời cao, điều này có thể dẫn đến thư bị rớt. Trong khi đối với một số khối lượng công việc, điều này có thể chấp nhận được, đối với những người khác thì điều này có thể không đúng.
Trong những trường hợp này, bạn có thể mở rộng kiến trúc bằng công nghệ đệm như Amazon SQS hoặc Amazon Kinesis Data Streams giữa API Gateway và Lambda.
Để giảm độ trễ, bạn có thể giảm thời gian khởi động nguội cho Java bằng cách thay đổi mức biên dịch theo tầng thành “1” như được mô tả trong bài đăng trên blog này. Tính đồng thời được cung cấp đảm bảo rằng các hàm Lambda bỏ phiếu đã sẵn sàng trước khi có yêu cầu.
Kết thúc
Trong bài đăng này, bạn sẽ tìm hiểu cách tạo hàm Lambda tích hợp serverless giữa API Gateway và Apache Managed Streaming cho Apache Kafka (MSK). Chúng tôi chỉ ra cách triển khai tích hợp như vậy với CDK.
Mô hình chung phù hợp với nhiều trường hợp sử dụng cần tích hợp giữa API Gateway và Apache Kafka. Nó có thể có lợi ích về chi phí so với việc triển khai trong bộ chứa trong các trường hợp sử dụng với các luồng đầu vào thưa thớt, khối lượng nhỏ và khối lượng công việc không thể đoán trước hoặc tăng đột biến.
Để biết thêm tài nguyên học tập serverless, hãy truy cập Serverless Land. Để tìm hiểu thêm, hãy đọc hướng dẫn: Sử dụng Lambda để xử lý luồng Apache Kafka.
Tham khảo
Creating a serverless Apache Kafka publisher using AWS Lambda | AWS Compute Blog (amazon.com)