Aiven for OpenSearch is a fully managed search and analytics engine based on open-source OpenSearch. It is ideal for log management, application search, analytical aggregations, and real-time data analysis, with powerful visualization through OpenSearch Dashboards.
Overview
OpenSearch is an open-source distributed search and analytics suite that includes a search engine, NoSQL document database, and visualization interface. Originally forked from Elasticsearch and Kibana in 2021, OpenSearch provides full-text search based on Apache Lucene with a RESTful API and JSON document support.
Why Choose Aiven for OpenSearch
Unified Search & Analytics: Full-text search, log analysis, and real-time analytics in one platform
OpenSearch Dashboards: Built-in visualization and exploration interface included with every service
Rich Plugin Ecosystem: Includes SQL support, anomaly detection, alerting, and security plugins
Schemaless Storage: Index varied data structures without predefined schemas
Key Features
Search capabilities powered by Apache Lucene:
Relevance scoring and ranking
Fuzzy matching and autocomplete
Multi-field and compound queries
Highlighting and snippets
Custom analyzers and tokenizers
Example search query:

{
  "query": {
    "multi_match": {
      "query": "kubernetes error",
      "fields": ["message", "tags"],
      "fuzziness": "AUTO"
    }
  }
}
Comprehensive visualization and exploration:
Interactive dashboard builder
Time-series visualizations
Geo maps and spatial data
Custom visualizations
Saved searches and filters
Alerting and notifications
Access dashboards at the URL provided in your service overview.
Log Aggregation and Analysis
Purpose-built for log management:
Ingest logs from multiple sources
Real-time log parsing and enrichment
Time-based index management
Log retention policies
Integration with Aiven services for log collection
Enable log integration from other Aiven services:

avn service integration-create \
  --integration-type logs \
  --source-service my-kafka-service \
  --dest-service my-opensearch
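Time-based index management usually relies on date-suffixed index names so that whole indices can be rolled over and deleted by age. A minimal sketch of this naming convention, using a hypothetical helper (not an Aiven or OpenSearch API):

```python
from datetime import datetime, timezone

def daily_index(prefix: str, when: datetime) -> str:
    """Build a date-suffixed index name, e.g. logs-2024.03.04,
    matching the common logs-* time-based index pattern."""
    return f"{prefix}-{when.strftime('%Y.%m.%d')}"

name = daily_index("logs", datetime(2024, 3, 4, tzinfo=timezone.utc))
print(name)  # logs-2024.03.04
```

Writing each day's logs to its own index lets retention become a cheap index deletion rather than a costly delete-by-query.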
ML-powered anomaly detection:
Automatic pattern recognition
Real-time anomaly alerts
Historical anomaly analysis
Customizable detectors
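A detector is configured through the anomaly detection plugin's REST API (POST _plugins/_anomaly_detection/detectors). The sketch below only builds the request body; the index pattern, feature name, and interval are illustrative placeholders, not values from this document:

```python
import json

# Sketch of a detector definition for the OpenSearch anomaly detection
# plugin. It watches the count of log entries (value_count on "level")
# in logs-* indices, evaluated every 10 minutes.
detector = {
    "name": "error-count-detector",
    "description": "Detect spikes in error log volume",
    "time_field": "@timestamp",
    "indices": ["logs-*"],
    "feature_attributes": [
        {
            "feature_name": "error_count",
            "feature_enabled": True,
            "aggregation_query": {
                "error_count": {"value_count": {"field": "level"}}
            },
        }
    ],
    "detection_interval": {"period": {"interval": 10, "unit": "Minutes"}},
}

print(json.dumps(detector, indent=2))
```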
Proactive monitoring and notifications:
Custom alert conditions
Multiple notification channels
Alert history and audit trail
Scheduled and real-time alerts
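Alerts are defined as monitors via the alerting plugin (POST _plugins/_alerting/monitors). This is a sketch of a monitor body under assumed placeholder values (index pattern, threshold, schedule); only the payload is built here, no request is sent:

```python
import json

# Sketch of an alerting-plugin monitor: every minute, count documents
# with level=error in logs-* and trigger when more than 10 are found.
monitor = {
    "type": "monitor",
    "name": "error-spike-monitor",
    "enabled": True,
    "schedule": {"period": {"interval": 1, "unit": "MINUTES"}},
    "inputs": [
        {
            "search": {
                "indices": ["logs-*"],
                "query": {
                    "size": 0,
                    "query": {"term": {"level": "error"}},
                },
            }
        }
    ],
    "triggers": [
        {
            "name": "too-many-errors",
            "severity": "1",
            "condition": {
                "script": {
                    # Fire when the search returns more than 10 hits
                    "source": "ctx.results[0].hits.total.value > 10",
                    "lang": "painless",
                }
            },
            # Notification actions (email, Slack, webhook) would go here
            "actions": [],
        }
    ],
}

print(json.dumps(monitor, indent=2))
```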
Getting Started
Create OpenSearch Service
Deploy an OpenSearch service with Dashboards included:

avn service create my-opensearch \
  --service-type opensearch \
  --cloud aws-us-east-1 \
  --plan business-4
OpenSearch Dashboards is automatically included with every service.
Access OpenSearch Dashboards
Get the Dashboards URL and credentials from the service overview: avn service get my-opensearch --format '{service_uri_params.dashboards_uri}'
Log in with the avnadmin user and password from the service credentials.
Create Your First Index
Indexes are like tables in a relational database:

curl -X PUT "$OPENSEARCH_URI/my-index" \
  -H "Content-Type: application/json" \
  -d '{
    "settings": {
      "number_of_shards": 2,
      "number_of_replicas": 1
    }
  }'
Index Documents
Add documents to your index:

curl -X POST "$OPENSEARCH_URI/my-index/_doc" \
  -H "Content-Type: application/json" \
  -d '{
    "title": "First document",
    "content": "This is the content",
    "timestamp": "2024-03-04T10:00:00Z"
  }'
Connection Examples
# Set your service URI (use the URI from your service overview)
export OPENSEARCH_URI="https://avnadmin:your-password@opensearch-service.aivencloud.com:12345"

# Check cluster health
curl "$OPENSEARCH_URI/_cluster/health?pretty"

# Create an index with mappings
curl -X PUT "$OPENSEARCH_URI/products" \
  -H "Content-Type: application/json" \
  -d '{
    "mappings": {
      "properties": {
        "name": { "type": "text" },
        "price": { "type": "float" },
        "category": { "type": "keyword" },
        "created_at": { "type": "date" }
      }
    }
  }'

# Index a document
curl -X POST "$OPENSEARCH_URI/products/_doc" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Widget",
    "price": 19.99,
    "category": "electronics",
    "created_at": "2024-03-04T10:00:00Z"
  }'

# Search documents
curl -X GET "$OPENSEARCH_URI/products/_search" \
  -H "Content-Type: application/json" \
  -d '{
    "query": {
      "match": { "name": "widget" }
    }
  }'

# Aggregation query
curl -X GET "$OPENSEARCH_URI/products/_search" \
  -H "Content-Type: application/json" \
  -d '{
    "size": 0,
    "aggs": {
      "categories": { "terms": { "field": "category" } },
      "avg_price": { "avg": { "field": "price" } }
    }
  }'
from opensearchpy import OpenSearch
from datetime import datetime

# Create client
client = OpenSearch(
    hosts=['https://opensearch-service.aivencloud.com:12345'],
    http_auth=('avnadmin', 'your-password'),
    use_ssl=True,
    verify_certs=True,
    ssl_show_warn=False
)

# Create index
index_body = {
    'settings': {
        'index': {
            'number_of_shards': 2,
            'number_of_replicas': 1
        }
    },
    'mappings': {
        'properties': {
            'title': {'type': 'text'},
            'content': {'type': 'text'},
            'timestamp': {'type': 'date'}
        }
    }
}
client.indices.create(index='logs', body=index_body)

# Index a document
document = {
    'title': 'Application Error',
    'content': 'Connection timeout occurred',
    'timestamp': datetime.now(),
    'level': 'error',
    'service': 'api-gateway'
}
response = client.index(index='logs', body=document, refresh=True)
print(f"Document indexed with ID: {response['_id']}")

# Search documents
search_body = {
    'query': {
        'bool': {
            'must': [
                {'match': {'content': 'timeout'}},
                {'term': {'level': 'error'}}
            ],
            'filter': [
                {'range': {'timestamp': {'gte': 'now-1h'}}}
            ]
        }
    },
    'sort': [{'timestamp': {'order': 'desc'}}],
    'size': 10
}
results = client.search(index='logs', body=search_body)
for hit in results['hits']['hits']:
    print(f"{hit['_source']['timestamp']}: {hit['_source']['title']}")

# Aggregation
agg_body = {
    'size': 0,
    'aggs': {
        'error_by_service': {
            'terms': {'field': 'service.keyword'},
            'aggs': {
                'recent_errors': {
                    'top_hits': {
                        'size': 1,
                        'sort': [{'timestamp': {'order': 'desc'}}]
                    }
                }
            }
        }
    }
}
agg_results = client.search(index='logs', body=agg_body)
const { Client } = require('@opensearch-project/opensearch');

const client = new Client({
  node: 'https://opensearch-service.aivencloud.com:12345',
  auth: {
    username: 'avnadmin',
    password: 'your-password'
  },
  ssl: {
    rejectUnauthorized: true
  }
});

async function main() {
  // Create index
  await client.indices.create({
    index: 'articles',
    body: {
      mappings: {
        properties: {
          title: { type: 'text' },
          content: { type: 'text' },
          author: { type: 'keyword' },
          published_at: { type: 'date' }
        }
      }
    }
  });

  // Index documents
  await client.index({
    index: 'articles',
    body: {
      title: 'Getting Started with OpenSearch',
      content: 'OpenSearch is a powerful search engine...',
      author: 'John Doe',
      published_at: new Date()
    },
    refresh: true
  });

  // Search with field boosting and highlighting
  const { body } = await client.search({
    index: 'articles',
    body: {
      query: {
        multi_match: {
          query: 'OpenSearch getting started',
          fields: ['title^2', 'content']
        }
      },
      highlight: {
        fields: {
          title: {},
          content: {}
        }
      }
    }
  });

  body.hits.hits.forEach(hit => {
    console.log(hit._source.title);
    if (hit.highlight) {
      console.log('Highlights:', hit.highlight);
    }
  });
}

main().catch(console.error);
import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.*;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;

public class OpenSearchExample {
    public static void main(String[] args) throws Exception {
        // Create client (credentials are configured on the transport's HTTP client)
        OpenSearchClient client = new OpenSearchClient(
            ApacheHttpClient5TransportBuilder
                .builder(HttpHost.create("https://opensearch-service.aivencloud.com:12345"))
                .setMapper(new JacksonJsonpMapper())
                .build()
        );

        // Index a document (Product is a plain POJO with name, price, and category)
        IndexRequest<Product> indexRequest = IndexRequest.of(i -> i
            .index("products")
            .document(new Product("Widget", 19.99, "electronics"))
        );
        IndexResponse response = client.index(indexRequest);
        System.out.println("Indexed document with ID: " + response.id());

        // Search
        SearchResponse<Product> searchResponse = client.search(s -> s
            .index("products")
            .query(q -> q
                .match(m -> m
                    .field("name")
                    .query("widget")
                )
            ),
            Product.class
        );
        searchResponse.hits().hits().forEach(hit -> {
            System.out.println("Found: " + hit.source().getName());
        });
    }
}
Advanced Features
Index Management
Index Lifecycle Management
Automate index rollover and deletion:

PUT _plugins/_ism/policies/logs_policy
{
  "policy": {
    "description": "Log retention policy",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [],
        "transitions": [
          { "state_name": "warm", "conditions": { "min_index_age": "7d" } }
        ]
      },
      {
        "name": "warm",
        "actions": [
          { "replica_count": { "number_of_replicas": 0 } }
        ],
        "transitions": [
          { "state_name": "delete", "conditions": { "min_index_age": "30d" } }
        ]
      },
      {
        "name": "delete",
        "actions": [
          { "delete": {} }
        ]
      }
    ]
  }
}
Index Templates
Define settings and mappings for new indices matching a pattern:

PUT _index_template/logs_template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 2,
      "number_of_replicas": 1,
      "index.codec": "best_compression"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "message": { "type": "text" },
        "level": { "type": "keyword" },
        "service": { "type": "keyword" }
      }
    }
  }
}
Reindex
Copy or transform data between indices:

POST _reindex
{
  "source": { "index": "old_index" },
  "dest": { "index": "new_index" },
  "script": {
    "source": "ctx._source.new_field = ctx._source.old_field * 2",
    "lang": "painless"
  }
}
Search Features
Combine multiple conditions with boolean queries:

{
  "query": {
    "bool": {
      "must": [
        { "match": { "title": "search" } }
      ],
      "should": [
        { "match": { "content": "opensearch" } },
        { "match": { "content": "elasticsearch" } }
      ],
      "must_not": [
        { "term": { "status": "archived" } }
      ],
      "filter": [
        { "range": { "created_at": { "gte": "2024-01-01" } } },
        { "terms": { "category": ["tech", "database"] } }
      ],
      "minimum_should_match": 1
    }
  }
}
Summarize data with nested aggregations:

{
  "size": 0,
  "aggs": {
    "date_histogram": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "1h"
      },
      "aggs": {
        "levels": {
          "terms": { "field": "level" },
          "aggs": {
            "avg_response_time": {
              "avg": { "field": "response_time" }
            }
          }
        }
      }
    },
    "percentiles": {
      "percentiles": {
        "field": "response_time",
        "percents": [50, 95, 99]
      }
    }
  }
}
Query using SQL syntax:

POST _plugins/_sql
{
  "query": "SELECT service, COUNT(*) as error_count FROM logs WHERE level = 'error' GROUP BY service ORDER BY error_count DESC"
}
Keep individual shards between 10 and 50 GB
Number of shards = expected data size / target shard size
More shards mean more parallelism but also more overhead
Prefer fewer, larger shards; per-shard overhead adds up
Example:

PUT /my-index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  }
}
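The rule of thumb above (number of shards = data size / target shard size) can be sketched as a quick estimate; the helper and the 30 GB default target are illustrative, not an official formula:

```python
import math

def shard_count(total_gb: float, target_shard_gb: float = 30.0) -> int:
    """Estimate primary shard count from expected index size,
    aiming for shards in the 10-50 GB range (30 GB target here)."""
    return max(1, math.ceil(total_gb / target_shard_gb))

print(shard_count(90))  # 3
print(shard_count(5))   # 1 (never less than one shard)
```

For a 90 GB index this suggests 3 primary shards, matching the example above.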
Use the bulk API to index multiple documents in one request:

curl -X POST "$OPENSEARCH_URI/_bulk" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @bulk-data.ndjson

where bulk-data.ndjson contains alternating action and document lines:

{"index": {"_index": "logs"}}
{"message": "Log entry 1", "level": "info"}
{"index": {"_index": "logs"}}
{"message": "Log entry 2", "level": "warn"}
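The NDJSON payload can also be generated programmatically. A minimal sketch (hypothetical helper, not part of any client library); note the bulk API requires a trailing newline:

```python
import json

def to_bulk_ndjson(index: str, docs: list) -> str:
    """Serialize documents into the _bulk API's NDJSON format:
    an action line followed by the document, one JSON object per
    line, ending with a trailing newline as the API requires."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"

payload = to_bulk_ndjson("logs", [
    {"message": "Log entry 1", "level": "info"},
    {"message": "Log entry 2", "level": "warn"},
])
print(payload)
```

In production, prefer a client's bulk helper (e.g. opensearch-py's helpers.bulk), which also handles batching and retries.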
Use filters instead of queries when possible (cached)
Limit result size with size and from
Use _source filtering to reduce response size
Implement pagination with search_after
Use track_total_hits: false for large datasets
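The pagination and result-limiting tips above combine naturally in a search_after loop: each page repeats the same query and sort, passing the previous page's last sort values forward. A sketch that only builds request bodies (the transport layer and field names are assumptions):

```python
# Build a search body for search_after pagination. A tiebreaker sort
# key (_id here) makes the sort order deterministic across pages.
def page_body(query, page_size, after=None):
    body = {
        "query": query,
        "sort": [{"timestamp": "desc"}, {"_id": "asc"}],
        "size": page_size,
        "track_total_hits": False,  # skip exact totals on large datasets
    }
    if after is not None:
        body["search_after"] = after
    return body

first = page_body({"match_all": {}}, 100)
# After each response, feed the last hit's "sort" values into the next request:
second = page_body({"match_all": {}}, 100,
                   after=["2024-03-04T10:00:00Z", "doc-42"])
```

Unlike from/size, search_after scales to deep result sets because the cluster never has to materialize all skipped hits.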
Use Cases
Log Management
Application Search
Security Analytics
Metrics and Monitoring
Centralized logging and analysis:
Collect logs from multiple services
Real-time log search and filtering
Create alerts for error patterns
Visualize log trends in dashboards
Enable log integration: avn service integration-create \
--integration-type logs \
--source-service my-app-service \
--dest-service my-opensearch
Full-text search for applications:
Product catalogs
Document repositories
Knowledge bases
User-generated content
Features:
Typo tolerance with fuzzy matching
Autocomplete and suggestions
Faceted search and filtering
Relevance tuning
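These application-search features often appear together in one request: fuzzy matching for typo tolerance plus terms aggregations serving as facets. An illustrative sketch (the name, category, and brand fields are placeholder names):

```python
# Build a faceted product-search body: fuzzy full-text match on "name"
# plus one terms aggregation per facet field for sidebar filters.
def faceted_search(text, facet_fields, size=20):
    return {
        "query": {
            "match": {"name": {"query": text, "fuzziness": "AUTO"}}
        },
        "aggs": {
            field: {"terms": {"field": field}} for field in facet_fields
        },
        "size": size,
    }

# "widgit" still matches "widget" thanks to AUTO fuzziness
body = faceted_search("widgit", ["category", "brand"])
```

The facet fields should be mapped as keyword so the terms aggregations bucket on exact values.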
Security information and event management:
Security log aggregation
Threat detection
Anomaly detection
Compliance reporting
Infrastructure monitoring:
Application performance monitoring
System metrics collection
Custom dashboards
Alerting on thresholds
Monitoring and Maintenance
Cluster Health
# Check cluster status
curl "$OPENSEARCH_URI/_cluster/health?pretty"

# Node statistics
curl "$OPENSEARCH_URI/_nodes/stats?pretty"

# Index statistics
curl "$OPENSEARCH_URI/_cat/indices?v"
Key Metrics
Cluster Health
Green: All shards allocated
Yellow: Replicas not allocated
Red: Primary shards not allocated
Performance
Query latency
Indexing rate
Search rate
Cache hit ratio
Resources
CPU usage
Memory (heap/non-heap)
Disk space
JVM garbage collection
Indices
Number of documents
Index size
Shard distribution
Unassigned shards
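The green/yellow/red semantics above map directly onto monitoring logic. A minimal sketch that classifies a _cluster/health response (the helper and its messages are illustrative):

```python
# Map a _cluster/health response body to a human-readable summary,
# following the green/yellow/red shard-allocation semantics.
def health_summary(health: dict) -> str:
    status = health.get("status")
    if status == "green":
        return "ok: all shards allocated"
    if status == "yellow":
        unassigned = health.get("unassigned_shards", 0)
        return f"warning: {unassigned} replica shards unassigned"
    if status == "red":
        return "critical: primary shards unassigned, some data unavailable"
    return f"unknown status: {status!r}"

print(health_summary({"status": "yellow", "unassigned_shards": 2}))
```

A yellow cluster still serves all data (only replicas are missing); red means some primary shards, and therefore some data, are unavailable.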
Security
Built-in user management
SAML authentication support
API key authentication
Certificate-based authentication
Role-based access control (RBAC):

PUT _plugins/_security/api/roles/readonly_role
{
  "cluster_permissions": ["cluster:monitor/*"],
  "index_permissions": [
    {
      "index_patterns": ["logs-*"],
      "allowed_actions": ["read"]
    }
  ]
}
VPC peering for private connectivity
IP allowlisting
TLS encryption (required)
AWS PrivateLink support
Apache Kafka: Stream logs from Kafka to OpenSearch
Grafana: Visualize OpenSearch data in Grafana
Apache Flink: Process streams and sink results to OpenSearch
PostgreSQL: Full-text search on PostgreSQL data
Trademark Notice: OpenSearch and OpenSearch Dashboards are open-source projects forked from the formerly open-source Elasticsearch and Kibana projects.