The Hugging Face platform has many datasets and pre-trained models, making it increasingly accessible to use and train state-of-the-art machine learning models. However, AI tasks can be difficult to scale because AI datasets are often large (hundreds of GB to TB) and using Hugging Face transformers for model inference can be computationally expensive.
Dask, a Python library for distributed computing, can handle out-of-core computing (processing data that doesn't fit in memory) by dividing datasets into manageable chunks. This makes it easy to do things like the following (a minimal sketch of the chunking idea follows the list):
- Efficient data loading and preprocessing of TB-scale datasets with an easy-to-use API that mimics pandas
- Parallel model inference (with the option for multi-node GPU inference)
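To make the chunking idea concrete, here is a minimal, self-contained sketch (not part of the workflow in this post; the toy DataFrame and partition count are made up for illustration) showing how Dask splits a dataset into pandas-sized partitions and schedules work per chunk:

import dask.dataframe as dd
import pandas as pd

# A toy pandas DataFrame, split into 8 smaller pandas DataFrames under the hood
pdf = pd.DataFrame({"x": range(1_000_000)})
ddf = dd.from_pandas(pdf, npartitions=8)

print(ddf.npartitions)         # 8 chunks that can be processed independently
print(ddf.x.mean().compute())  # work runs chunk by chunk, then results are combined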
This post walks through an example of processing data from the FineWeb dataset, using the FineWeb-Edu classifier to identify web pages with high educational value. We show how to:
- Process 100 rows locally with pandas
- Scale to 211 million rows with Dask on multiple GPUs in the cloud
Process 100 rows with Pandas
The FineWeb dataset consists of 15 trillion tokens of English web data from Common Crawl, a nonprofit organization that hosts monthly updated public web crawl datasets. This dataset is often used for a variety of tasks, including large-scale language model training, classification, content filtering, and information retrieval across a variety of disciplines.
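If you want to see how the crawl is organized before downloading anything, huggingface_hub exposes the dataset repository as a filesystem. This is a small sketch (not from the original workflow) that lists the Parquet files for the crawl used in this post:

from huggingface_hub import HfFileSystem

# List the Parquet files in the February/March 2024 crawl without downloading them
fs = HfFileSystem()
files = fs.glob("datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet")
print(len(files), "files, e.g.", files[0])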
Downloading and reading a single file with Pandas on a laptop can take more than a minute.
import pandas as pd

df = pd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/000_00000.parquet"
)
We then use the Hugging Face FineWeb-Edu classifier to determine the educational value of the web pages in our dataset. Web pages are scored on a scale from 0 to 5, where 0 means not educational and 5 means highly educational. With pandas, we can run this on a small subset of 100 rows, which takes about 10 seconds on an M1 Mac with a GPU.
from transformers import pipeline


def compute_scores(texts):
    import torch

    # Check which hardware is available: NVIDIA GPU, Apple Silicon GPU (MPS), or CPU
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device,
    )
    results = pipe(
        texts.to_list(),
        batch_size=25,
        padding="longest",
        truncation=True,
        function_to_apply="none",
    )

    return pd.Series([r["score"] for r in results])


df = df[:100]
min_edu_score = 3
df["edu-classifier-score"] = compute_scores(df.text)
df = df[df["edu-classifier-score"] >= min_edu_score]
Note that I also added a step inside the compute_scores function to check for available hardware, since the function will be distributed across machines when we scale with Dask in the next step. This makes it easy to move from local testing on a single machine (on a CPU, or on a MacBook with an Apple Silicon GPU) to distributed processing across multiple machines with NVIDIA GPUs.
Scale to 211 million rows with Dask
The full February/March 2024 crawl is 432 GB on disk (approximately 715 GB in memory), split across 250 Parquet files. Even on a machine with enough memory for the entire dataset, processing it serially would be prohibitively slow.
To scale up, we use Dask DataFrame, which parallelizes pandas to process large tabular data. Its API closely mirrors the pandas API, making it easy to test on a single file and then scale out to the full dataset. Dask also works well with Parquet, the default format for Hugging Face datasets, which provides rich data types, efficient columnar filtering, and compression.
import dask.dataframe as dd

df = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)
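If a workflow only needs a few columns, Parquet's columnar layout lets Dask skip the rest at read time. The snippet below is an illustrative sketch rather than part of this post's workflow (here we keep all columns, since the filtered dataset is written back out later, and the "url" column is shown only as an example):

import dask.dataframe as dd

# Read only the columns needed, so other columns are never loaded from storage
df_text = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet",
    columns=["text", "url"],
)

print(df_text.npartitions)  # roughly one partition per Parquet file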
We apply the compute_scores function for text classification in parallel across the Dask DataFrame using map_partitions, which runs the function on each underlying pandas DataFrame in parallel. The meta argument is specific to Dask and describes the expected structure (column names and data types) of the output.
from transformers import pipeline


def compute_scores(texts):
    import torch

    # Check which hardware is available: NVIDIA GPU, Apple Silicon GPU (MPS), or CPU
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device,
    )
    results = pipe(
        texts.to_list(),
        batch_size=768,
        padding="longest",
        truncation=True,
        function_to_apply="none",
    )

    return pd.Series([r["score"] for r in results])


min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]
Although I chose a batch_size that works well for this example, you may need to tune it for the hardware, data, and model used in your own workflow (see the Hugging Face documentation on pipeline batching).
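One way to pick a value, sketched below, is to time a few candidate batch sizes on a small sample before launching the full run. The sample size, candidate values, and single-file path are illustrative choices, and the timings only transfer if you benchmark on the same GPU type you plan to deploy:

import time

import dask.dataframe as dd
import torch
from transformers import pipeline

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
pipe = pipeline(
    "text-classification",
    model="HuggingFaceFW/fineweb-edu-classifier",
    device=device,
)

# Grab a small sample of texts from a single file
sample = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/000_00000.parquet"
)
sample_texts = sample.head(1_000).text.to_list()

for batch_size in (128, 256, 512, 768):  # candidate values to compare
    start = time.time()
    pipe(
        sample_texts,
        batch_size=batch_size,
        padding="longest",
        truncation=True,
        function_to_apply="none",
    )
    print(f"batch_size={batch_size}: {time.time() - start:.1f} s")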
Now that you have identified the rows of interest, you can save the results for downstream analysis. Dask DataFrame supports distributed writes to Parquet, and Hugging Face uses commits to track dataset changes and can handle a Dask DataFrame's parallel writes.
repo_id = "<username>/<dataset-name>"  # update with your dataset repository

df.to_parquet(f"hf://datasets/{repo_id}")
This creates one commit per file, so we recommend squashing the history after uploading.
from huggingface_hub import HfApi

HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")
Alternatively, you can use a custom function that uploads multiple files per commit (a rough sketch of this approach is shown below).
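As a rough sketch of that idea, assuming the filtered Dask DataFrame has first been written to a local output/ directory with df.to_parquet("output/"), and with the files-per-commit count chosen arbitrarily, you could batch files into commits with huggingface_hub:

from pathlib import Path

from huggingface_hub import CommitOperationAdd, HfApi

api = HfApi()
repo_id = "<username>/<dataset-name>"  # same placeholder repository as above
files = sorted(Path("output").glob("*.parquet"))
files_per_commit = 25  # illustrative choice

for i in range(0, len(files), files_per_commit):
    batch = files[i : i + files_per_commit]
    operations = [
        CommitOperationAdd(path_in_repo=f.name, path_or_fileobj=str(f))
        for f in batch
    ]
    api.create_commit(
        repo_id=repo_id,
        repo_type="dataset",
        operations=operations,
        commit_message=f"Upload {len(batch)} Parquet files",
    )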
Multi-GPU parallel model inference
There are various ways to deploy Dask on different hardware. We’ll use Coiled to deploy Dask on the cloud so we can spin up VMs as needed and clean them up when we’re done.
import coiled

cluster = coiled.Cluster(
    region="us-east-1",
    n_workers=100,
    spot_policy="spot_with_fallback",
    worker_vm_types="g5.xlarge",
    worker_options={"nthreads": 1},
)
client = cluster.get_client()
Under the hood, Coiled handles:
- Provisioning cloud VMs with GPU hardware (in this case, g5.xlarge instances on AWS)
- Setting up the appropriate NVIDIA drivers, CUDA runtime, etc.
- Using package sync to automatically install the same packages on your cloud VMs as you have locally, including Python files in your working directory
The workflow took about 5 hours to complete, with good GPU hardware utilization.
Putting it all together, the complete workflow looks like this:
import os

import coiled
import dask.dataframe as dd
import pandas as pd
from huggingface_hub import HfApi
from transformers import pipeline

cluster = coiled.Cluster(
    region="us-east-1",
    n_workers=100,
    spot_policy="spot_with_fallback",
    worker_vm_types="g5.xlarge",
    worker_options={"nthreads": 1},
)
client = cluster.get_client()
cluster.send_private_envs({"HF_TOKEN": ""})  # your Hugging Face token

df = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)


def compute_scores(texts):
    import torch

    # Check which hardware is available: NVIDIA GPU, Apple Silicon GPU (MPS), or CPU
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device,
    )
    results = pipe(
        texts.to_list(),
        batch_size=768,
        padding="longest",
        truncation=True,
        function_to_apply="none",
    )

    return pd.Series([r["score"] for r in results])


min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]

repo_id = "<username>/<dataset-name>"  # update with your dataset repository
df.to_parquet(f"hf://datasets/{repo_id}")
HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")
Conclusion
Hugging Face + Dask is a powerful combination. In this example, we used Dask + Coiled to scale a classification task from 100 rows to 211 million rows by running the workflow in parallel on multiple GPUs in the cloud.
This same type of workflow can also be used for other use cases, such as:
- Filtering genomic data to select genes of interest
- Extracting information from unstructured text and transforming it into structured datasets
- Cleaning text data collected from the internet or Common Crawl
- Running multimodal model inference to analyze large audio, image, or video datasets