Tối ưu hóa quá trình xử lý dữ liệu ETL tại Talent.com với Amazon SageMaker

Bài đăng này được đồng tác giả bởi Anatoly Khomenko, Kỹ sư Học máy, và Abdenour Bezzouh, Giám đốc Công nghệ tại Talent.com.

Thành lập vào năm 2011, Talent.com tổng hợp các danh sách việc làm có trả tiền từ khách hàng của họ và các danh sách việc làm công cộng, và đã tạo ra một nền tảng thống nhất, dễ tìm kiếm. Phủ sóng hơn 30 triệu danh sách việc làm trên hơn 75 quốc gia và bao gồm nhiều ngôn ngữ, ngành nghề và kênh phân phối khác nhau, Talent.com đáp ứng nhu cầu đa dạng của người tìm việc, hiệu quả kết nối hàng triệu người tìm việc với cơ hội việc làm.

Sứ mệnh của Talent.com là tạo điều kiện cho việc kết nối lực lượng lao động toàn cầu. Để đạt được điều này, Talent.com tổng hợp các danh sách việc làm từ nhiều nguồn trên web, cung cấp cho người tìm việc truy cập vào một bể việc làm rộng lớn với hơn 30 triệu cơ hội việc làm phù hợp với kỹ năng và kinh nghiệm của họ. Phù hợp với sứ mệnh này, Talent.com đã hợp tác với AWS để phát triển một công cụ đề xuất việc làm tiên tiến được điều khiển bởi học sâu, nhằm hỗ trợ người dùng trong việc thăng tiến sự nghiệp.

Để đảm bảo hoạt động hiệu quả của công cụ đề xuất việc làm này, việc triển khai một ống dẫn xử lý dữ liệu quy mô lớn chịu trách nhiệm trích xuất và tinh chỉnh các tính năng từ danh sách việc làm tổng hợp của Talent.com là điều cần thiết. Ống dẫn này có khả năng xử lý 5 triệu bản ghi hàng ngày trong chưa đầy 1 giờ, và cho phép xử lý nhiều ngày bản ghi một cách song song. Ngoài ra, giải pháp này cho phép triển khai nhanh chóng đến sản xuất. Nguồn dữ liệu chính cho ống dẫn này là định dạng JSON Lines, được lưu trữ trong Amazon Simple Storage Service (Amazon S3) và phân chia theo ngày. Mỗi ngày, điều này dẫn đến việc tạo ra hàng chục nghìn tệp JSON Lines, với các cập nhật tăng dần hàng ngày.

Mục tiêu chính của ống dẫn xử lý dữ liệu này là tạo điều kiện cho việc tạo ra các tính năng cần thiết cho việc đào tạo và triển khai công cụ đề xuất việc làm trên Talent.com. Đáng chú ý là ống dẫn này phải hỗ trợ cập nhật tăng dần và đáp ứng yêu cầu trích xuất tính năng phức tạp cần thiết cho các mô-đun đào tạo và triển khai quan trọng cho hệ thống đề xuất việc làm. Ống dẫn của chúng tôi thuộc về gia đình quy trình ETL (trích xuất, biến đổi và tải) tổng hợp dữ liệu từ nhiều nguồn vào một kho lưu trữ lớn, trung tâm.

Để biết thêm thông tin chi tiết về cách Talent.com và AWS cùng nhau xây dựng các kỹ thuật đào tạo mô hình xử lý ngôn ngữ tự nhiên và học sâu tiên tiến, sử dụng Amazon SageMaker để tạo ra một hệ thống đề xuất việc làm, hãy tham khảo từ văn bản đến công việc mơ ước: Xây dựng một công cụ đề xuất việc làm dựa trên NLP tại Talent.com với Amazon SageMaker. Hệ thống bao gồm kỹ thuật kỹ sư tính năng, thiết kế kiến trúc mô hình học sâu, tối ưu hóa tham số siêu hình và đánh giá mô hình, nơi tất cả các mô-đun đều được chạy bằng Python.

Bài đăng này cho thấy cách chúng tôi sử dụng SageMaker để xây dựng một ống dẫn xử lý dữ liệu quy mô lớn để chuẩn bị các tính năng cho công cụ đề xuất việc làm tại Talent.com. Giải pháp kết quả cho phép một Nhà khoa học dữ liệu tưởng tượng trích xuất tính năng trong một sổ ghi chép SageMaker sử dụng thư viện Python, như Scikit-Learn hoặc PyTorch, và sau đó nhanh chóng triển khai cùng một mã vào ống dẫn xử lý dữ liệu thực hiện trích xuất tính năng ở quy mô. Giải pháp không yêu cầu chuyển mã trích xuất tính năng sang sử dụng PySpark, như yêu cầu khi sử dụng AWS Glue làm giải pháp ETL. Giải pháp của chúng tôi có thể được phát triển và triển khai hoàn toàn bởi một Nhà khoa học dữ liệu từ đầu đến cuối chỉ sử dụng SageMaker, và không yêu cầu kiến thức về các giải pháp ETL khác, như AWS Batch. Điều này có thể rút ngắn đáng kể thời gian cần thiết để triển khai ống dẫn Học máy (ML) vào sản xuất. Ống dẫn được vận hành thông qua Python và tích hợp mượt mà với quy trình làm việc trích xuất tính năng, làm cho nó linh hoạt với nhiều ứng dụng phân tích dữ liệu.

Xây dựng Pipeline Sử Dụng AWS: Từ Dữ Liệu Thô đến Mô Hình Khuyến Nghị Công Việc

Pipeline này bao gồm ba giai đoạn chính:

  • Sử dụng Công Việc Xử Lý của Amazon SageMaker để Xử Lý Tệp JSONL Thô: Công việc này liên quan đến việc xử lý tệp JSONL thô tương ứng với một ngày cụ thể. Đối với nhiều ngày dữ liệu, có thể xử lý đồng thời bằng các Công Việc Xử Lý riêng biệt.
  • Sử Dụng AWS Glue để Crawl Dữ Liệu Sau Khi Xử Lý Nhiều Ngày: Sau khi xử lý dữ liệu của nhiều ngày, chúng ta sử dụng AWS Glue để thực hiện việc crawl dữ liệu.
  • Tải Các Tính Năng Đã Xử Lý Cho Khoảng Thời Gian Cụ Thể Bằng SQL từ Bảng Amazon Athena: Sau đó, chúng ta sẽ huấn luyện và triển khai mô hình khuyến nghị công việc.

Xử Lý Tệp JSONL Thô

Chúng tôi xử lý tệp JSONL thô cho một ngày cụ thể sử dụng Công Việc Xử Lý của SageMaker. Công việc này thực hiện việc trích xuất tính năng và nén dữ liệu, sau đó lưu các tính năng đã xử lý vào tệp Parquet với 1 triệu bản ghi trên mỗi tệp. Chúng tôi tận dụng khả năng xử lý song song trên CPU để thực hiện việc trích xuất tính năng cho từng tệp JSONL thô một cách đồng thời. Kết quả xử lý của mỗi tệp JSONL được lưu vào một tệp Parquet riêng biệt trong thư mục tạm thời. Sau khi tất cả các tệp JSONL đã được xử lý, chúng tôi thực hiện nén hàng ngàn tệp Parquet nhỏ vào một số tệp với 1 triệu bản ghi trên mỗi tệp. Các tệp Parquet đã nén sau đó được tải lên Amazon S3 như là kết quả đầu ra của công việc xử lý. Việc nén dữ liệu đảm bảo việc crawl và truy vấn SQL hiệu quả trong các giai đoạn tiếp theo của pipeline.

Dưới đây là mã mẫu để lập lịch Công Việc Xử Lý SageMaker cho một ngày cụ thể, ví dụ 2020-01-01, sử dụng SDK của SageMaker. Công việc đọc tệp JSONL thô từ Amazon S3 (ví dụ từ s3://bucket/raw-data/2020/01/01) và lưu các tệp Parquet đã nén vào Amazon S3 (ví dụ tới s3://bucket/processed/table-name/day_partition=2020-01-01/).

### install dependencies 

%pip install sagemaker pyarrow s3fs awswrangler

import sagemaker

import boto3

from sagemaker.processing import FrameworkProcessor

from sagemaker.sklearn.estimator import SKLearn

from sagemaker import get_execution_role

from sagemaker.processing import ProcessingInput, ProcessingOutput

region = boto3.session.Session().region_name

role = get_execution_role()

bucket = sagemaker.Session().default_bucket()

### we use instance with 16 CPUs and 128 GiB memory

### note that the script will NOT load the entire data into memory during compaction

### depending on the size of individual jsonl files, larger instance may be needed

instance = “ml.r5.4xlarge”

n_jobs = 8  ### we use 8 process workers

date = “2020-01-01” ### process data for one day

est_cls = SKLearn

framework_version_str = “0.20.0”

### schedule processing job

script_processor = FrameworkProcessor(

    role=role,

    instance_count=1,

    instance_type=instance,

    estimator_cls=est_cls,

    framework_version=framework_version_str,

    volume_size_in_gb=500,

)

script_processor.run(

    code=”processing_script.py”, ### name of the main processing script

    source_dir=”../src/etl/”, ### location of source code directory

    ### our processing script loads raw jsonl files directly from S3

    ### this avoids long start-up times of the processing jobs,

    ### since raw data does not need to be copied into instance

    inputs=[], ### processing job input is empty

    outputs=[

        ProcessingOutput(destination=”s3://bucket/processed/table-name/”,

                         source=”/opt/ml/processing/output”),

    ],

    arguments=[

        ### directory with job’s output

        “–output”, “/opt/ml/processing/output”,

        ### temporary directory inside instance

        “–tmp_output”, “/opt/ml/tmp_output”,

        “–n_jobs”, str(n_jobs), ### number of process workers

        “–date”, date, ### date to process

        ### location with raw jsonl files in S3

        “–path”, “s3://bucket/raw-data/”,

    ],

    wait=False

)

Chúng tôi xin giới thiệu đoạn mã nguồn cơ bản cho script chính (processing_script.py) mà chạy công việc Xử lý của SageMaker như sau:

import concurrent

import pyarrow.dataset as ds

import os

import s3fs

from pathlib import Path

### function to process raw jsonl file and save extracted features into parquet file  

from process_data import process_jsonl

### parse command line arguments

args = parse_args()

### we use s3fs to crawl S3 input path for raw jsonl files

fs = s3fs.S3FileSystem()

### we assume raw jsonl files are stored in S3 directories partitioned by date

### for example: s3://bucket/raw-data/2020/01/01/

jsons = fs.find(os.path.join(args.path, *args.date.split(‘-‘)))

### temporary directory location inside the Processing job instance

tmp_out = os.path.join(args.tmp_output, f”day_partition={args.date}”)

### directory location with job’s output

out_dir = os.path.join(args.output, f”day_partition={args.date}”)

### process individual jsonl files in parallel using n_jobs process workers

futures=[]

with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor:

    for file in jsons:

        inp_file = Path(file)

        out_file = os.path.join(tmp_out, inp_file.stem + “.snappy.parquet”)

        ### process_jsonl function reads raw jsonl file from S3 location (inp_file)

        ### and saves result into parquet file (out_file) inside temporary directory

        futures.append(executor.submit(process_jsonl, file, out_file))

    ### wait until all jsonl files are processed

    for future in concurrent.futures.as_completed(futures):

        result = future.result()

### compact parquet files

dataset = ds.dataset(tmp_out)

if len(dataset.schema) > 0:

    ### save compacted parquet files with 1MM records per file

    ds.write_dataset(dataset, out_dir, format=”parquet”, 

                     max_rows_per_file=1024 * 1024)

Tính mở rộng là tính năng then chốt của quy trình làm việc của chúng tôi. Đầu tiên, chúng tôi có thể sử dụng nhiều công việc Xử lý SageMaker để xử lý dữ liệu trong nhiều ngày cùng một lúc. Thứ hai, chúng tôi tránh việc tải toàn bộ dữ liệu đã xử lý hoặc dữ liệu thô vào bộ nhớ cùng một lúc, trong khi xử lý từng ngày dữ liệu được chỉ định. Điều này cho phép xử lý dữ liệu sử dụng các loại thực thể không thể chứa đủ dữ liệu trong một ngày trong bộ nhớ chính. Yêu cầu duy nhất là loại thực thể phải có khả năng tải đồng thời N tệp JSONL thô hoặc tệp Parquet đã xử lý vào bộ nhớ, với N là số lượng công nhân xử lý đang sử dụng.

Cào dữ liệu đã xử lý sử dụng AWS Glue

Sau khi toàn bộ dữ liệu thô cho nhiều ngày đã được xử lý, chúng tôi có thể tạo một bảng Athena từ toàn bộ bộ dữ liệu bằng cách sử dụng một trình cào AWS Glue. Chúng tôi sử dụng thư viện AWS SDK cho pandas (awswrangler) để tạo bảng sử dụng đoạn mã sau:

import awswrangler as wr

### crawl processed data in S3

res = wr.s3.store_parquet_metadata(

    path=’s3://bucket/processed/table-name/’,

    database=”database_name”,

    table=”table_name”,

    dataset=True,

    mode=”overwrite”,

    sampling=1.0,

    path_suffix=’.parquet’,

)

### print table schema

print(res[0])

Bây giờ, các tính năng đã được xử lý cho một khoảng thời gian cụ thể có thể được tải từ bảng Athena sử dụng SQL, và những tính năng này sau đó có thể được sử dụng để huấn luyện mô hình đề xuất công việc. Ví dụ, đoạn mã sau đây tải một tháng các tính năng đã xử lý vào một DataFrame sử dụng thư viện awswrangler:

import awswrangler as wr

query = “””

    SELECT * 

    FROM table_name

    WHERE day_partition BETWEN ‘2020-01-01’ AND ‘2020-02-01’ 

“””

### load 1 month of data from database_name.table_name into a DataFrame

df = wr.athena.read_sql_query(query, database=’database_name’)

Sử Dụng SQL để Tải Tính Năng Đã Xử Lý Cho Việc Huấn Luyện và Ứng Dụng Rộng Rãi Khác

Thêm vào đó, việc sử dụng SQL để tải các tính năng đã xử lý cho việc huấn luyện có thể được mở rộng để phù hợp với nhiều trường hợp sử dụng khác. Chẳng hạn, chúng ta có thể áp dụng một pipeline tương tự để duy trì hai bảng Athena riêng biệt: một để lưu trữ ấn tượng người dùng và một khác để lưu trữ lượt nhấp của người dùng vào những ấn tượng này. Sử dụng các câu lệnh SQL join, chúng ta có thể truy xuất những ấn tượng mà người dùng đã nhấp vào hoặc không và sau đó truyền những ấn tượng này vào công việc huấn luyện mô hình.

Lợi Ích Của Giải Pháp

Việc triển khai giải pháp đề xuất mang lại nhiều lợi ích cho quy trình làm việc hiện tại của chúng ta, bao gồm:

  • Triển khai Đơn Giản – Giải pháp này cho phép việc trích xuất tính năng được thực hiện bằng Python sử dụng các thư viện ML phổ biến. Và nó không yêu cầu mã được chuyển sang PySpark. Điều này giúp việc trích xuất tính năng trở nên đơn giản hơn vì cùng một mã được một Nhà khoa học dữ liệu phát triển trong sổ ghi chép sẽ được thực thi bởi pipeline này.
  • Đường Đi Nhanh Chóng Đến Sản Xuất – Giải pháp có thể được phát triển và triển khai bởi một Nhà khoa học dữ liệu để thực hiện việc trích xuất tính năng ở quy mô lớn, cho phép họ phát triển một mô hình ML đề xuất dựa trên dữ liệu này. Đồng thời, cùng một giải pháp có thể được triển khai vào sản xuất bởi một Kỹ sư ML với ít chỉnh sửa cần thiết.
  • Tính Khả Dụng Lại – Giải pháp cung cấp một mô hình có thể tái sử dụng cho việc trích xuất tính năng ở quy mô lớn, và có thể dễ dàng thích ứng cho các trường hợp sử dụng khác ngoài việc xây dựng các mô hình đề xuất.
  • Hiệu Quả – Giải pháp cung cấp hiệu suất tốt: xử lý một ngày dữ liệu của Talent.com mất chưa đến 1 giờ.
  • Cập Nhật Tăng Dần – Giải pháp cũng hỗ trợ cập nhật tăng dần. Dữ liệu hàng ngày mới có thể được xử lý với một công việc Processing của SageMaker, và vị trí S3 chứa dữ liệu đã xử lý có thể được tái duyệt để cập nhật bảng Athena. Chúng ta cũng có thể sử dụng một công việc cron để cập nhật dữ liệu hôm nay nhiều lần mỗi ngày (ví dụ, mỗi 3 giờ).

Chúng tôi đã sử dụng pipeline ETL này để giúp Talent.com xử lý 50,000 tệp mỗi ngày chứa 5 triệu bản ghi, và tạo ra dữ liệu huấn luyện bằng cách sử dụng các tính năng được trích xuất từ 90 ngày dữ liệu thô từ Talent.com – tổng cộng 450 triệu bản ghi trên 900,000 tệp. Pipeline của chúng tôi đã giúp Talent.com xây dựng và triển khai hệ thống đề xuất vào sản xuất chỉ trong 2 tuần. Giải pháp thực hiện tất cả các quá trình ML bao gồm ETL trên Amazon SageMaker mà không sử dụng dịch vụ AWS khác. Hệ thống đề xuất việc làm đã tăng tỷ lệ nhấp qua 8.6% trong thử nghiệm A/B trực tuyến so với giải pháp dựa trên XGBoost trước đó, giúp kết nối hàng triệu người dùng của Talent.com với những công việc tốt hơn.

Kết luận

Bài viết này trình bày về pipeline ETL mà chúng tôi đã phát triển để xử lý đặc trưng cho việc huấn luyện và triển khai mô hình gợi ý việc làm tại Talent.com. Pipeline của chúng tôi sử dụng các công việc Xử lý SageMaker để thực hiện xử lý dữ liệu và trích xuất đặc trưng một cách hiệu quả với quy mô lớn. Mã trích xuất đặc trưng được thực hiện bằng Python, cho phép sử dụng các thư viện ML phổ biến để thực hiện việc trích xuất đặc trưng ở quy mô lớn, mà không cần phải chuyển mã sang sử dụng PySpark.

Chúng tôi khuyến khích các độc giả khám phá khả năng sử dụng pipeline được trình bày trong bài viết này như một mẫu cho các trường hợp sử dụng của họ, nơi mà việc trích xuất đặc trưng ở quy mô lớn là cần thiết. Pipeline có thể được tận dụng bởi một Nhà Khoa học Dữ liệu để xây dựng một mô hình ML, và cùng một pipeline sau đó có thể được áp dụng bởi một Kỹ sư ML để chạy trong môi trường sản xuất. Điều này có thể giảm đáng kể thời gian cần thiết để sản phẩm hóa giải pháp ML từ đầu đến cuối, như trường hợp của Talent.com. Các độc giả có thể tham khảo hướng dẫn để thiết lập và chạy các công việc Xử lý SageMaker. Chúng tôi cũng giới thiệu các độc giả xem bài viết Từ văn bản đến công việc mơ ước: Xây dựng hệ thống gợi ý việc làm dựa trên NLP tại Talent.com với Amazon SageMaker, nơi chúng tôi thảo luận về các kỹ thuật huấn luyện mô hình học sâu sử dụng Amazon SageMaker để xây dựng hệ thống gợi ý việc làm của Talent.com.