func (p *CassandraPlugin) Store(cir *models.CIR) (*models.StorageResult, error) {
if !p.initialized {
return nil, fmt.Errorf("plugin not initialized")
}
// Validate CIR
if err := cir.Validate(); err != nil {
return nil, fmt.Errorf("invalid CIR: %w", err)
}
// Determine entity type
entityType := "default"
if et, ok := cir.GetParameter("entity_type"); ok {
if etStr, ok := et.(string); ok {
entityType = etStr
}
}
// Handle array data
affectedItems := 0
if arr, err := cir.GetDataAsArray(); err == nil {
for _, item := range arr {
if err := p.storeItem(entityType, item); err != nil {
return nil, err
}
affectedItems++
}
} else {
// Single item
dataMap, err := cir.GetDataAsMap()
if err != nil {
return nil, err
}
if err := p.storeItem(entityType, dataMap); err != nil {
return nil, err
}
affectedItems = 1
}
return &models.StorageResult{
Success: true,
AffectedItems: affectedItems,
}, nil
}
func (p *CassandraPlugin) storeItem(table string, data map[string]interface{}) error {
// Build INSERT statement
columns := []string{}
placeholders := []string{}
values := []interface{}{}
for key, value := range data {
columns = append(columns, key)
placeholders = append(placeholders, "?")
values = append(values, value)
}
query := fmt.Sprintf(
"INSERT INTO %s.%s (%s) VALUES (%s)",
p.keyspace,
table,
strings.Join(columns, ", "),
strings.Join(placeholders, ", "),
)
return p.session.Query(query, values...).Exec()
}