Overview
MixtureParquetAsrDataset combines parquet storage and ASR task pipelines into a complete fairseq2 dataset implementation. It supports:
- Multilingual datasets with language-corpus partitioning
- Weighted sampling across partitions for balanced training
- Efficient parquet-based storage with streaming support
- Configurable audio/text preprocessing pipelines
Constructor
MixtureParquetAsrDataset(path: Path)
path (Path): Path to the dataset directory containing parquet files organized by language and corpus partitions.
Example
from pathlib import Path
from omnilingual_asr.datasets.impl import MixtureParquetAsrDataset
# Initialize dataset
dataset = MixtureParquetAsrDataset.from_path(Path("data/asr_dataset"))
Class Methods
from_path
@classmethod
from_path(cls, path: Path) -> MixtureParquetAsrDataset
Factory method to create a dataset from a directory path.
path (Path): Path to the parquet dataset directory.
Returns: An initialized dataset instance.
Instance Methods
create_reader
def create_reader(
    self,
split: str,
tokenizer: Tokenizer,
gangs: Gangs,
dtype: torch.dtype,
num_accumulate: int,
storage_config: MixtureParquetStorageConfig,
task_config: AsrTaskConfig,
) -> DataReader[Seq2SeqBatch]
Creates a data reader for the specified split with full configuration.
split (str): Dataset split to read (e.g., "train", "dev", "test"). May include a corpus filter as "split_corpus" (e.g., "train_librispeech").
tokenizer (Tokenizer): Tokenizer for text encoding/decoding.
gangs (Gangs): Gang configuration for distributed data-parallel training.
dtype (torch.dtype): Data type for audio tensors.
num_accumulate (int): Number of batches to accumulate before yielding.
storage_config (MixtureParquetStorageConfig, required): Configuration for parquet storage and partition weighting.
task_config (AsrTaskConfig): Configuration for the ASR preprocessing pipeline.
Returns: A configured data reader yielding Seq2SeqBatch objects.
Complete Usage Example
from pathlib import Path
import torch
from fairseq2.data.tokenizers import load_tokenizer
from fairseq2.gang import create_fake_gangs
from fairseq2.datasets import SyncMode
from fairseq2.data.parquet import (
FragmentStreamingConfig,
FragmentLoadingConfig
)
from omnilingual_asr.datasets.impl import MixtureParquetAsrDataset
from omnilingual_asr.datasets.storage import (
MixtureParquetStorageConfig,
LangASRSchema
)
from omnilingual_asr.datasets.tasks import AsrTaskConfig
# 1. Initialize dataset
dataset = MixtureParquetAsrDataset.from_path(
Path("data/multilingual_asr")
)
# 2. Load tokenizer
tokenizer = load_tokenizer("omniASR_tokenizer_v1")
# 3. Configure storage with mixture weighting
storage_config = MixtureParquetStorageConfig(
fragment_streaming=FragmentStreamingConfig(
parquet_path="", # Set automatically
seed=42,
fragment_shuffle_window=-1, # Global shuffle
nb_epochs=None # Infinite loop for training
),
fragment_loading=FragmentLoadingConfig(
columns=LangASRSchema(),
nb_prefetch=1,
num_parallel_fragments=1
),
# Partition weighting
dataset_summary_path="data/dataset_stats.tsv",
beta_corpus=0.5, # Corpus weight = (hours/total)^0.5
beta_language=0.5, # Language weight within corpus
sync_mode=SyncMode.UNTIL_FIRST,
sync_batches=True
)
# 4. Configure ASR task
task_config = AsrTaskConfig(
min_audio_len=8000, # ~0.5s at 16kHz
max_audio_len=800_000, # ~50s at 16kHz
max_num_elements=1_600_000,
num_seqs_multiple_of=8,
normalize_audio=True,
example_shuffle_window=1000,
batch_shuffle_window=100,
num_prefetch=4,
seed=2025
)
# 5. Create reader
reader = dataset.create_reader(
split="train",
tokenizer=tokenizer,
gangs=create_fake_gangs(device=torch.device("cuda")),
dtype=torch.bfloat16,
num_accumulate=1,
storage_config=storage_config,
task_config=task_config
)
# 6. Iterate over batches
for batches in reader:
for batch in batches:
# batch is Seq2SeqBatch
audio = batch.source_seqs # [B, T]
text = batch.target_seqs # [B, S]
# Access metadata
languages = batch.example["lang"] # List[str]
corpora = batch.example["corpus"] # List[str]
# Training step...
break
Dataset Structure
The parquet dataset should be organized with language and corpus partitions:
data/multilingual_asr/
├── language=eng/
│ ├── corpus=librispeech/
│ │ ├── split=train/
│ │ │ └── data.parquet
│ │ └── split=dev/
│ │ └── data.parquet
│ └── corpus=commonvoice/
│ └── split=train/
│ └── data.parquet
└── language=fra/
└── corpus=commonvoice/
└── split=train/
└── data.parquet
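The hive-style key=value layout above can be enumerated with nothing but pathlib. A minimal sketch that recreates the tree with empty placeholder files and parses the partition values back out (all paths and data are illustrative):

```python
import tempfile
from pathlib import Path

# Recreate the partition layout from the tree above (placeholder files only)
root = Path(tempfile.mkdtemp()) / "multilingual_asr"
for lang, corpus, split in [
    ("eng", "librispeech", "train"),
    ("eng", "librispeech", "dev"),
    ("eng", "commonvoice", "train"),
    ("fra", "commonvoice", "train"),
]:
    part = root / f"language={lang}" / f"corpus={corpus}" / f"split={split}"
    part.mkdir(parents=True)
    (part / "data.parquet").touch()

# Enumerate partitions by parsing the hive-style key=value directory names
partitions = sorted(
    tuple(p.split("=", 1)[1] for p in f.parent.parts[-3:])
    for f in root.glob("language=*/corpus=*/split=*/data.parquet")
)
print(partitions)
```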
Parquet Schema
Each parquet file should contain:
- Raw audio data (WAV, MP3, FLAC, etc.)
- Split name (train/dev/test)
- Language code (e.g., "eng", "fra")
- Corpus name (e.g., "librispeech", "commonvoice")
Mixture Weighting
The dataset_summary_path TSV should contain:
corpus language hours
librispeech eng 960.5
commonvoice eng 1500.2
commonvoice fra 450.8
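The tab-separated summary can be parsed with the stdlib csv module; a minimal sketch that inlines the example rows shown above:

```python
import csv
import io

# Inline copy of the example TSV from above
tsv = (
    "corpus\tlanguage\thours\n"
    "librispeech\teng\t960.5\n"
    "commonvoice\teng\t1500.2\n"
    "commonvoice\tfra\t450.8\n"
)

# Map (corpus, language) -> hours
hours = {
    (row["corpus"], row["language"]): float(row["hours"])
    for row in csv.DictReader(io.StringIO(tsv), delimiter="\t")
}
print(hours)
```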
Weight Calculation
# Corpus weight
corpus_weight = (corpus_hours / total_hours) ** beta_corpus
# Language weight within corpus
lang_weight = (lang_hours_in_corpus / corpus_hours) ** beta_language
# Final sample weight
sample_weight = corpus_weight * lang_weight
Beta parameter effects:
- beta=1.0: Proportional to hours (high-resource languages dominate)
- beta=0.5: Square-root weighting (balanced)
- beta=0.0: Uniform sampling (all languages equal)
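Applying these formulas to the example summary rows makes the beta effect concrete. A self-contained sketch (hours copied from the table above; the helper is illustrative, not the library's implementation):

```python
# Hours per (corpus, language), copied from the example TSV above
hours = {
    ("librispeech", "eng"): 960.5,
    ("commonvoice", "eng"): 1500.2,
    ("commonvoice", "fra"): 450.8,
}

def sample_weights(hours, beta_corpus=0.5, beta_language=0.5):
    total = sum(hours.values())
    # Aggregate hours per corpus
    corpus_hours = {}
    for (corpus, _), h in hours.items():
        corpus_hours[corpus] = corpus_hours.get(corpus, 0.0) + h
    # Apply the two-level weighting formula from above
    weights = {}
    for (corpus, lang), h in hours.items():
        corpus_w = (corpus_hours[corpus] / total) ** beta_corpus
        lang_w = (h / corpus_hours[corpus]) ** beta_language
        weights[(corpus, lang)] = corpus_w * lang_w
    return weights

w = sample_weights(hours)
# beta=0.5 pulls low-resource French up relative to its raw share of hours
print({k: round(v, 3) for k, v in w.items()})
```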
Split Filtering
# Read specific corpus from train split
reader = dataset.create_reader(
split="train_librispeech", # Format: split_corpus
# ... other config
)
# Read all train data
reader = dataset.create_reader(
split="train",
# ... other config
)
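The split_corpus naming convention can be decomposed with a simple string split; a hypothetical helper (not part of the library API) illustrating the format:

```python
from typing import Optional, Tuple

def parse_split(split: str) -> Tuple[str, Optional[str]]:
    """Split a "split_corpus" string into (split, corpus); corpus is None if absent."""
    name, _, corpus = split.partition("_")
    return name, corpus or None

print(parse_split("train_librispeech"))  # ('train', 'librispeech')
print(parse_split("train"))              # ('train', None)
```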
The reader yields Seq2SeqBatch objects:
@dataclass
class Seq2SeqBatch:
source_seqs: Tensor # [batch_size, max_audio_len]
source_seq_lens: List[int] # Actual audio lengths
target_seqs: Tensor # [batch_size, max_text_len]
target_seq_lens: List[int] # Actual text lengths
example: Dict[str, Any] # Metadata including lang, corpus
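The *_seq_lens fields record the unpadded length of each row in the padded tensors; a minimal sketch with plain lists standing in for tensors (data is illustrative):

```python
# Padded batch of 3 audio "sequences" (plain lists standing in for tensors)
source_seqs = [
    [0.1, 0.2, 0.3, 0.0, 0.0],
    [0.5, 0.0, 0.0, 0.0, 0.0],
    [0.7, 0.8, 0.9, 1.0, 1.1],
]
source_seq_lens = [3, 1, 5]

# Trim each row back to its true length using the companion lengths
trimmed = [seq[:n] for seq, n in zip(source_seqs, source_seq_lens)]
print([len(t) for t in trimmed])  # [3, 1, 5]
```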
Source Reference
See implementation at src/omnilingual_asr/datasets/impl/mixture_parquet_asr_dataset.py:33