Overview
Groups the items emitted by an Observable according to a key computed for each item. The output emits one GroupedObservable per distinct key; each GroupedObservable emits the values that share that key and exposes it through its key property.
groupBy is useful for categorizing streams of data, much like SQL’s GROUP BY or bucketing values into a JavaScript Map.
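To make the SQL analogy concrete, here is a minimal sketch of a "count per key" query written with groupBy. The `orders` data and its field names are made up for illustration, and the sketch assumes RxJS 7.2 or later, where operators can be imported from 'rxjs'.

```typescript
import { of, groupBy, mergeMap, count, map } from 'rxjs';

// Hypothetical sample rows; the field names are illustrative only.
const orders = of(
  { customer: 'ann', total: 10 },
  { customer: 'bob', total: 5 },
  { customer: 'ann', total: 7 }
);

const rows: Array<{ customer: string; orders: number }> = [];

orders.pipe(
  // Roughly: SELECT customer, COUNT(*) ... GROUP BY customer
  groupBy(order => order.customer),
  mergeMap(group$ =>
    group$.pipe(
      count(), // emits once, when the group completes
      map(n => ({ customer: group$.key, orders: n }))
    )
  )
).subscribe(row => rows.push(row));

console.log(rows);
// [{ customer: 'ann', orders: 2 }, { customer: 'bob', orders: 1 }]
```

Each count is emitted only when its group completes, which here happens when the source completes.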
Type Signature
function groupBy<T, K>(
  key: (value: T) => K,
  options?: BasicGroupByOptions<K, T>
): OperatorFunction<T, GroupedObservable<K, T>>
function groupBy<T, K, E>(
  key: (value: T) => K,
  options: GroupByOptionsWithElement<K, E, T>
): OperatorFunction<T, GroupedObservable<K, E>>
interface BasicGroupByOptions<K, T> {
  element?: undefined;
  duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
  connector?: () => SubjectLike<T>;
}
interface GroupByOptionsWithElement<K, E, T> {
  element: (value: T) => E;
  duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
  connector?: () => SubjectLike<E>;
}
Parameters
key
(value: T) => K
A function that extracts the key for each item. Items with the same key are grouped together.
options.element
(value: T) => E
Optional function that transforms each value before it is emitted by its group. If omitted, the original values are emitted.
options.duration
(grouped: GroupedObservable<K, T>) => ObservableInput<any>
A function that returns a notifier Observable controlling how long each group lives. When the notifier emits, the group completes and is removed; a later item with the same key starts a new group.
options.connector
() => SubjectLike<T>
Factory function that creates the intermediate Subject through which a group's elements are emitted. Useful for custom Subject implementations.
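As an illustration of the connector option, the sketch below swaps in a ReplaySubject so that a subscriber arriving after the source has finished still receives each group's values. The even/odd keys are arbitrary, and the choice of ReplaySubject is an assumption made for this example rather than a recommendation from the original text.

```typescript
import { of, groupBy, toArray, ReplaySubject } from 'rxjs';

const replayed: Record<string, number[]> = {};

of(1, 2, 3, 4).pipe(
  groupBy(n => (n % 2 === 0 ? 'even' : 'odd'), {
    // Each group is backed by a ReplaySubject instead of a plain Subject.
    connector: () => new ReplaySubject<number>()
  }),
  // Collect the groups first, so they are only subscribed to
  // after the source (and therefore every group) has completed.
  toArray()
).subscribe(groups => {
  for (const group$ of groups) {
    group$.pipe(toArray()).subscribe(values => {
      replayed[group$.key] = values;
    });
  }
});

console.log(replayed); // { odd: [1, 3], even: [2, 4] }
```

An unbounded ReplaySubject keeps every value in memory, so in a long-lived stream a bounded buffer (for example `new ReplaySubject<number>(10)`) is likely the safer choice.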
Returns
return
OperatorFunction<T, GroupedObservable<K, T>>
A function that returns an Observable of GroupedObservable objects. Each GroupedObservable has a key property and emits items from that group.
Usage Examples
Basic Example: Group by Property
Simple Grouping
import { of, groupBy, mergeMap, reduce } from 'rxjs';

const items = of(
  { id: 1, name: 'JavaScript' },
  { id: 2, name: 'Parcel' },
  { id: 2, name: 'webpack' },
  { id: 1, name: 'TypeScript' },
  { id: 3, name: 'TSLint' }
);

items.pipe(
  groupBy(item => item.id),
  mergeMap(group$ =>
    group$.pipe(
      reduce((acc, cur) => [...acc, cur], [])
    )
  )
).subscribe(group => console.log(group));
// Output:
// [{ id: 1, name: 'JavaScript' }, { id: 1, name: 'TypeScript' }]
// [{ id: 2, name: 'Parcel' }, { id: 2, name: 'webpack' }]
// [{ id: 3, name: 'TSLint' }]
Using the Element Selector
import { of, groupBy, mergeMap, reduce, map } from 'rxjs';

const items = of(
  { id: 1, name: 'JavaScript' },
  { id: 2, name: 'Parcel' },
  { id: 2, name: 'webpack' },
  { id: 1, name: 'TypeScript' },
  { id: 3, name: 'TSLint' }
);

items.pipe(
  groupBy(
    item => item.id,
    { element: item => item.name } // Only emit names
  ),
  mergeMap(group$ =>
    group$.pipe(
      // Seed the accumulator with the group key so it travels with the names
      reduce((acc, cur) => [...acc, cur], [`${group$.key}`]),
      map(arr => ({ id: parseInt(arr[0], 10), values: arr.slice(1) }))
    )
  )
).subscribe(group => console.log(group));
// Output:
// { id: 1, values: ['JavaScript', 'TypeScript'] }
// { id: 2, values: ['Parcel', 'webpack'] }
// { id: 3, values: ['TSLint'] }
Group Duration - Expiring Groups
import { interval, groupBy, mergeMap, map, take, tap, timer, toArray } from 'rxjs';

interface Event {
  userId: number;
  action: string;
  timestamp: number;
}

const events = interval(500).pipe(
  take(20),
  map((i): Event => ({
    userId: i % 3,
    action: `action ${i}`,
    timestamp: Date.now()
  }))
);

events.pipe(
  groupBy(
    event => event.userId,
    {
      duration: group$ => timer(2000) // Each group completes 2 seconds after it opens
    }
  ),
  mergeMap(group$ => {
    console.log(`Group ${group$.key} started`);
    return group$.pipe(
      toArray(), // Emits once, when the group completes
      map(events => ({ userId: group$.key, count: events.length })),
      tap(() => console.log(`Group ${group$.key} completed`))
    );
  })
).subscribe(result => {
  console.log('Group result:', result);
});
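The timer-based example depends on wall-clock timing, so here is a deterministic sketch of the same mechanism. It assumes a hand-driven notifier (`expire`, a hypothetical Subject shared by all groups) in place of timer, which makes it easy to see that a key can open a fresh group after its previous group has been closed.

```typescript
import { Subject, groupBy, mergeMap, toArray, map } from 'rxjs';

const source = new Subject<string>();
// Hypothetical notifier: every open group uses it as its duration Observable.
const expire = new Subject<void>();

const closed: Array<{ key: string; values: string[] }> = [];

source.pipe(
  groupBy(value => value.charAt(0), { duration: () => expire }),
  mergeMap(group$ =>
    group$.pipe(
      toArray(), // emits when the group completes
      map(values => ({ key: group$.key, values }))
    )
  )
).subscribe(result => closed.push(result));

source.next('a1');
source.next('b1');
source.next('a2');
expire.next();     // completes the open 'a' and 'b' groups
source.next('a3'); // key 'a' again: a new group is created
source.complete(); // completes the remaining group

console.log(closed);
// [
//   { key: 'a', values: ['a1', 'a2'] },
//   { key: 'b', values: ['b1'] },
//   { key: 'a', values: ['a3'] }
// ]
```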
Marble Diagram
Source:  --1a--2a--1b--3a--2b--1c--|
Key:       1   2   1   3   2   1
Group 1: --a-------b-----------c---|
Group 2: ------a-----------b-------|
Group 3: --------------a-----------|
Result:  --G1--G2------G3----------|
         (GroupedObservables)
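The diagram can be reproduced with a short sketch. It assumes the source items are two-character strings such as '1a', with the first character used as the key and the second as the emitted element; that encoding is only a convenience for this example.

```typescript
import { of, groupBy, mergeMap, toArray, map } from 'rxjs';

const keysInOrder: string[] = [];
const valuesByKey: Record<string, string[]> = {};

of('1a', '2a', '1b', '3a', '2b', '1c').pipe(
  groupBy(
    item => item.charAt(0),             // key: '1', '2' or '3'
    { element: item => item.charAt(1) } // element: 'a', 'b' or 'c'
  ),
  mergeMap(group$ => {
    // Runs once per new group, in the order G1, G2, G3 from the diagram.
    keysInOrder.push(group$.key);
    return group$.pipe(
      toArray(),
      map(values => ({ key: group$.key, values }))
    );
  })
).subscribe(({ key, values }) => {
  valuesByKey[key] = values;
});

console.log(keysInOrder); // ['1', '2', '3']
console.log(valuesByKey); // { '1': ['a', 'b', 'c'], '2': ['a', 'b'], '3': ['a'] }
```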
Common Use Cases
User Session Tracking: Group events by user ID
Log Analysis: Group log entries by severity, module, or timestamp
Data Aggregation: Categorize and aggregate streaming data
Event Categorization: Organize events by type or source
Real-time Analytics: Group metrics by dimension
Stream Partitioning: Split a stream into multiple sub-streams
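Each GroupedObservable should be subscribed to as soon as it is emitted. With the default connector (a plain Subject), values that arrive before a group has a subscriber are dropped rather than replayed, and every group stays registered until the source completes or its duration notifier fires, so a stream with many distinct keys can leak memory. Use operators like mergeMap or mergeAll to subscribe to groups immediately.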
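The sketch below contrasts subscribing to groups immediately with subscribing to them late. It assumes RxJS 7 behavior, where groups are backed by a plain Subject unless a connector is supplied; under that assumption a late subscriber sees only the completion, not the earlier values.

```typescript
import { of, groupBy, mergeMap, toArray } from 'rxjs';

const source = of('a1', 'b1', 'a2');

// Immediate: mergeMap subscribes to each group as soon as it is emitted.
const immediate: string[][] = [];
source.pipe(
  groupBy(value => value.charAt(0)),
  mergeMap(group$ => group$.pipe(toArray()))
).subscribe(values => immediate.push(values));

// Late: the groups are collected first and only subscribed to after the
// source has completed, by which time their values have already gone by.
const late: string[][] = [];
source.pipe(
  groupBy(value => value.charAt(0)),
  toArray()
).subscribe(groups => {
  for (const group$ of groups) {
    group$.pipe(toArray()).subscribe(values => late.push(values));
  }
});

console.log(immediate); // [['a1', 'a2'], ['b1']]
console.log(late);      // [[], []]
```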
Advanced Example: Real-time User Activity Dashboard
import { Subject, groupBy, mergeMap, map, filter, bufferTime, scan } from 'rxjs';

interface UserAction {
  userId: string;
  action: 'click' | 'scroll' | 'navigate';
  timestamp: number;
  metadata?: any;
}

// Assumed to be provided by the application; not part of RxJS.
declare function updateDashboard(stats: unknown): void;

const userActions = new Subject<UserAction>();

// Group by user and aggregate activity
const userActivityDashboard = userActions.pipe(
  groupBy(action => action.userId),
  mergeMap(userGroup$ => {
    const userId = userGroup$.key;
    return userGroup$.pipe(
      bufferTime(5000), // Aggregate every 5 seconds
      filter(actions => actions.length > 0), // Skip empty windows
      map(actions => ({
        userId,
        period: new Date().toISOString(),
        clickCount: actions.filter(a => a.action === 'click').length,
        scrollCount: actions.filter(a => a.action === 'scroll').length,
        navigateCount: actions.filter(a => a.action === 'navigate').length,
        totalActions: actions.length
      })),
      // Carry a running total across windows for this user
      scan((acc, curr) => ({
        ...curr,
        lifetimeTotal: (acc.lifetimeTotal || 0) + curr.totalActions
      }), {} as any)
    );
  })
);

userActivityDashboard.subscribe(stats => {
  console.log('User activity update:', stats);
  updateDashboard(stats);
});

// Simulate events
userActions.next({
  userId: 'user1',
  action: 'click',
  timestamp: Date.now()
});
Stream Processing by Category
import { interval, groupBy, mergeMap, map, take, filter, bufferTime } from 'rxjs';

interface Message {
  priority: 'high' | 'medium' | 'low';
  content: string;
  id: number;
}

const messages = interval(100).pipe(
  take(50),
  map((i): Message => ({
    id: i,
    priority: ['high', 'medium', 'low'][i % 3] as Message['priority'],
    content: `Message ${i}`
  }))
);

messages.pipe(
  groupBy(msg => msg.priority),
  mergeMap(priorityGroup$ => {
    const priority = priorityGroup$.key;
    // Different processing based on priority
    if (priority === 'high') {
      // High priority: process immediately
      return priorityGroup$.pipe(
        map(msg => ({ ...msg, processedBy: 'immediate' }))
      );
    } else if (priority === 'medium') {
      // Medium priority: batch every 500ms
      return priorityGroup$.pipe(
        bufferTime(500),
        filter(batch => batch.length > 0),
        map(batch => ({
          priority,
          processedBy: 'batched',
          count: batch.length,
          messages: batch
        }))
      );
    } else {
      // Low priority: keep only the latest message of each 1s window
      return priorityGroup$.pipe(
        bufferTime(1000),
        filter(batch => batch.length > 0),
        map(batch => ({
          priority,
          processedBy: 'sampled',
          count: batch.length,
          latest: batch[batch.length - 1]
        }))
      );
    }
  })
).subscribe(result => {
  console.log('Processed:', result);
});
Memory Management
Use the duration option to automatically complete and clean up groups after a certain time or condition. This prevents unbounded memory growth with long-lived streams.
import { Observable, groupBy, mergeMap, map, timer, toArray } from 'rxjs';

// Placeholder for your own source; assumed to emit items with a category field.
declare function getDataStream(): Observable<{ category: string }>;

const stream = getDataStream();

stream.pipe(
  groupBy(
    item => item.category,
    {
      duration: group$ => timer(60000) // Groups auto-close after 1 minute
    }
  ),
  mergeMap(group$ =>
    group$.pipe(
      // Process group: collect its items until it closes
      toArray(),
      map(items => ({ category: group$.key, items }))
    )
  )
).subscribe(result => {
  console.log('Group completed:', result);
});
Type Guards with groupBy
import { of, groupBy, mergeMap, map } from 'rxjs';

type Animal =
  | { type: 'dog'; breed: string; bark: () => void }
  | { type: 'cat'; color: string; meow: () => void };

const animals: Animal[] = [
  { type: 'dog', breed: 'Labrador', bark: () => console.log('Woof!') },
  { type: 'cat', color: 'orange', meow: () => console.log('Meow!') },
  { type: 'dog', breed: 'Poodle', bark: () => console.log('Woof!') }
];

of(...animals).pipe(
  groupBy(
    // A type-guard key function yields a boolean key and narrows each group
    (animal): animal is { type: 'dog'; breed: string; bark: () => void } =>
      animal.type === 'dog'
  ),
  mergeMap(group$ => {
    if (group$.key) {
      // TypeScript knows these are dogs
      return group$.pipe(
        map(dog => `Dog breed: ${dog.breed}`)
      );
    } else {
      // TypeScript knows these are cats
      return group$.pipe(
        map(cat => `Cat color: ${cat.color}`)
      );
    }
  })
).subscribe(console.log);
Related Operators
partition - Split stream into two based on a predicate
mergeMap - Often used with groupBy to flatten groups
reduce - Aggregate values within each group
bufferTime - Time-based windowing within groups
distinct - Remove duplicates (related concept)
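Since partition is the closest relative in this list, here is a small sketch comparing it with groupBy on a boolean key. The even/odd predicate is arbitrary and chosen only for illustration.

```typescript
import { of, partition, groupBy, mergeMap, toArray, map } from 'rxjs';

const numbers = of(1, 2, 3, 4, 5);

// partition: always exactly two streams, returned up front.
const [evens$, odds$] = partition(numbers, n => n % 2 === 0);
const evens: number[] = [];
const odds: number[] = [];
evens$.subscribe(n => evens.push(n));
odds$.subscribe(n => odds.push(n));

// groupBy: one stream per distinct key, discovered as items arrive.
const grouped: Array<{ even: boolean; values: number[] }> = [];
numbers.pipe(
  groupBy(n => n % 2 === 0),
  mergeMap(group$ =>
    group$.pipe(
      toArray(),
      map(values => ({ even: group$.key, values }))
    )
  )
).subscribe(result => grouped.push(result));

console.log(evens, odds); // [2, 4] [1, 3, 5]
console.log(grouped);
// [{ even: false, values: [1, 3, 5] }, { even: true, values: [2, 4] }]
```

In this sketch partition subscribes to the source once per returned stream, while groupBy subscribes once and fans the items out, which may matter when the source is expensive or has side effects.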