
Overview

MixtureParquetAsrDataset combines parquet storage and ASR task pipelines into a complete fairseq2 dataset implementation. It supports:
  • Multilingual datasets with language-corpus partitioning
  • Weighted sampling across partitions for balanced training
  • Efficient parquet-based storage with streaming support
  • Configurable audio/text preprocessing pipelines

Constructor

MixtureParquetAsrDataset(path: Path)

Parameters:
  • path (Path, required): Path to the dataset directory containing parquet files organized by language and corpus partitions.

Example

from pathlib import Path
from omnilingual_asr.datasets.impl import MixtureParquetAsrDataset

# Initialize dataset
dataset = MixtureParquetAsrDataset.from_path(Path("data/asr_dataset"))

Class Methods

from_path

@classmethod
from_path(cls, path: Path) -> MixtureParquetAsrDataset

Factory method to create a dataset from a directory path.

Parameters:
  • path (Path, required): Path to the parquet dataset directory.

Returns:
  • dataset (MixtureParquetAsrDataset): Initialized dataset instance.

Instance Methods

create_reader

def create_reader(
    split: str,
    tokenizer: Tokenizer,
    gangs: Gangs,
    dtype: torch.dtype,
    num_accumulate: int,
    storage_config: MixtureParquetStorageConfig,
    task_config: AsrTaskConfig,
) -> DataReader[Seq2SeqBatch]
Creates a data reader for the specified split with full configuration.

Parameters:
  • split (str, required): Dataset split to read (e.g., "train", "dev", "test"). Can include a corpus filter as "split_corpus" (e.g., "train_librispeech").
  • tokenizer (Tokenizer, required): Tokenizer for text encoding/decoding.
  • gangs (Gangs, required): Gang configuration for distributed data-parallel training.
  • dtype (torch.dtype, required): Data type for audio tensors.
  • num_accumulate (int, required): Number of batches to accumulate before yielding.
  • storage_config (MixtureParquetStorageConfig, required): Configuration for parquet storage and partition weighting.
  • task_config (AsrTaskConfig, required): Configuration for the ASR preprocessing pipeline.

Returns:
  • reader (DataReader[Seq2SeqBatch]): Configured data reader yielding Seq2SeqBatch objects.

Complete Usage Example

from pathlib import Path
import torch
from fairseq2.data.tokenizers import load_tokenizer
from fairseq2.gang import create_fake_gangs
from fairseq2.datasets import SyncMode
from fairseq2.data.parquet import (
    FragmentStreamingConfig,
    FragmentLoadingConfig
)

from omnilingual_asr.datasets.impl import MixtureParquetAsrDataset
from omnilingual_asr.datasets.storage import (
    MixtureParquetStorageConfig,
    LangASRSchema
)
from omnilingual_asr.datasets.tasks import AsrTaskConfig

# 1. Initialize dataset
dataset = MixtureParquetAsrDataset.from_path(
    Path("data/multilingual_asr")
)

# 2. Load tokenizer
tokenizer = load_tokenizer("omniASR_tokenizer_v1")

# 3. Configure storage with mixture weighting
storage_config = MixtureParquetStorageConfig(
    fragment_streaming=FragmentStreamingConfig(
        parquet_path="",  # Set automatically
        seed=42,
        fragment_shuffle_window=-1,  # Global shuffle
        nb_epochs=None  # Infinite loop for training
    ),
    fragment_loading=FragmentLoadingConfig(
        columns=LangASRSchema(),
        nb_prefetch=1,
        num_parallel_fragments=1
    ),
    # Partition weighting
    dataset_summary_path="data/dataset_stats.tsv",
    beta_corpus=0.5,  # Corpus weight = (hours/total)^0.5
    beta_language=0.5,  # Language weight within corpus
    sync_mode=SyncMode.UNTIL_FIRST,
    sync_batches=True
)

# 4. Configure ASR task
task_config = AsrTaskConfig(
    min_audio_len=8000,  # ~0.5s at 16kHz
    max_audio_len=800_000,  # ~50s at 16kHz
    max_num_elements=1_600_000,
    num_seqs_multiple_of=8,
    normalize_audio=True,
    example_shuffle_window=1000,
    batch_shuffle_window=100,
    num_prefetch=4,
    seed=2025
)

# 5. Create reader
reader = dataset.create_reader(
    split="train",
    tokenizer=tokenizer,
    gangs=create_fake_gangs(device=torch.device("cuda")),
    dtype=torch.bfloat16,
    num_accumulate=1,
    storage_config=storage_config,
    task_config=task_config
)

# 6. Iterate over batches
for batches in reader:
    for batch in batches:
        # batch is Seq2SeqBatch
        audio = batch.source_seqs  # [B, T]
        text = batch.target_seqs   # [B, S]
        
        # Access metadata
        languages = batch.example["lang"]  # List[str]
        corpora = batch.example["corpus"]  # List[str]
        
        # Training step...
        break

Dataset Structure

The parquet dataset should be organized with language and corpus partitions:
data/multilingual_asr/
├── language=eng/
│   ├── corpus=librispeech/
│   │   ├── split=train/
│   │   │   └── data.parquet
│   │   └── split=dev/
│   │       └── data.parquet
│   └── corpus=commonvoice/
│       └── split=train/
│           └── data.parquet
└── language=fra/
    └── corpus=commonvoice/
        └── split=train/
            └── data.parquet

Parquet Schema

Each parquet file should contain the following columns:
  • audio_bytes (bytes, required): Raw audio data (WAV, MP3, FLAC, etc.)
  • audio_size (int, required): Audio length in samples
  • text (str, required): Transcription text
  • split (str, required): Split name (train/dev/test)
  • language (str, required): Language code (e.g., "eng", "fra")
  • corpus (str, required): Corpus name (e.g., "librispeech", "commonvoice")

Mixture Weighting

Dataset Summary Format

The dataset_summary_path TSV should contain:
corpus	language	hours
librispeech	eng	960.5
commonvoice	eng	1500.2
commonvoice	fra	450.8

Weight Calculation

# Corpus weight
corpus_weight = (corpus_hours / total_hours) ** beta_corpus

# Language weight within corpus
lang_weight = (lang_hours_in_corpus / corpus_hours) ** beta_language

# Final sample weight
sample_weight = corpus_weight * lang_weight
Beta parameter effects:
  • beta=1.0: Proportional to hours (high-resource languages dominate)
  • beta=0.5: Square root weighting (balanced)
  • beta=0.0: Uniform sampling (all languages equal)
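The formulas above can be reproduced in plain Python using the illustrative hours from the summary table and the beta values from the storage config example (0.5 for both):

```python
# Rows from the example dataset summary: (corpus, language, hours).
summary = [
    ("librispeech", "eng", 960.5),
    ("commonvoice", "eng", 1500.2),
    ("commonvoice", "fra", 450.8),
]

beta_corpus = 0.5
beta_language = 0.5

total_hours = sum(h for _, _, h in summary)
corpus_hours: dict[str, float] = {}
for corpus, _, h in summary:
    corpus_hours[corpus] = corpus_hours.get(corpus, 0.0) + h

weights = {}
for corpus, lang, h in summary:
    corpus_weight = (corpus_hours[corpus] / total_hours) ** beta_corpus
    lang_weight = (h / corpus_hours[corpus]) ** beta_language
    weights[(corpus, lang)] = corpus_weight * lang_weight
```

With beta=0.5, the roughly 3.3x hours imbalance between eng and fra within commonvoice is compressed to about a 1.8x difference in sampling weight.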

Split Filtering

# Read specific corpus from train split
reader = dataset.create_reader(
    split="train_librispeech",  # Format: split_corpus
    # ... other config
)

# Read all train data
reader = dataset.create_reader(
    split="train",
    # ... other config
)

Batch Format

The reader yields Seq2SeqBatch objects:
@dataclass
class Seq2SeqBatch:
    source_seqs: Tensor  # [batch_size, max_audio_len]
    source_seq_lens: List[int]  # Actual audio lengths
    target_seqs: Tensor  # [batch_size, max_text_len]
    target_seq_lens: List[int]  # Actual text lengths
    example: Dict[str, Any]  # Metadata including lang, corpus
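The seq_lens fields are typically turned into a boolean padding mask before attention or loss computation. A generic sketch of that pattern (the padding_mask helper below is illustrative, not part of the fairseq2 API):

```python
import torch

def padding_mask(seq_lens: list[int], max_len: int) -> torch.Tensor:
    """Return a [batch, max_len] mask: True at valid positions, False at padding."""
    lens = torch.as_tensor(seq_lens)
    # Broadcast [max_len] against [batch, 1] to get a [batch, max_len] mask.
    return torch.arange(max_len) < lens.unsqueeze(1)

mask = padding_mask([3, 5, 2], max_len=5)
# mask[0] -> [True, True, True, False, False]
```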

Source Reference

See implementation at src/omnilingual_asr/datasets/impl/mixture_parquet_asr_dataset.py:33
