Skip to main content
The @vk-io/streaming package provides access to VK’s Streaming API, allowing you to monitor public posts and comments in real-time based on custom rules and keywords.

Installation

npm install @vk-io/streaming
Node.js 12.20.0 or newer is required. This package requires vk-io as a peer dependency.

What is Streaming API?

VK’s Streaming API allows you to:
  • Monitor public posts and comments in real-time
  • Track keywords and phrases across VK
  • Filter content by custom rules
  • Receive instant notifications when matching content is published
The Streaming API only provides access to public content. Private posts and comments are not included.

Quick Start

Set up streaming to monitor posts containing specific keywords:
import { VK } from 'vk-io';
import { StreamingAPI } from '@vk-io/streaming';

const vk = new VK({ token: process.env.TOKEN });

const streaming = new StreamingAPI({
  api: vk.api,
  updates: vk.updates
});

// Listen for publications
vk.updates.on('publication', (context) => {
  console.log('New post:', context.text);
  console.log('Author:', context.author.id);
  console.log('Tags:', context.tags);
});

async function start() {
  // Start WebSocket connection
  await streaming.startWebSocket();
  
  // Add a rule to track keyword
  await streaming.addRule({
    tag: 'halloween',
    value: 'тыква' // Russian word for "pumpkin"
  });
  
  console.log('Streaming started!');
}

start().catch(console.error);

Configuration

Create a StreamingAPI instance:
import { StreamingAPI } from '@vk-io/streaming';

const streaming = new StreamingAPI({
  api: vk.api,      // VK API instance (required)
  updates: vk.updates, // VK Updates instance (required)
  agent: customAgent   // Custom HTTPS agent (optional)
});
api
API
required
VK-IO API instance for making API calls
updates
Updates
required
VK-IO Updates instance for dispatching events
agent
Agent
Custom HTTPS agent for WebSocket connections

Managing Rules

Rules define what content you want to monitor. Each rule consists of a search query and a tag.

Adding Rules

Add a single rule:
await streaming.addRule({
  tag: 'tech',
  value: 'iPhone OR Android'
});
Add multiple rules:
await streaming.addRules([
  { tag: 'games', value: 'PlayStation OR Xbox' },
  { tag: 'movies', value: 'Marvel OR DC' },
  { tag: 'sports', value: 'football OR basketball' }
]);

Rule Query Syntax

Rules support boolean operators and advanced search syntax:
// Match posts containing either word
await streaming.addRule({
  tag: 'programming',
  value: 'JavaScript OR TypeScript'
});

Getting Rules

Retrieve all active rules:
const rules = await streaming.getRules();

for (const rule of rules) {
  console.log(`Tag: ${rule.tag}`);
  console.log(`Query: ${rule.value}`);
}

Deleting Rules

Remove a specific rule by tag:
await streaming.deleteRule('tech');
Remove all rules:
await streaming.deleteRules();

Handling Events

Listen for streaming events using VK-IO’s update system:
vk.updates.on('publication', (context) => {
  console.log('Event type:', context.eventType); // post, comment, share
  console.log('Text:', context.text);
  console.log('Author ID:', context.author.id);
  console.log('Creation time:', context.creationTime);
  console.log('Matched tags:', context.tags);
  
  // Post URL
  if (context.eventUrl) {
    console.log('View at:', context.eventUrl);
  }
});

StreamingContext Properties

The context object contains:
eventType
string
Type of event: 'post', 'comment', or 'share'
eventUrl
string
Direct URL to the post/comment on VK
text
string
Text content of the post/comment
tags
string[]
Array of rule tags that matched this event
author
object
Author information:
  • id - Author’s VK ID
  • platform - Platform used (mobile, web, etc.)
creationTime
number
Unix timestamp when the content was created
attachments
object[]
Array of attachments (photos, links, etc.)
geo
object
Geolocation data if available

Connection Management

Starting Connection

// Start WebSocket connection
await streaming.startWebSocket();
console.log('Connected to streaming API');

Stopping Connection

// Stop WebSocket connection
await streaming.stop();
console.log('Disconnected from streaming API');

Automatic Reconnection

The streaming API automatically reconnects on connection loss:
const streaming = new StreamingAPI({
  api: vk.api,
  updates: vk.updates
});

// The client will automatically reconnect if the connection drops
await streaming.startWebSocket();

Use Cases

Brand Monitoring

Track mentions of your brand:
await streaming.addRule({
  tag: 'brand-mentions',
  value: '"YourBrand" OR @yourcompany'
});

vk.updates.on('publication', async (context) => {
  if (context.tags.includes('brand-mentions')) {
    console.log('Brand mentioned!');
    console.log('Post:', context.text);
    console.log('URL:', context.eventUrl);
    
    // Notify your team
    await notifyTeam({
      text: context.text,
      url: context.eventUrl,
      author: context.author.id
    });
  }
});

Sentiment Analysis

Analyze sentiment of posts about a topic:
await streaming.addRule({
  tag: 'product-feedback',
  value: 'YourProduct AND (good OR bad OR love OR hate)'
});

vk.updates.on('publication', async (context) => {
  const sentiment = await analyzeSentiment(context.text);
  
  await database.insert({
    text: context.text,
    sentiment,
    timestamp: context.creationTime,
    url: context.eventUrl
  });
});

Event Tracking

Monitor events or campaigns:
await streaming.addRules([
  { tag: 'conference', value: '"TechConf2024" OR #TechConf' },
  { tag: 'hashtag', value: '#YourCampaign' }
]);

vk.updates.on('publication', (context) => {
  console.log(`New ${context.tags[0]} post:`);
  console.log(context.text);
  console.log(context.eventUrl);
});

Competitor Analysis

Track competitor activities:
await streaming.addRules([
  { tag: 'competitor-a', value: 'CompetitorA OR @competitora' },
  { tag: 'competitor-b', value: 'CompetitorB OR @competitorb' }
]);

const stats = { 'competitor-a': 0, 'competitor-b': 0 };

vk.updates.on('publication', (context) => {
  for (const tag of context.tags) {
    if (stats[tag] !== undefined) {
      stats[tag]++;
    }
  }
  
  console.log('Mention stats:', stats);
});

Crisis Detection

Detect negative events quickly:
await streaming.addRule({
  tag: 'crisis',
  value: 'YourCompany AND (problem OR issue OR bug OR broken OR crash)'
});

vk.updates.on('publication', async (context) => {
  if (context.tags.includes('crisis')) {
    // Alert team immediately
    await sendAlert({
      priority: 'high',
      text: context.text,
      url: context.eventUrl
    });
  }
});

Error Handling

Handle streaming errors gracefully:
import { StreamingRuleError } from '@vk-io/streaming';

try {
  await streaming.addRule({
    tag: 'test',
    value: 'invalid syntax ((('
  });
} catch (error) {
  if (error instanceof StreamingRuleError) {
    console.error('Rule error:', error.message);
    console.error('Error code:', error.code);
  }
}

Best Practices

1

Use Specific Rules

Create focused rules instead of overly broad queries
// Good: Specific
value: '"iPhone 15" AND review'

// Bad: Too broad
value: 'phone'
2

Tag Organization

Use descriptive tags to organize and filter events
await streaming.addRules([
  { tag: 'brand-positive', value: 'Brand AND (good OR great OR love)' },
  { tag: 'brand-negative', value: 'Brand AND (bad OR hate OR worst)' }
]);
3

Handle Reconnection

The client handles reconnection automatically, but monitor connection status
// Rules persist across reconnections
await streaming.startWebSocket();
4

Store Data

Save streaming data to a database for analysis
vk.updates.on('publication', async (context) => {
  await db.posts.insert({
    text: context.text,
    author_id: context.author.id,
    created_at: context.creationTime,
    tags: context.tags
  });
});
5

Monitor Rate

Track the rate of incoming events to detect spikes
let eventCount = 0;
setInterval(() => {
  console.log(`Events per minute: ${eventCount}`);
  eventCount = 0;
}, 60000);

vk.updates.on('publication', () => {
  eventCount++;
});

Complete Example

A full streaming application that monitors multiple topics:
import { VK } from 'vk-io';
import { StreamingAPI } from '@vk-io/streaming';

const vk = new VK({ token: process.env.TOKEN });
const streaming = new StreamingAPI({
  api: vk.api,
  updates: vk.updates
});

// Statistics
const stats = {
  total: 0,
  byTag: new Map<string, number>(),
  byType: new Map<string, number>()
};

// Handle streaming events
vk.updates.on('publication', (context) => {
  stats.total++;
  
  // Count by tag
  for (const tag of context.tags) {
    stats.byTag.set(tag, (stats.byTag.get(tag) || 0) + 1);
  }
  
  // Count by event type
  stats.byType.set(
    context.eventType,
    (stats.byType.get(context.eventType) || 0) + 1
  );
  
  console.log('\n--- New Event ---');
  console.log('Type:', context.eventType);
  console.log('Text:', context.text?.substring(0, 100));
  console.log('Author:', context.author.id);
  console.log('Tags:', context.tags);
  console.log('URL:', context.eventUrl);
});

// Print stats every minute
setInterval(() => {
  console.log('\n=== Streaming Statistics ===');
  console.log('Total events:', stats.total);
  console.log('By tag:');
  for (const [tag, count] of stats.byTag) {
    console.log(`  ${tag}: ${count}`);
  }
  console.log('By type:');
  for (const [type, count] of stats.byType) {
    console.log(`  ${type}: ${count}`);
  }
}, 60000);

async function start() {
  try {
    // Start WebSocket
    await streaming.startWebSocket();
    console.log('✅ Connected to Streaming API');
    
    // Clear existing rules
    await streaming.deleteRules();
    console.log('🧹 Cleared existing rules');
    
    // Add monitoring rules
    await streaming.addRules([
      {
        tag: 'tech',
        value: '(JavaScript OR TypeScript OR React) AND tutorial'
      },
      {
        tag: 'ai',
        value: '(ChatGPT OR GPT OR "искусственный интеллект") AND NOT spam'
      },
      {
        tag: 'gaming',
        value: '(PlayStation OR Xbox OR Nintendo) AND (new OR release)'
      }
    ]);
    
    const rules = await streaming.getRules();
    console.log(`📝 Added ${rules.length} rules:`);
    for (const rule of rules) {
      console.log(`  [${rule.tag}]: ${rule.value}`);
    }
    
    console.log('\n🎧 Listening for events...');
    
  } catch (error) {
    console.error('❌ Error:', error);
    process.exit(1);
  }
}

// Graceful shutdown
process.on('SIGINT', async () => {
  console.log('\n\n👋 Shutting down...');
  await streaming.stop();
  process.exit(0);
});

start();

Limitations

  • The Streaming API only provides access to public content
  • There may be rate limits on the number of rules you can create
  • Events may be delayed by a few seconds
  • Not all posts may be delivered (eventual consistency)

Build docs developers (and LLMs) love