The @vk-io/streaming package provides access to VK’s Streaming API, allowing you to monitor public posts and comments in real time based on custom rules and keywords.
Installation
npm install @vk-io/streaming
Node.js 12.20.0 or newer is required. This package requires vk-io as a peer dependency.
What is Streaming API?
VK’s Streaming API allows you to:
Monitor public posts and comments in real time
Track keywords and phrases across VK
Filter content by custom rules
Receive instant notifications when matching content is published
The Streaming API only provides access to public content. Private posts and comments are not included.
Quick Start
Set up streaming to monitor posts containing specific keywords:
import { VK } from 'vk-io';
import { StreamingAPI } from '@vk-io/streaming';

const vk = new VK({ token: process.env.TOKEN });

const streaming = new StreamingAPI({
  api: vk.api,
  updates: vk.updates
});

// Listen for publications
vk.updates.on('publication', (context) => {
  console.log('New post:', context.text);
  console.log('Author:', context.author.id);
  console.log('Tags:', context.tags);
});

async function start() {
  // Start WebSocket connection
  await streaming.startWebSocket();

  // Add a rule to track a keyword
  await streaming.addRule({
    tag: 'halloween',
    value: 'тыква' // Russian word for "pumpkin"
  });

  console.log('Streaming started!');
}

start().catch(console.error);
Configuration
Create a StreamingAPI instance:
import { StreamingAPI } from '@vk-io/streaming';

const streaming = new StreamingAPI({
  api: vk.api, // VK API instance (required)
  updates: vk.updates, // VK Updates instance (required)
  agent: customAgent // Custom HTTPS agent (optional)
});
api - VK-IO API instance for making API calls (required)
updates - VK-IO Updates instance for dispatching events (required)
agent - Custom HTTPS agent for WebSocket connections (optional)
Managing Rules
Rules define what content you want to monitor. Each rule consists of a search query and a tag.
Adding Rules
Add a single rule:
await streaming.addRule({
  tag: 'tech',
  value: 'iPhone OR Android'
});
Add multiple rules:
await streaming.addRules([
  { tag: 'games', value: 'PlayStation OR Xbox' },
  { tag: 'movies', value: 'Marvel OR DC' },
  { tag: 'sports', value: 'football OR basketball' }
]);
Rule Query Syntax
Rules support boolean operators and advanced search syntax:
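The query syntax covers the OR operator, the AND operator, the NOT operator, quoted phrases, and complex queries that combine them. For example, the OR operator matches posts containing either word:
// Match posts containing either word
await streaming.addRule({
  tag: 'programming',
  value: 'JavaScript OR TypeScript'
});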
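Because the operators compose mechanically, a small helper can keep longer queries readable. The following is a minimal sketch that assumes the OR / AND / NOT / quoted-phrase syntax described in this section; `quote`, `anyOf`, `and`, and `not` are hypothetical helpers and are not part of @vk-io/streaming.

```typescript
// Hypothetical query-building helpers; not part of @vk-io/streaming.
// They assume the OR / AND / NOT / quoted-phrase syntax described in this section.

// Wrap a term containing whitespace in double quotes so it is treated as one phrase.
function quote(term: string): string {
  return /\s/.test(term) ? `"${term}"` : term;
}

// Combine alternative terms with OR, parenthesizing when there is more than one.
function anyOf(terms: string[]): string {
  const quoted = terms.map(quote);
  return quoted.length > 1 ? `(${quoted.join(' OR ')})` : quoted.join('');
}

// Combine already-built expressions with AND.
function and(...expressions: string[]): string {
  return expressions.join(' AND ');
}

// Negate an already-built expression.
function not(expression: string): string {
  return `NOT ${expression}`;
}

// Builds the same kind of query used for the 'ai' tag in the complete example.
const query = and(anyOf(['ChatGPT', 'GPT', 'искусственный интеллект']), not('spam'));
```

The resulting string can be passed as the `value` of a rule. Keeping term quoting (`anyOf`) separate from expression joining (`and`, `not`) avoids accidentally quoting a parenthesized group.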
Getting Rules
Retrieve all active rules:
const rules = await streaming.getRules();

for (const rule of rules) {
  console.log(`Tag: ${rule.tag}`);
  console.log(`Query: ${rule.value}`);
}
Deleting Rules
Remove a specific rule by tag:
await streaming.deleteRule('tech');
Remove all rules:
await streaming.deleteRules();
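When rules are kept in configuration, it can be cheaper to change only what differs rather than delete everything and re-add it. The sketch below compares the rules returned by getRules() with a desired list; `Rule` and `diffRules` are hypothetical names introduced here, and the `{ tag, value }` shape is assumed from the examples on this page.

```typescript
// Assumed rule shape, taken from the { tag, value } objects used on this page.
interface Rule {
  tag: string;
  value: string;
}

// Compare active rules with desired rules, keyed by tag.
// A rule whose tag exists but whose query changed is both deleted and re-added.
function diffRules(active: Rule[], desired: Rule[]): { toDelete: string[]; toAdd: Rule[] } {
  const activeByTag = new Map<string, string>();
  for (const rule of active) {
    activeByTag.set(rule.tag, rule.value);
  }

  const desiredByTag = new Map<string, string>();
  for (const rule of desired) {
    desiredByTag.set(rule.tag, rule.value);
  }

  // Delete active rules that are missing from, or different in, the desired set.
  const toDelete = active
    .filter((rule) => desiredByTag.get(rule.tag) !== rule.value)
    .map((rule) => rule.tag);

  // Add desired rules that are missing from, or different in, the active set.
  const toAdd = desired.filter((rule) => activeByTag.get(rule.tag) !== rule.value);

  return { toDelete, toAdd };
}
```

The result could then be applied by calling deleteRule for each tag in `toDelete`, followed by addRules with `toAdd`.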
Handling Events
Listen for streaming events using VK-IO’s update system:
vk.updates.on('publication', (context) => {
  console.log('Event type:', context.eventType); // post, comment, share
  console.log('Text:', context.text);
  console.log('Author ID:', context.author.id);
  console.log('Creation time:', context.creationTime);
  console.log('Matched tags:', context.tags);

  // Post URL
  if (context.eventUrl) {
    console.log('View at:', context.eventUrl);
  }
});
StreamingContext Properties
The context object contains:
eventType - Type of event: 'post', 'comment', or 'share'
eventUrl - Direct URL to the post/comment on VK
text - Text content of the post/comment
tags - Array of rule tags that matched this event
author - Author information:
  id - Author’s VK ID
  platform - Platform used (mobile, web, etc.)
creationTime - Unix timestamp when the content was created
Array of attachments (photos, links, etc.)
Geolocation data if available
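To make the shape of these properties concrete, here is a minimal sketch of a type and a one-line formatter built only from the fields used on this page. `PublicationLike` and `summarize` are hypothetical names; the real context class may expose more fields or different types, and the timestamp is assumed to be in seconds.

```typescript
// Assumed shape, reconstructed from the properties used on this page.
// The real context object may expose more fields or different types.
interface PublicationLike {
  eventType: 'post' | 'comment' | 'share';
  eventUrl?: string;
  text?: string;
  tags: string[];
  author: { id: number; platform?: string };
  creationTime: number; // assumed to be a Unix timestamp in seconds
}

// Produce a single log line: matched tags, event type, author, time, and a text preview.
function summarize(event: PublicationLike, maxLength = 80): string {
  const text = event.text ?? '';
  const preview = text.length > maxLength ? `${text.slice(0, maxLength)}...` : text;
  const created = new Date(event.creationTime * 1000).toISOString();
  return `[${event.tags.join(', ')}] ${event.eventType} by ${event.author.id} at ${created}: ${preview}`;
}
```

Inside a 'publication' handler, a compatible context could be passed to `summarize` to get one compact line per event.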
Connection Management
Starting Connection
// Start WebSocket connection
await streaming.startWebSocket();
console.log('Connected to streaming API');
Stopping Connection
// Stop WebSocket connection
await streaming.stop();
console.log('Disconnected from streaming API');
Automatic Reconnection
The client automatically reconnects when the connection is lost:
const streaming = new StreamingAPI({
  api: vk.api,
  updates: vk.updates
});

// The client will automatically reconnect if the connection drops
await streaming.startWebSocket();
Use Cases
Brand Monitoring
Track mentions of your brand:
await streaming.addRule({
  tag: 'brand-mentions',
  value: '"YourBrand" OR @yourcompany'
});

vk.updates.on('publication', async (context) => {
  if (context.tags.includes('brand-mentions')) {
    console.log('Brand mentioned!');
    console.log('Post:', context.text);
    console.log('URL:', context.eventUrl);

    // Notify your team
    await notifyTeam({
      text: context.text,
      url: context.eventUrl,
      author: context.author.id
    });
  }
});
Sentiment Analysis
Analyze sentiment of posts about a topic:
await streaming.addRule({
  tag: 'product-feedback',
  value: 'YourProduct AND (good OR bad OR love OR hate)'
});

vk.updates.on('publication', async (context) => {
  const sentiment = await analyzeSentiment(context.text);

  await database.insert({
    text: context.text,
    sentiment,
    timestamp: context.creationTime,
    url: context.eventUrl
  });
});
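The snippet above calls analyzeSentiment without defining it. As a placeholder only, the sketch below scores text by counting words from two small, hypothetical word lists; a real deployment would more likely call a trained model or an external service, which is why the original call is awaited.

```typescript
type Sentiment = 'positive' | 'negative' | 'neutral';

// Tiny illustrative word lists; they are assumptions, not a real sentiment lexicon.
const POSITIVE_WORDS = ['good', 'great', 'love'];
const NEGATIVE_WORDS = ['bad', 'hate', 'worst'];

// Naive keyword scoring for English text: +1 per positive word, -1 per negative word.
// It is synchronous, but awaiting its result works the same way.
function analyzeSentiment(text: string | undefined): Sentiment {
  const words = (text ?? '')
    .toLowerCase()
    .split(/[^a-z0-9]+/)
    .filter(Boolean);

  let score = 0;
  for (const word of words) {
    if (POSITIVE_WORDS.indexOf(word) !== -1) score += 1;
    if (NEGATIVE_WORDS.indexOf(word) !== -1) score -= 1;
  }

  if (score > 0) return 'positive';
  if (score < 0) return 'negative';
  return 'neutral';
}
```

Keyword counting ignores negation and sarcasm, so treat the output as a rough signal at best.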
Event Tracking
Monitor events or campaigns:
await streaming.addRules([
  { tag: 'conference', value: '"TechConf2024" OR #TechConf' },
  { tag: 'hashtag', value: '#YourCampaign' }
]);

vk.updates.on('publication', (context) => {
  console.log(`New ${context.tags[0]} post:`);
  console.log(context.text);
  console.log(context.eventUrl);
});
Competitor Analysis
Track competitor activities:
await streaming.addRules([
  { tag: 'competitor-a', value: 'CompetitorA OR @competitora' },
  { tag: 'competitor-b', value: 'CompetitorB OR @competitorb' }
]);

// Typed as a string-keyed record so it can be indexed by any matched tag
const stats: Record<string, number> = { 'competitor-a': 0, 'competitor-b': 0 };

vk.updates.on('publication', (context) => {
  for (const tag of context.tags) {
    if (stats[tag] !== undefined) {
      stats[tag]++;
    }
  }

  console.log('Mention stats:', stats);
});
Crisis Detection
Detect negative events quickly:
await streaming.addRule({
  tag: 'crisis',
  value: 'YourCompany AND (problem OR issue OR bug OR broken OR crash)'
});

vk.updates.on('publication', async (context) => {
  if (context.tags.includes('crisis')) {
    // Alert team immediately
    await sendAlert({
      priority: 'high',
      text: context.text,
      url: context.eventUrl
    });
  }
});
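Several of the use cases above branch on context.tags inside a single handler. When many rules are active, a tag-to-handler table can replace the chain of includes() checks. This is a minimal sketch; `createTagRouter` and `TagHandler` are hypothetical names, and only the `tags` and `text` properties shown on this page are assumed.

```typescript
// Hypothetical handler type: receives the text of the matched publication.
type TagHandler = (text: string) => void;

// Build a dispatcher that calls the handler registered for each matched tag
// and returns how many handlers ran.
function createTagRouter(handlers: Record<string, TagHandler>) {
  return (event: { tags: string[]; text?: string }): number => {
    let handled = 0;
    for (const tag of event.tags) {
      // Own-property check so tags such as "constructor" never hit the prototype.
      if (Object.prototype.hasOwnProperty.call(handlers, tag)) {
        handlers[tag](event.text ?? '');
        handled += 1;
      }
    }
    return handled;
  };
}
```

The returned function could be passed a context inside a 'publication' handler; a return value of 0 indicates an event whose tags have no registered handler.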
Error Handling
Handle streaming errors gracefully:
import { StreamingRuleError } from '@vk-io/streaming';

try {
  await streaming.addRule({
    tag: 'test',
    value: 'invalid syntax ((('
  });
} catch (error) {
  if (error instanceof StreamingRuleError) {
    console.error('Rule error:', error.message);
    console.error('Error code:', error.code);
  }
}
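Some malformed queries, such as the unbalanced parentheses above, can be caught locally before a request is sent. The following is a minimal sketch of such a pre-check; `hasBalancedSyntax` is a hypothetical helper, and the server remains the authority on what counts as a valid rule.

```typescript
// Hypothetical local pre-check: verifies that parentheses are balanced and that
// double quotes are closed. Parentheses inside a quoted phrase are ignored.
function hasBalancedSyntax(value: string): boolean {
  let depth = 0;
  let inQuotes = false;

  for (let i = 0; i < value.length; i += 1) {
    const char = value[i];
    if (char === '"') {
      inQuotes = !inQuotes;
    } else if (!inQuotes && char === '(') {
      depth += 1;
    } else if (!inQuotes && char === ')') {
      depth -= 1;
      // A closing parenthesis with no matching opener.
      if (depth < 0) return false;
    }
  }

  return depth === 0 && !inQuotes;
}
```

A check like this only filters out obvious mistakes; the try/catch around addRule is still needed for everything the server rejects.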
Best Practices
Use Specific Rules
Create focused rules instead of overly broad queries.
// Good: Specific
value: '"iPhone 15" AND review'
// Bad: Too broad
value: 'phone'
Tag Organization
Use descriptive tags to organize and filter events.
await streaming.addRules([
  { tag: 'brand-positive', value: 'Brand AND (good OR great OR love)' },
  { tag: 'brand-negative', value: 'Brand AND (bad OR hate OR worst)' }
]);
Handle Reconnection
The client handles reconnection automatically, but you should still monitor the connection status.
// Rules persist across reconnections
await streaming.startWebSocket();
Store Data
Save streaming data to a database for analysis.
vk.updates.on('publication', async (context) => {
  await db.posts.insert({
    text: context.text,
    author_id: context.author.id,
    created_at: context.creationTime,
    tags: context.tags
  });
});
Monitor Rate
Track the rate of incoming events to detect spikes.
let eventCount = 0;

setInterval(() => {
  console.log(`Events per minute: ${eventCount}`);
  eventCount = 0;
}, 60000);

vk.updates.on('publication', () => {
  eventCount++;
});
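The counter above reports the rate but leaves spike detection to the reader. One simple approach, sketched below, compares each interval with the average of the previous few intervals. `RateMonitor`, its window size of 5, and its factor of 3 are hypothetical choices for illustration, not library features or recommended values.

```typescript
// Hypothetical spike detector: flags an interval whose event count exceeds
// `factor` times the average of the previous `windowSize` intervals.
class RateMonitor {
  private history: number[] = [];
  private current = 0;
  private windowSize: number;
  private factor: number;

  constructor(windowSize = 5, factor = 3) {
    this.windowSize = windowSize;
    this.factor = factor;
  }

  // Call once per incoming event.
  record(): void {
    this.current += 1;
  }

  // Call once per interval (for example, from setInterval).
  // Returns true if the interval that just ended looks like a spike.
  rotate(): boolean {
    const total = this.history.reduce((sum, count) => sum + count, 0);
    const average = this.history.length > 0 ? total / this.history.length : 0;
    // Use a floor of 1 so a quiet baseline does not turn every event into a spike.
    const threshold = Math.max(average, 1) * this.factor;
    const isSpike = this.history.length > 0 && this.current > threshold;

    this.history.push(this.current);
    if (this.history.length > this.windowSize) {
      this.history.shift();
    }
    this.current = 0;
    return isSpike;
  }
}
```

In practice, `record()` would be called from the 'publication' handler and `rotate()` from the existing one-minute timer. The first interval never counts as a spike because there is no baseline to compare against.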
Complete Example
A full streaming application that monitors multiple topics:
import { VK } from 'vk-io';
import { StreamingAPI } from '@vk-io/streaming';

const vk = new VK({ token: process.env.TOKEN });

const streaming = new StreamingAPI({
  api: vk.api,
  updates: vk.updates
});

// Statistics
const stats = {
  total: 0,
  byTag: new Map<string, number>(),
  byType: new Map<string, number>()
};

// Handle streaming events
vk.updates.on('publication', (context) => {
  stats.total++;

  // Count by tag
  for (const tag of context.tags) {
    stats.byTag.set(tag, (stats.byTag.get(tag) || 0) + 1);
  }

  // Count by event type
  stats.byType.set(
    context.eventType,
    (stats.byType.get(context.eventType) || 0) + 1
  );

  console.log('\n--- New Event ---');
  console.log('Type:', context.eventType);
  console.log('Text:', context.text?.substring(0, 100));
  console.log('Author:', context.author.id);
  console.log('Tags:', context.tags);
  console.log('URL:', context.eventUrl);
});

// Print stats every minute
setInterval(() => {
  console.log('\n=== Streaming Statistics ===');
  console.log('Total events:', stats.total);

  console.log('By tag:');
  for (const [tag, count] of stats.byTag) {
    console.log(`  ${tag}: ${count}`);
  }

  console.log('By type:');
  for (const [type, count] of stats.byType) {
    console.log(`  ${type}: ${count}`);
  }
}, 60000);

async function start() {
  try {
    // Start WebSocket
    await streaming.startWebSocket();
    console.log('✅ Connected to Streaming API');

    // Clear existing rules
    await streaming.deleteRules();
    console.log('🧹 Cleared existing rules');

    // Add monitoring rules
    await streaming.addRules([
      {
        tag: 'tech',
        value: '(JavaScript OR TypeScript OR React) AND tutorial'
      },
      {
        tag: 'ai',
        value: '(ChatGPT OR GPT OR "искусственный интеллект") AND NOT spam'
      },
      {
        tag: 'gaming',
        value: '(PlayStation OR Xbox OR Nintendo) AND (new OR release)'
      }
    ]);

    const rules = await streaming.getRules();
    console.log(`📝 Added ${rules.length} rules:`);
    for (const rule of rules) {
      console.log(`  [${rule.tag}]: ${rule.value}`);
    }

    console.log('\n🎧 Listening for events...');
  } catch (error) {
    console.error('❌ Error:', error);
    process.exit(1);
  }
}

// Graceful shutdown
process.on('SIGINT', async () => {
  console.log('\n\n👋 Shutting down...');
  await streaming.stop();
  process.exit(0);
});

start();
Limitations
The Streaming API only provides access to public content
VK may limit the number of rules you can create
Events may be delayed by a few seconds
Delivery is not guaranteed, so some matching posts may never arrive