Data Source Health Monitoring
Monitor the health and activity of data sources to ensure continuous data collection.

Status Monitoring

Data sources have two key status indicators:
  1. Status - Boolean indicating if the source is actively configured
  2. Timestamp - Last time data was received from the source

Health Indicators

Active Sources

A data source is considered healthy when:
  • status is true
  • timestamp is recent (typically within last 15 minutes)
  • eventsCount is incrementing

Inactive Sources

A data source may be inactive if:
  • status is false (manually disabled)
  • timestamp is old (no recent data)
  • eventsCount is zero or not incrementing

Monitoring Examples

Check All Source Status

cURL
curl -X GET https://your-utmstack-instance.com/api/utm-data-input-statuses?size=100 \
  -H "Authorization: Bearer eyJhbGciOiJIUzUxMiJ9..."

Python Monitoring Script

Python
import requests
from datetime import datetime, timedelta

def check_data_source_health(api_url, token):
    """
    Check health of all data sources and alert on issues.

    Args:
        api_url: Base URL of the UTMStack API, e.g. "https://host/api".
        token: Bearer token for the Authorization header.

    Returns:
        List of issue dicts. Each has "source", "issue" and "severity"
        ("warning" or "critical"); stale sources also carry "last_seen".

    Raises:
        requests.HTTPError: If the status endpoint responds with an error code.
    """
    headers = {"Authorization": f"Bearer {token}"}

    # Get all sources
    response = requests.get(
        f"{api_url}/utm-data-input-statuses",
        headers=headers,
        params={"size": 1000},
        timeout=30,  # don't hang forever on an unresponsive server
    )
    response.raise_for_status()  # fail loudly instead of parsing an error body
    sources = response.json()

    # Work entirely with naive UTC datetimes. The parsed timestamp below is
    # stripped of its tzinfo because comparing an aware datetime against the
    # naive datetime.utcnow() threshold raises TypeError.
    now = datetime.utcnow()
    inactive_threshold = now - timedelta(minutes=15)
    issues = []

    for source in sources:
        if not source["status"]:
            issues.append({
                "source": source["dataTypeName"],
                "issue": "Source is disabled",
                "severity": "warning"
            })
            continue

        # Timestamps arrive as ISO-8601 with a trailing "Z"; the "+00:00"
        # substitution lets fromisoformat parse it, and since that offset is
        # zero, dropping tzinfo keeps the UTC wall-clock time.
        last_seen = datetime.fromisoformat(
            source["timestamp"].replace("Z", "+00:00")
        ).replace(tzinfo=None)

        if last_seen < inactive_threshold:
            minutes_ago = int((now - last_seen).total_seconds() / 60)
            issues.append({
                "source": source["dataTypeName"],
                "issue": f"No data received for {minutes_ago} minutes",
                "severity": "critical" if minutes_ago > 60 else "warning",
                "last_seen": source["timestamp"]
            })

    return issues

# Usage
api_url = "https://your-utmstack-instance.com/api"
token = "YOUR_TOKEN"

issues = check_data_source_health(api_url, token)
for issue in issues:
    severity = issue["severity"].upper()
    print(f"[{severity}] {issue['source']}: {issue['issue']}")

JavaScript Monitoring

JavaScript
/**
 * Check health of all data sources and report issues.
 *
 * @param {string} apiUrl - Base URL of the UTMStack API, e.g. "https://host/api".
 * @param {string} token - Bearer token for the Authorization header.
 * @returns {Promise<Array<{source: string, issue: string, severity: string, lastSeen?: string}>>}
 *   One entry per unhealthy source; empty array when everything is healthy.
 * @throws {Error} When the status endpoint responds with a non-2xx code.
 */
async function checkDataSourceHealth(apiUrl, token) {
  const headers = { Authorization: `Bearer ${token}` };

  // Get all sources
  const response = await fetch(
    `${apiUrl}/utm-data-input-statuses?size=1000`,
    { headers }
  );
  // fetch() resolves even for HTTP errors (401, 500, ...); fail loudly
  // instead of trying to treat an error body as the source list.
  if (!response.ok) {
    throw new Error(`Status request failed: ${response.status} ${response.statusText}`);
  }
  const sources = await response.json();

  // A source is stale when it last reported more than 15 minutes ago.
  const inactiveThreshold = new Date(Date.now() - 15 * 60 * 1000);
  const issues = [];

  for (const source of sources) {
    if (!source.status) {
      issues.push({
        source: source.dataTypeName,
        issue: 'Source is disabled',
        severity: 'warning'
      });
      continue;
    }

    const lastSeen = new Date(source.timestamp);
    if (lastSeen < inactiveThreshold) {
      const minutesAgo = Math.floor((Date.now() - lastSeen.getTime()) / 60000);
      issues.push({
        source: source.dataTypeName,
        issue: `No data received for ${minutesAgo} minutes`,
        severity: minutesAgo > 60 ? 'critical' : 'warning',
        lastSeen: source.timestamp
      });
    }
  }

  return issues;
}

// Usage
const apiUrl = 'https://your-utmstack-instance.com/api';
const token = 'YOUR_TOKEN';

const issues = await checkDataSourceHealth(apiUrl, token);
for (const { severity, source, issue } of issues) {
  console.log(`[${severity.toUpperCase()}] ${source}: ${issue}`);
}

Automated Alerts

Send Notifications for Inactive Sources

Python
import requests
from datetime import datetime, timedelta

def alert_on_inactive_sources(api_url, token, webhook_url):
    """
    Check for inactive sources and send alerts via webhook.

    Args:
        api_url: Base URL of the UTMStack API, e.g. "https://host/api".
        token: Bearer token for the Authorization header.
        webhook_url: URL that receives the JSON alert payload.

    Returns:
        List of inactive-source summaries ({"name", "type", "lastSeen"}),
        or an empty list when every enabled source has reported recently.

    Raises:
        requests.HTTPError: If the status endpoint responds with an error code.
    """
    headers = {"Authorization": f"Bearer {token}"}

    # Get all sources
    response = requests.get(
        f"{api_url}/utm-data-input-statuses",
        headers=headers,
        params={"size": 1000},
        timeout=30,  # don't hang forever on an unresponsive server
    )
    response.raise_for_status()  # fail loudly instead of parsing an error body
    sources = response.json()

    # Find enabled sources silent for more than 30 minutes. Keep everything
    # naive-UTC: comparing an aware parsed timestamp against the naive
    # datetime.utcnow() threshold would raise TypeError.
    threshold = datetime.utcnow() - timedelta(minutes=30)
    inactive = []

    for source in sources:
        if not source["status"]:
            continue  # disabled sources are expected to be silent

        # Offset after the "Z" substitution is +00:00, so dropping tzinfo
        # keeps the UTC wall-clock time.
        last_seen = datetime.fromisoformat(
            source["timestamp"].replace("Z", "+00:00")
        ).replace(tzinfo=None)

        if last_seen < threshold:
            inactive.append({
                "name": source["dataTypeName"],
                "type": source["dataType"],
                "lastSeen": source["timestamp"]
            })

    # Send alert if inactive sources found
    if inactive:
        alert_message = {
            "title": "Data Sources Inactive",
            "message": f"{len(inactive)} data source(s) have not reported data in over 30 minutes",
            "sources": inactive,
            "timestamp": datetime.utcnow().isoformat()
        }

        requests.post(webhook_url, json=alert_message, timeout=30)
        return inactive

    return []

Metrics Collection

Collect Source Metrics Over Time

Python
import requests
import time
from datetime import datetime

def collect_source_metrics(api_url, token, interval_seconds=60, max_snapshots=1440):
    """
    Continuously collect data source metrics.

    Polls the status endpoint every `interval_seconds` seconds and keeps an
    in-memory log of snapshots. Runs until interrupted (e.g. Ctrl-C).

    Args:
        api_url: Base URL of the UTMStack API, e.g. "https://host/api".
        token: Bearer token for the Authorization header.
        interval_seconds: Delay between polls.
        max_snapshots: Upper bound on retained snapshots; the oldest entries
            are dropped so the log cannot grow without bound in a long run.
            Default 1440 ≈ one day at the default 60s interval.
    """
    headers = {"Authorization": f"Bearer {token}"}
    metrics_log = []

    while True:
        try:
            response = requests.get(
                f"{api_url}/utm-data-input-statuses",
                headers=headers,
                params={"size": 1000},
                timeout=30,
            )
            response.raise_for_status()
            sources = response.json()
        except requests.RequestException as exc:
            # A transient network/API failure should not kill the collector;
            # skip this cycle and retry after the normal delay.
            print(f"Metrics collection failed: {exc}")
            time.sleep(interval_seconds)
            continue

        # Record metrics
        snapshot = {
            "timestamp": datetime.utcnow().isoformat(),
            "total_sources": len(sources),
            "active_sources": sum(1 for s in sources if s["status"]),
            "total_events": sum(s.get("eventsCount", 0) for s in sources),
            "sources": [
                {
                    "name": s["dataTypeName"],
                    "events": s.get("eventsCount", 0),
                    "active": s["status"]
                }
                for s in sources
            ]
        }

        metrics_log.append(snapshot)
        if len(metrics_log) > max_snapshots:
            del metrics_log[0]  # drop the oldest snapshot to bound memory use
        print(f"Collected metrics: {snapshot['active_sources']}/{snapshot['total_sources']} sources active")

        # Save or process metrics
        # ... save to database, send to monitoring system, etc.

        time.sleep(interval_seconds)

# Run continuously
# collect_source_metrics("https://your-utmstack-instance.com/api", "YOUR_TOKEN")

Best Practices

  1. Regular Polling: Check source status every 5-15 minutes
  2. Threshold Alerts: Alert when sources are inactive for >30 minutes
  3. Trend Analysis: Track event counts over time to detect anomalies
  4. Dashboard Integration: Display source health on monitoring dashboards
  5. Automated Recovery: Attempt to restart or reconfigure failed sources

Build docs developers (and LLMs) love