The Elasticsearch sink indexes observability events in Elasticsearch or OpenSearch clusters. It supports both bulk indexing and data streams, with automatic API version detection and flexible authentication options.

Configuration

[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["my_source"]
endpoints = ["http://localhost:9200"]

# Authentication
auth.strategy = "basic"
auth.user = "elastic"
auth.password = "${ELASTICSEARCH_PASSWORD}"

# Index configuration
mode = "bulk"
bulk.index = "vector-%Y.%m.%d"
bulk.action = "index"

# Batching
batch.max_events = 1000
batch.timeout_secs = 5

# Compression
compression = "gzip"

Core Parameters

endpoints
array
required
A list of Elasticsearch endpoints to send logs to. Each endpoint must contain an HTTP scheme and may specify a hostname or IP address and port. Credentials can be embedded in the URL (e.g., https://user:[email protected]), but this cannot be combined with the auth configuration.
mode
string
default:"bulk"
Elasticsearch indexing mode. Options:
  • bulk: Standard bulk indexing using the Bulk API
  • data_stream: Use Elasticsearch Data Streams (requires Elasticsearch 7.9+)
mode = "bulk"
# or
mode = "data_stream"
api_version
string
default:"auto"
The API version of Elasticsearch. Set to auto for automatic detection, or explicitly specify v6, v7, or v8. Amazon OpenSearch Serverless requires auto.
api_version = "auto"
# or
api_version = "v8"

Authentication

The Elasticsearch sink supports multiple authentication strategies.

Basic Authentication

auth.strategy
string
default:"basic"
Use HTTP Basic Authentication with username and password.
auth.user
string
Basic authentication username.
auth.password
string
Basic authentication password (supports environment variables).
[sinks.elasticsearch.auth]
strategy = "basic"
user = "${ELASTICSEARCH_USERNAME}"
password = "${ELASTICSEARCH_PASSWORD}"

AWS Authentication (OpenSearch)

For AWS OpenSearch Service, use AWS IAM authentication:
auth.strategy
string
Set to "aws" to use AWS SigV4 authentication for Amazon OpenSearch Service.
[sinks.elasticsearch]
endpoints = ["https://my-domain.us-east-1.es.amazonaws.com"]

[sinks.elasticsearch.auth]
strategy = "aws"

[sinks.elasticsearch.aws]
region = "us-east-1"
For OpenSearch Serverless:
[sinks.elasticsearch]
endpoints = ["https://my-collection.us-east-1.aoss.amazonaws.com"]
opensearch_service_type = "serverless"
api_version = "auto"

[sinks.elasticsearch.auth]
strategy = "aws"

Bulk Mode Configuration

When using mode = "bulk", configure the bulk indexing behavior:
bulk.index
string
default:"vector-%Y.%m.%d"
The name of the index to write events to. Supports template syntax and date formatting using strftime specifiers.
bulk.index = "logs-%Y.%m.%d"
bulk.index = "application-{{ application_id }}-%Y-%m-%d"
bulk.action
string
default:"index"
Bulk API action. Options: index, create, update.
bulk.action = "index"  # Default
bulk.action = "create" # Fails if document exists
bulk.template_fallback_index
string
Default index name if the template in bulk.index cannot be resolved.
bulk.template_fallback_index = "vector-fallback"
id_key
string
Event field name to use for Elasticsearch’s _id field. If unspecified, Elasticsearch auto-generates IDs.
id_key = "id"
id_key = "_id"

Data Stream Mode

Data streams provide a convenient way to index time-series data in Elasticsearch:
data_stream.type
string
default:"logs"
Data stream type (first component of data stream name).
data_stream.type = "logs"
data_stream.type = "metrics"
data_stream.type = "{{ type }}"
data_stream.dataset
string
default:"generic"
Data stream dataset (second component of data stream name).
data_stream.dataset = "nginx"
data_stream.dataset = "{{ service }}"
data_stream.namespace
string
default:"default"
Data stream namespace (third component of data stream name).
data_stream.namespace = "production"
data_stream.namespace = "{{ environment }}"
data_stream.auto_routing
boolean
default:"true"
Automatically route events by deriving the data stream name from each event's data_stream.type, data_stream.dataset, and data_stream.namespace fields.
data_stream.sync_fields
boolean
default:"true"
Automatically add and sync data_stream.* event fields to match the data stream name.
[sinks.elasticsearch]
mode = "data_stream"

[sinks.elasticsearch.data_stream]
type = "logs"
dataset = "nginx"
namespace = "production"
auto_routing = true
sync_fields = true
This creates data streams with names like logs-nginx-production.
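With auto_routing enabled, the stream name can also vary per event via templates. As a sketch (service and environment are hypothetical event fields):

```toml
[sinks.elasticsearch]
mode = "data_stream"

[sinks.elasticsearch.data_stream]
# Each event is routed by its own field values, e.g. an event with
# service = "payments" and environment = "staging" lands in
# the logs-payments-staging data stream.
type = "logs"
dataset = "{{ service }}"
namespace = "{{ environment }}"
auto_routing = true
```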

Batching

Configure batching to optimize throughput:
batch.max_events
integer
default:"1000"
Maximum number of events to batch before flushing.
batch.max_bytes
integer
default:"10485760"
Maximum size of a batch in bytes.
batch.timeout_secs
float
default:"1"
Maximum time to wait before flushing a partial batch.
[sinks.elasticsearch.batch]
max_events = 500
max_bytes = 5242880  # 5MB
timeout_secs = 5.0

Encoding

encoding
object
Configure how events are encoded before sending to Elasticsearch. Supports field transformations and filtering.
[sinks.elasticsearch.encoding]
except_fields = ["_metadata", "secret_field"]
timestamp_format = "rfc3339"
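Conversely, an allow-list can restrict output to a fixed set of fields. A sketch, assuming the standard encoding.only_fields option (field names here are placeholders):

```toml
[sinks.elasticsearch.encoding]
# Only these event fields are sent; everything else is dropped.
only_fields = ["timestamp", "message", "host"]
timestamp_format = "rfc3339"
```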

Compression

compression
string
default:"gzip"
Compression algorithm. Options: none, gzip, zstd, snappy.
compression = "gzip"

Advanced Options

pipeline
string
Name of the Elasticsearch ingest pipeline to apply.
pipeline = "my-pipeline"
query
object
Custom query parameters to add to each HTTP request.
[sinks.elasticsearch.query]
refresh = "true"
request_retry_partial
boolean
default:"false"
Whether to retry requests that succeeded overall but contained partial document failures. Pair with id_key so retried documents overwrite rather than duplicate.
request_retry_partial = true
id_key = "event_id"
doc_type
string
default:"_doc"
Document type for Elasticsearch 6.x and below. Ignored in Elasticsearch 7.x+.
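As a sketch, a configuration targeting a legacy 6.x cluster might pin both settings explicitly (endpoint and input names are placeholders):

```toml
[sinks.elasticsearch_legacy]
type = "elasticsearch"
inputs = ["my_source"]
endpoints = ["http://legacy-es:9200"]
api_version = "v6"
doc_type = "_doc"   # required on 6.x; ignored on 7.x+
```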

TLS Configuration

tls.enabled
boolean
default:"true"
Enable TLS/SSL connections.
tls.ca_file
string
Path to CA certificate file for verifying the server.
[sinks.elasticsearch.tls]
ca_file = "/path/to/ca.pem"
tls.crt_file
string
Path to client certificate file for mutual TLS.
tls.key_file
string
Path to client private key file for mutual TLS.
tls.verify_certificate
boolean
default:"true"
Verify the server’s TLS certificate.
tls.verify_hostname
boolean
default:"true"
Verify the server’s hostname matches the certificate.
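Putting the TLS options together, a mutual-TLS setup might look like the following (file paths are placeholders):

```toml
[sinks.elasticsearch.tls]
ca_file = "/etc/vector/tls/ca.pem"       # verifies the server certificate
crt_file = "/etc/vector/tls/client.crt"  # client certificate for mTLS
key_file = "/etc/vector/tls/client.key"  # client private key for mTLS
verify_certificate = true
verify_hostname = true
```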

Request Configuration

request.timeout_secs
integer
default:"60"
Request timeout in seconds.
request.rate_limit_num
integer
Maximum number of requests per time window.
request.retry_attempts
integer
default:"5"
Number of retry attempts for failed requests.
[sinks.elasticsearch.request]
timeout_secs = 30
rate_limit_num = 100
retry_attempts = 3

Complete Examples

Basic Configuration

[sinks.elasticsearch_basic]
type = "elasticsearch"
inputs = ["my_source"]
endpoints = ["http://localhost:9200"]

auth.strategy = "basic"
auth.user = "elastic"
auth.password = "changeme"

bulk.index = "logs-%Y.%m.%d"
compression = "gzip"

Data Streams with AWS OpenSearch

[sinks.opensearch]
type = "elasticsearch"
inputs = ["logs"]
endpoints = ["https://my-domain.us-east-1.es.amazonaws.com"]

mode = "data_stream"

[sinks.opensearch.auth]
strategy = "aws"

[sinks.opensearch.aws]
region = "us-east-1"

[sinks.opensearch.data_stream]
type = "logs"
dataset = "application"
namespace = "production"

High-Throughput Configuration

[sinks.elasticsearch_fast]
type = "elasticsearch"
inputs = ["metrics"]
endpoints = [
  "http://es-node1:9200",
  "http://es-node2:9200",
  "http://es-node3:9200"
]

auth.strategy = "basic"
auth.user = "vector"
auth.password = "${ES_PASSWORD}"

bulk.index = "metrics-%Y.%m.%d"
bulk.action = "create"
compression = "zstd"

[sinks.elasticsearch_fast.batch]
max_events = 2000
max_bytes = 10485760
timeout_secs = 2

[sinks.elasticsearch_fast.request.tower]
concurrency = 10
rate_limit_num = 1000

Troubleshooting

Connection Issues

If you can’t connect to Elasticsearch:
  1. Verify endpoints are correct and accessible
  2. Check authentication credentials
  3. Ensure TLS settings match your cluster configuration
  4. Review firewall and network policies
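To surface connection problems at startup rather than at first flush, the sink-level healthcheck can help. A sketch, assuming the common Vector sink healthcheck option applies here:

```toml
[sinks.elasticsearch]
# Fail fast at startup if the cluster is unreachable.
healthcheck.enabled = true
```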

Indexing Errors

For document indexing failures:
  1. Check index template exists and matches your data
  2. Verify field mappings in Elasticsearch
  3. Enable request_retry_partial with id_key for partial failures
  4. Review Elasticsearch cluster logs

Performance Optimization

  1. Increase batch size: Higher batch.max_events reduces overhead
  2. Use compression: Enable compression = "gzip" or compression = "zstd"
  3. Multiple endpoints: Load balance across multiple nodes
  4. Adjust concurrency: Tune request.tower.concurrency
  5. Data streams: Use data streams for time-series data

Best Practices

  1. Use data streams for time-series data (logs, metrics)
  2. Set id_key to prevent duplicates when retrying
  3. Enable compression to reduce network usage
  4. Configure batching to balance latency and throughput
  5. Use AWS authentication for OpenSearch Service
  6. Monitor cluster health to prevent backpressure
  7. Use index lifecycle management (ILM) to manage data retention
