Overview

Groups the items emitted by an Observable according to a key computed for each item. Emits GroupedObservable objects, where each represents values belonging to the same group. Each GroupedObservable has a key property identifying its group.
groupBy is useful for categorizing streams of data, much like SQL’s GROUP BY clause or bucketing values into a JavaScript Map.
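To make the Map analogy concrete, here is a minimal sketch that groups a finite array eagerly with a plain Map. The groupIntoMap helper and the sample words are illustrative assumptions and are not part of RxJS.

```typescript
// Hypothetical helper (not part of RxJS): eager grouping of an array into a Map.
function groupIntoMap<T, K>(items: readonly T[], key: (value: T) => K): Map<K, T[]> {
  const groups = new Map<K, T[]>();
  for (const item of items) {
    const k = key(item);
    const bucket = groups.get(k);
    if (bucket) {
      bucket.push(item); // key already seen: append to its bucket
    } else {
      groups.set(k, [item]); // first item for this key: start a new bucket
    }
  }
  return groups;
}

const words = ['apple', 'avocado', 'banana', 'blueberry', 'cherry'];
const byInitial = groupIntoMap(words, (w) => w.charAt(0));

console.log(Array.from(byInitial.entries()));
// [['a', ['apple', 'avocado']], ['b', ['banana', 'blueberry']], ['c', ['cherry']]]
```

Unlike this eager version, groupBy emits each group as soon as its first item arrives and keeps routing later items to it, so it also works on streams that never complete.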

Type Signature

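function groupBy<T, K>(
  key: (value: T) => K,
  options?: BasicGroupByOptions<K, T>
): OperatorFunction<T, GroupedObservable<K, T>>

function groupBy<T, K, E>(
  key: (value: T) => K,
  options: GroupByOptionsWithElement<K, E, T>
): OperatorFunction<T, GroupedObservable<K, E>>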

interface BasicGroupByOptions<K, T> {
  element?: undefined;
  duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
  connector?: () => SubjectLike<T>;
}

interface GroupByOptionsWithElement<K, E, T> {
  element: (value: T) => E;
  duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
  connector?: () => SubjectLike<E>;
}

Parameters

key
(value: T) => K
required
A function that extracts the key for each item. Items with the same key will be grouped together.
options.element
(value: T) => E
Optional function to transform each value before adding it to the group. If not provided, the original values are used.
options.duration
(grouped: GroupedObservable<K, T>) => ObservableInput<any>
A function that returns an Observable to determine how long each group should exist. When this Observable emits, the group completes and is removed; a later item with the same key starts a new group.
options.connector
() => SubjectLike<T>
Factory function to create an intermediate Subject through which grouped elements are emitted. Useful for custom Subject implementations.
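As a hedged illustration of options.connector, the sketch below subscribes to each group only after the source has completed, once with the default connector and once with a ReplaySubject. It assumes RxJS 7, where the default connector is a plain Subject; the collectLate helper and its sample data are hypothetical.

```typescript
import { of, groupBy, toArray, ReplaySubject } from 'rxjs';
import type { SubjectLike } from 'rxjs';

// Hypothetical helper: gathers every group first and subscribes to each one
// only after the source has completed, i.e. as a late subscriber.
function collectLate(connector?: () => SubjectLike<number>): Record<string, number[]> {
  const result: Record<string, number[]> = {};
  of(1, 2, 3, 4, 5)
    .pipe(
      groupBy((n: number) => (n % 2 === 0 ? 'even' : 'odd'), { connector }),
      toArray() // emits the list of groups once the source completes
    )
    .subscribe((groups) => {
      for (const group$ of groups) {
        group$.pipe(toArray()).subscribe((values) => {
          result[group$.key] = values;
        });
      }
    });
  return result;
}

const withDefault = collectLate();
const withReplay = collectLate(() => new ReplaySubject<number>());

console.log(withDefault); // { odd: [], even: [] }
console.log(withReplay); // { odd: [1, 3, 5], even: [2, 4] }
```

With the default connector the late subscribers see only the completion, while the ReplaySubject replays the values each group received. A replaying connector keeps those values in memory, so it is a trade-off rather than a default recommendation.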

Returns

return
OperatorFunction<T, GroupedObservable<K, T>>
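A function that returns an Observable of GroupedObservable objects. Each GroupedObservable has a key property and emits the items of that group (transformed by options.element, if provided, in which case the group type is GroupedObservable<K, E>).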

Usage Examples

Basic Example: Group by Property

import { of, groupBy, mergeMap, reduce } from 'rxjs';

const items = of(
  { id: 1, name: 'JavaScript' },
  { id: 2, name: 'Parcel' },
  { id: 2, name: 'webpack' },
  { id: 1, name: 'TypeScript' },
  { id: 3, name: 'TSLint' }
);

items.pipe(
  groupBy(item => item.id),
  mergeMap(group$ => 
    group$.pipe(
      reduce((acc, cur) => [...acc, cur], [])
    )
  )
).subscribe(group => console.log(group));

// Output:
// [{ id: 1, name: 'JavaScript' }, { id: 1, name: 'TypeScript' }]
// [{ id: 2, name: 'Parcel' }, { id: 2, name: 'webpack' }]
// [{ id: 3, name: 'TSLint' }]

Using the Element Selector

import { of, groupBy, mergeMap, reduce, map } from 'rxjs';

const items = of(
  { id: 1, name: 'JavaScript' },
  { id: 2, name: 'Parcel' },
  { id: 2, name: 'webpack' },
  { id: 1, name: 'TypeScript' },
  { id: 3, name: 'TSLint' }
);

items.pipe(
  groupBy(
    item => item.id,
    { element: item => item.name } // Only emit names
  ),
  mergeMap(group$ =>
    group$.pipe(
      reduce((acc, cur) => [...acc, cur], [`${group$.key}`]),
      map(arr => ({ id: parseInt(arr[0], 10), values: arr.slice(1) }))
    )
  )
).subscribe(group => console.log(group));

// Output:
// { id: 1, values: ['JavaScript', 'TypeScript'] }
// { id: 2, values: ['Parcel', 'webpack'] }
// { id: 3, values: ['TSLint'] }

Group Duration - Expiring Groups

import { interval, groupBy, mergeMap, map, take, tap, timer, toArray } from 'rxjs';

interface Event {
  userId: number;
  action: string;
  timestamp: number;
}

const events = interval(500).pipe(
  take(20),
  map(i => ({
    userId: i % 3,
    action: `action${i}`,
    timestamp: Date.now()
  }))
);

events.pipe(
  groupBy(
    event => event.userId,
    {
      duration: group$ => timer(2000) // Groups expire after 2 seconds
    }
  ),
  mergeMap(group$ => {
    console.log(`Group ${group$.key} started`);
    return group$.pipe(
      toArray(),
      map(events => ({ userId: group$.key, count: events.length })),
      tap(() => console.log(`Group ${group$.key} completed`))
    );
  })
).subscribe(result => {
  console.log('Group result:', result);
});
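The duration notifier does not have to be a timer. Because the duration function receives the group itself, a group can also be closed based on its own contents. The following sketch, which assumes RxJS 7 and uses made-up sample values, closes each group after it has received two items, so the result is deterministic and easy to check.

```typescript
import { of, groupBy, mergeMap, skip, toArray } from 'rxjs';

const batches: string[][] = [];

of('a1', 'a2', 'a3', 'b1', 'a4')
  .pipe(
    groupBy((s: string) => s.charAt(0), {
      // The notifier emits when the group receives its second value,
      // which completes that group and removes it.
      duration: (group$) => group$.pipe(skip(1)),
    }),
    // Collect each group's values until the group completes.
    mergeMap((group$) => group$.pipe(toArray()))
  )
  .subscribe((batch) => batches.push(batch));

console.log(batches);
// [['a1', 'a2'], ['a3', 'a4'], ['b1']]
```

The key 'a' appears in two separate batches because the first 'a' group had already expired when 'a3' arrived, so a new group was created for it. The 'b' group never reaches two items and completes only when the source does.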

Marble Diagram

Source:  --1a--2a--1b--3a--2b--1c--|
Key:       1   2   1   3   2   1

Group 1: --a-------b-----------c---|
Group 2: ------a-----------b-------|
Group 3: --------------a-----------|

Result:  --G1--G2------G3----------|
         (GroupedObservables)
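The diagram can be replayed in code. This sketch feeds the same six items through groupBy, using the digit as the key and the letter as the element, and records the order in which groups are created and values are routed. The trace array and its message format are illustrative assumptions.

```typescript
import { of, groupBy, mergeMap, map } from 'rxjs';

const trace: string[] = [];

of('1a', '2a', '1b', '3a', '2b', '1c')
  .pipe(
    groupBy((s: string) => s.charAt(0), { element: (s: string) => s.charAt(1) }),
    mergeMap((group$) => {
      // Runs once per group, when the first item with a new key arrives.
      trace.push(`G${group$.key} created`);
      return group$.pipe(map((value) => `G${group$.key}: ${value}`));
    })
  )
  .subscribe((line) => trace.push(line));

console.log(trace);
// ['G1 created', 'G1: a', 'G2 created', 'G2: a', 'G1: b',
//  'G3 created', 'G3: a', 'G2: b', 'G1: c']
```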

Common Use Cases

  1. User Session Tracking: Group events by user ID
  2. Log Analysis: Group log entries by severity, module, or timestamp
  3. Data Aggregation: Categorize and aggregate streaming data
  4. Event Categorization: Organize events by type or source
  5. Real-time Analytics: Group metrics by dimension
  6. Stream Partitioning: Split a stream into multiple sub-streams
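
Subscribe to every GroupedObservable as soon as it is emitted, typically with mergeMap or mergeAll. By default each group is backed by a plain Subject, so values that arrive before a group is subscribed are not replayed to it. Groups also stay registered until the source completes or their duration notifier emits, so a long-lived stream with many distinct keys and no duration can grow memory without bound.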

Advanced Example: Real-time User Activity Dashboard

import { Subject, groupBy, mergeMap, map, bufferTime, filter, scan } from 'rxjs';

interface UserAction {
  userId: string;
  action: 'click' | 'scroll' | 'navigate';
  timestamp: number;
  metadata?: any;
}

const userActions = new Subject<UserAction>();

// Group by user and aggregate activity
const userActivityDashboard = userActions.pipe(
  groupBy(action => action.userId),
  mergeMap(userGroup$ => {
    const userId = userGroup$.key;
    
    return userGroup$.pipe(
      bufferTime(5000), // Aggregate every 5 seconds
      filter(actions => actions.length > 0),
      map(actions => ({
        userId,
        period: new Date().toISOString(),
        clickCount: actions.filter(a => a.action === 'click').length,
        scrollCount: actions.filter(a => a.action === 'scroll').length,
        navigateCount: actions.filter(a => a.action === 'navigate').length,
        totalActions: actions.length
      })),
      scan((acc, curr) => ({
        ...curr,
        lifetimeTotal: (acc.lifetimeTotal || 0) + curr.totalActions
      }), {} as any)
    );
  })
);

userActivityDashboard.subscribe(stats => {
  console.log('User activity update:', stats);
  updateDashboard(stats); // updateDashboard is assumed to be provided by the application
});

// Simulate events
userActions.next({ 
  userId: 'user1', 
  action: 'click', 
  timestamp: Date.now() 
});

Stream Processing by Category

import { interval, groupBy, mergeMap, map, take, bufferTime, filter } from 'rxjs';

interface Message {
  priority: 'high' | 'medium' | 'low';
  content: string;
  id: number;
}

const messages = interval(100).pipe(
  take(50),
  map(i => ({
    id: i,
    priority: ['high', 'medium', 'low'][i % 3] as Message['priority'],
    content: `Message ${i}`
  }))
);

messages.pipe(
  groupBy(msg => msg.priority),
  mergeMap(priorityGroup$ => {
    const priority = priorityGroup$.key;
    
    // Different processing based on priority
    if (priority === 'high') {
      // High priority: process immediately
      return priorityGroup$.pipe(
        map(msg => ({ ...msg, processedBy: 'immediate' }))
      );
    } else if (priority === 'medium') {
      // Medium priority: batch every 500ms
      return priorityGroup$.pipe(
        bufferTime(500),
        filter(batch => batch.length > 0),
        map(batch => ({ 
          priority, 
          processedBy: 'batched', 
          count: batch.length,
          messages: batch 
        }))
      );
    } else {
      // Low priority: sample every 1s
      return priorityGroup$.pipe(
        bufferTime(1000),
        filter(batch => batch.length > 0),
        map(batch => ({ 
          priority, 
          processedBy: 'sampled', 
          count: batch.length,
          latest: batch[batch.length - 1] 
        }))
      );
    }
  })
).subscribe(result => {
  console.log('Processed:', result);
});

Memory Management

Use the duration option to automatically complete and clean up groups after a certain time or condition. This prevents unbounded memory growth in long-lived streams with many distinct keys.

import { groupBy, mergeMap, map, timer, toArray } from 'rxjs';

// getDataStream() is assumed to return an Observable of items with a category property
const stream = getDataStream();

stream.pipe(
  groupBy(
    item => item.category,
    {
      duration: group$ => timer(60000) // Groups auto-close after 1 minute
    }
  ),
  mergeMap(group$ => 
    group$.pipe(
      // Process group
      toArray(),
      map(items => ({ category: group$.key, items }))
    )
  )
).subscribe(result => {
  console.log('Group completed:', result);
});

Type Guards with groupBy

import { of, groupBy, mergeMap, map } from 'rxjs';

type Animal = 
  | { type: 'dog'; breed: string; bark: () => void }
  | { type: 'cat'; color: string; meow: () => void };

const animals: Animal[] = [
  { type: 'dog', breed: 'Labrador', bark: () => console.log('Woof!') },
  { type: 'cat', color: 'orange', meow: () => console.log('Meow!') },
  { type: 'dog', breed: 'Poodle', bark: () => console.log('Woof!') }
];

of(...animals).pipe(
  groupBy(
    (animal): animal is { type: 'dog'; breed: string; bark: () => void } => 
      animal.type === 'dog'
  ),
  mergeMap(group$ => {
    if (group$.key) {
      // TypeScript knows these are dogs
      return group$.pipe(
        map(dog => `Dog breed: ${dog.breed}`)
      );
    } else {
      // TypeScript knows these are cats
      return group$.pipe(
        map(cat => `Cat color: ${cat.color}`)
      );
    }
  })
).subscribe(console.log);

Related Operators

  • partition - Split stream into two based on a predicate
  • mergeMap - Often used with groupBy to flatten groups
  • reduce - Aggregate values within each group
  • bufferTime - Time-based windowing within groups
  • distinct - Remove duplicates (related concept)
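
For comparison with groupBy, here is a minimal sketch of partition, assuming the standalone partition function exported from 'rxjs' (RxJS 7). It always produces exactly two streams from a predicate, whereas groupBy produces one stream per distinct key. The sample numbers are made up.

```typescript
import { of, partition, toArray } from 'rxjs';

// partition returns a pair: values that pass the predicate, and values that fail it.
const [evens$, odds$] = partition(of(1, 2, 3, 4, 5, 6), (n) => n % 2 === 0);

let evens: number[] = [];
let odds: number[] = [];

evens$.pipe(toArray()).subscribe((values) => (evens = values));
odds$.pipe(toArray()).subscribe((values) => (odds = values));

console.log(evens); // [2, 4, 6]
console.log(odds); // [1, 3, 5]
```

Each of the two returned Observables subscribes to the source separately, so partition fits cheap or shared sources, while groupBy splits a single subscription into any number of groups.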