Bulk API v2 provides a simplified and more efficient way to process large data sets in Salesforce. It offers better performance, easier error handling, and a streamlined workflow compared to v1.

Overview

Bulk API v2 is recommended for:
  • High-volume data loads (insert, update, upsert, delete)
  • Large queries that would time out with the standard API
  • Modern applications requiring simpler integration
  • Operations where you need detailed success/failure reporting

Key Improvements over v1

  • Simplified workflow - No batch management required
  • Better performance - Optimized processing engine
  • Improved error handling - Separate endpoints for successful, failed, and unprocessed records
  • Single upload - All data uploaded in one operation
  • Better monitoring - Enhanced job status information

Quick Start

Simple Data Load

const accounts = [
  { Name: 'Acme Corp', Industry: 'Technology', NumberOfEmployees: 500 },
  { Name: 'Global Inc', Industry: 'Finance', NumberOfEmployees: 1000 },
  { Name: 'Innovate LLC', Industry: 'Healthcare', NumberOfEmployees: 250 }
];

const results = await conn.bulk2.loadAndWaitForResults({
  object: 'Account',
  operation: 'insert',
  input: accounts
});

console.log('Successful:', results.successfulResults.length);
console.log('Failed:', results.failedResults.length);

Bulk Query

const soql = 'SELECT Id, Name, Industry FROM Account WHERE CreatedDate = LAST_MONTH';
const recordStream = await conn.bulk2.query(soql);

recordStream.on('record', record => {
  console.log(record.Name, record.Industry);
});

Supported Operations

operation (IngestOperation, required): The type of ingest operation to perform:
  • insert - Create new records
  • update - Update existing records by Id
  • upsert - Insert or update based on an external ID field
  • delete - Soft delete records (they are moved to the recycle bin)
  • hardDelete - Permanently delete records, bypassing the recycle bin
For queries, use the query() method; the operation type is determined by the options:
  • query - Standard query (default)
  • queryAll - Query including deleted and archived records
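
For example, a delete operation from the list above only needs each record's Id (a minimal sketch; the Id values are placeholders):

// Sketch: a bulk delete only requires the Id field of each record
const results = await conn.bulk2.loadAndWaitForResults({
  object: 'Account',
  operation: 'delete',
  input: [
    { Id: '001xx0000000001AAA' },  // placeholder record Ids
    { Id: '001xx0000000002AAA' }
  ]
});
console.log('Deleted:', results.successfulResults.length);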

BulkV2 Class

The main entry point for Bulk API v2 operations.

Properties

pollInterval (number, default: 1000): Polling interval in milliseconds for checking job status.
pollTimeout (number, default: 30000): Maximum time in milliseconds to wait for job completion.
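
These defaults can be overridden per call (as shown below) or adjusted up front on the bulk2 instance. A minimal sketch, assuming the properties are writable on conn.bulk2:

// Sketch: raise the polling defaults before starting long-running jobs
// (assumes pollInterval/pollTimeout are writable on conn.bulk2)
conn.bulk2.pollInterval = 5000;   // check every 5 seconds
conn.bulk2.pollTimeout = 600000;  // wait up to 10 minutes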

loadAndWaitForResults()

Create, upload, and process a bulk ingest job in one operation.
const results = await conn.bulk2.loadAndWaitForResults({
  object: 'Contact',
  operation: 'upsert',
  externalIdFieldName: 'Email__c',
  input: contacts,
  pollInterval: 5000,
  pollTimeout: 120000
});

// Process results
for (const record of results.successfulResults) {
  console.log('Created/Updated:', record.sf__Id);
}

for (const record of results.failedResults) {
  console.log('Failed:', record.sf__Error);
}
Parameters:
  • object (string, required): The Salesforce object API name (e.g., 'Account', 'Contact')
  • operation (IngestOperation, required): The operation to perform
  • input (Record[] | Readable | string, required): Data to process (array of records, CSV string, or readable stream)
  • externalIdFieldName (string): External ID field for upsert operations
  • assignmentRuleId (string): Assignment rule ID for Case or Lead objects
  • columnDelimiter (string, default: "COMMA"): CSV column delimiter
  • lineEnding (string, default: "LF"): CSV line ending style
  • pollInterval (number): Override the default polling interval
  • pollTimeout (number): Override the default polling timeout
Returns: Promise<IngestJobV2Results<S>>

IngestJobV2Results (object):
  • successfulResults (Array): Records that were successfully processed
      • sf__Id (string): Record ID
      • sf__Created (string): Whether the record was created
      • ...fields: Original record fields
  • failedResults (Array): Records that failed processing
      • sf__Id (string): Record ID (if available)
      • sf__Error (string): Error message
      • ...fields: Original record fields
  • unprocessedRecords (Array | string): Records that were not processed (due to job abortion or other issues)

query()

Execute a bulk query and get results as a record stream.
const fs = require('fs');

const soql = 'SELECT Id, Name, CreatedDate FROM Account ORDER BY CreatedDate DESC';

const recordStream = await conn.bulk2.query(soql, {
  scanAll: false,  // Set true for queryAll
  pollInterval: 2000,
  pollTimeout: 60000
});

recordStream.on('record', record => {
  console.log(record);
});

// Or convert to CSV
recordStream.stream('csv').pipe(fs.createWriteStream('output.csv'));
Parameters:
  • soql (string, required): The SOQL query to execute
  • options (object): Query configuration options
      • scanAll (boolean, default: false): If true, uses queryAll to include deleted and archived records
      • columnDelimiter (QueryJobInfoV2['columnDelimiter']): CSV delimiter for results
      • lineEnding (QueryJobInfoV2['lineEnding']): Line ending style for results
      • pollInterval (number): Override the default polling interval
      • pollTimeout (number): Override the default polling timeout
Returns: Promise<Parsable<Record>> - A record stream

createJob()

Create an ingest job instance for manual control.
const job = conn.bulk2.createJob({
  object: 'Account',
  operation: 'insert'
});

await job.open();
await job.uploadData(records);
await job.close();
await job.poll();

const results = await job.getAllResults();
Parameters:
  • options (NewIngestJobOptions, required): Job configuration
      • object (string, required): Salesforce object API name
      • operation (IngestOperation, required): Operation type
      • externalIdFieldName (string): External ID field for upsert
      • assignmentRuleId (string): Assignment rule ID
      • columnDelimiter (string): CSV column delimiter
      • lineEnding (string): Line ending style
Returns: IngestJobV2<S> - An ingest job instance

job()

Get an existing job instance by ID.
const job = conn.bulk2.job('ingest', { id: '750...' });
const jobInfo = await job.check();
console.log(jobInfo.state);
Parameters:
  • type (string, required): Type of job to retrieve ('ingest' or 'query')
  • options (object, required): Job lookup options
      • id (string, required): The Salesforce job ID
Returns: IngestJobV2<S> or QueryJobV2<S>
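
Assuming 'query' is the other accepted job type (the return type suggests both ingest and query jobs can be retrieved), reconnecting to an existing query job looks similar; the ID below is a placeholder:

// Sketch: reconnect to an existing query job by ID (placeholder ID)
const queryJob = conn.bulk2.job('query', { id: '750xx0000000001AAA' });
const queryInfo = await queryJob.check();
console.log('State:', queryInfo.state);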

IngestJobV2 Class

Represents a Bulk API v2 ingest job.

Properties

id (string): The job ID assigned by Salesforce

open()

Create the job in Salesforce.
const job = conn.bulk2.createJob({
  object: 'Account',
  operation: 'insert'
});

const jobInfo = await job.open();
console.log('Job ID:', jobInfo.id);
Returns: Promise<JobInfoV2> - Job information
JobInfoV2 (object):
  • id (string): Job ID
  • object (string): Object type
  • operation (IngestOperation): Operation type
  • state (string): Job state (Open, UploadComplete, InProgress, JobComplete, Aborted, Failed)
  • createdById (string): User who created the job
  • createdDate (string): Creation timestamp
  • numberRecordsProcessed (number): Records processed
  • numberRecordsFailed (number): Records failed
  • errorMessage (string): Error message if the job failed

uploadData()

Upload data to the job.
const records = [
  { Name: 'Company A', Industry: 'Tech' },
  { Name: 'Company B', Industry: 'Finance' }
];

await job.uploadData(records);
Parameters:
  • input (string | Record[] | Readable, required): Data to upload
Returns: Promise<void>
Data can only be uploaded once per job. Multiple uploads will throw an error.
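
Because of this, merge all data into a single input before calling uploadData(). A minimal sketch, where batchA and batchB are hypothetical record arrays:

// Sketch: combine all record batches into the one allowed upload
// (batchA and batchB are hypothetical arrays of records)
const allRecords = [...batchA, ...batchB];
await job.uploadData(allRecords);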

close()

Mark the job as ready for processing.
await job.close();
Returns: Promise<void>
You must close the job before it will be processed. After closing, you cannot upload more data.

poll()

Poll for job completion.
job.on('inProgress', info => {
  console.log('Processed:', info.numberRecordsProcessed);
});

job.on('jobComplete', info => {
  console.log('Job finished!');
});

try {
  await job.poll(2000, 120000); // Check every 2s for up to 2 minutes
} catch (err) {
  if (err.name === 'JobPollingTimeout') {
    console.log('Job still processing, check back later');
  }
}
Parameters:
  • interval (number): Polling interval in milliseconds (defaults to the instance pollInterval)
  • timeout (number): Polling timeout in milliseconds (defaults to the instance pollTimeout)
Returns: Promise<void> - Resolves when the job completes

check()

Check current job status.
const jobInfo = await job.check();
console.log('State:', jobInfo.state);
console.log('Processed:', jobInfo.numberRecordsProcessed);
console.log('Failed:', jobInfo.numberRecordsFailed);
Returns: Promise<JobInfoV2> - Current job information

getAllResults()

Retrieve all results (successful, failed, and unprocessed records).
const results = await job.getAllResults();

console.log('Success:', results.successfulResults.length);
console.log('Failed:', results.failedResults.length);
console.log('Unprocessed:', results.unprocessedRecords.length);

// Process failed records
for (const failed of results.failedResults) {
  console.log(`Record failed: ${failed.sf__Error}`);
  console.log('Original data:', failed);
}
Returns: Promise<IngestJobV2Results<S>>

getSuccessfulResults()

Get only successful results.
const successful = await job.getSuccessfulResults();

successful.forEach(record => {
  console.log('ID:', record.sf__Id);
  console.log('Created:', record.sf__Created === 'true');
});
Parameters:
  • raw (boolean, default: false): If true, returns a raw CSV string instead of parsed records
Returns: Promise<IngestJobV2SuccessfulResults<S>> or Promise<string>
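
To archive results without parsing them, the raw flag can be combined with a file write. A minimal sketch:

const fs = require('fs');

// Sketch: fetch successful results as raw CSV and save them to disk
const csv = await job.getSuccessfulResults(true);
fs.writeFileSync('successful-results.csv', csv);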

getFailedResults()

Get only failed results.
const failed = await job.getFailedResults();

failed.forEach(record => {
  console.log('Error:', record.sf__Error);
  console.log('Original data:', record);
});
Parameters:
  • raw (boolean, default: false): If true, returns a raw CSV string
Returns: Promise<IngestJobV2FailedResults<S>> or Promise<string>

getUnprocessedRecords()

Get unprocessed records (usually due to job abortion).
const unprocessed = await job.getUnprocessedRecords();

if (Array.isArray(unprocessed)) {
  console.log('Unprocessed count:', unprocessed.length);
} else {
  console.log('Unprocessed CSV:', unprocessed);
}
Parameters:
  • raw (boolean, default: false): If true, returns a raw CSV string
Returns: Promise<IngestJobV2UnprocessedRecords<S>>

abort()

Abort the job.
await job.abort();
Returns: Promise<void>

delete()

Delete the job from Salesforce.
await job.delete();
Returns: Promise<void>

getInfo()

Get cached job information.
const jobInfo = job.getInfo();
console.log('Cached state:', jobInfo.state);
Returns: JobInfoV2 - Cached job info
Throws an error if no info is cached; call await job.check() first.
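
A defensive pattern is to fall back to check() when nothing is cached yet. A minimal sketch:

// Sketch: use cached info when available, otherwise fetch it from Salesforce
let jobInfo;
try {
  jobInfo = job.getInfo();
} catch (err) {
  jobInfo = await job.check();  // no cached info yet
}
console.log('State:', jobInfo.state);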

QueryJobV2 Class

Represents a Bulk API v2 query job.

Properties

id (string): The query job ID

open()

Create the query job in Salesforce.
const queryJob = new QueryJobV2(conn, {
  bodyParams: {
    query: 'SELECT Id, Name FROM Account',
    operation: 'query'
  },
  pollingOptions: { pollInterval: 1000, pollTimeout: 30000 }
});

await queryJob.open();
Returns: Promise<QueryJobInfoV2>
QueryJobInfoV2 (object):
  • id (string): Query job ID
  • operation (string): Operation type
  • state (string): Job state
  • createdDate (string): Creation timestamp
  • numberRecordsProcessed (number): Records processed

poll()

Wait for query to complete.
await queryJob.poll();
Parameters:
  • interval (number): Polling interval in milliseconds
  • timeout (number): Polling timeout in milliseconds
Returns: Promise<void>

result()

Get query results as a record stream.
const resultStream = await queryJob.result();

resultStream.on('record', record => {
  console.log(record);
});
Returns: Promise<Parsable<Record>> - Record stream with query results

check()

Check query job status.
const jobInfo = await queryJob.check();
console.log('State:', jobInfo.state);
Returns: Promise<QueryJobInfoV2>

abort()

Abort the query job.
await queryJob.abort();
Returns: Promise<QueryJobInfoV2>

delete()

Delete the query job.
await queryJob.delete();
Returns: Promise<void>

getInfo()

Get cached query job information.
const jobInfo = queryJob.getInfo();
Returns: QueryJobInfoV2
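
Putting these methods together, a manual query-job flow mirrors the manual ingest flow. A sketch, reusing the constructor options shown in open() above:

// Sketch: full manual query-job lifecycle
const queryJob = new QueryJobV2(conn, {
  bodyParams: {
    query: 'SELECT Id, Name FROM Account',
    operation: 'query'
  },
  pollingOptions: { pollInterval: 2000, pollTimeout: 120000 }
});

await queryJob.open();   // create the job
await queryJob.poll();   // wait for completion
const stream = await queryJob.result();
stream.on('record', record => console.log(record.Name));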

Events

Both IngestJobV2 and QueryJobV2 extend EventEmitter.

IngestJobV2 Events

const job = conn.bulk2.createJob({
  object: 'Account',
  operation: 'insert'
});

job.on('open', () => {
  console.log('Job opened:', job.id);
});

job.on('close', () => {
  console.log('Job closed and ready for processing');
});

job.on('inProgress', jobInfo => {
  console.log('Processing...', jobInfo.numberRecordsProcessed);
});

job.on('jobComplete', jobInfo => {
  console.log('Job completed!');
});

job.on('aborted', () => {
  console.log('Job aborted');
});

job.on('error', err => {
  console.error('Job error:', err);
});

QueryJobV2 Events

queryJob.on('open', jobInfo => {
  console.log('Query job opened:', jobInfo.id);
});

queryJob.on('inProgress', jobInfo => {
  console.log('Query in progress...');
});

queryJob.on('jobComplete', jobInfo => {
  console.log('Query completed!');
});

queryJob.on('error', err => {
  console.error('Query error:', err);
});

Advanced Usage

Manual Job Control

const job = conn.bulk2.createJob({
  object: 'Contact',
  operation: 'upsert',
  externalIdFieldName: 'Email__c'
});

try {
  // Step 1: Open the job
  await job.open();
  console.log('Job created:', job.id);
  
  // Step 2: Upload data
  await job.uploadData(contacts);
  console.log('Data uploaded');
  
  // Step 3: Close job to start processing
  await job.close();
  console.log('Job closed, processing started');
  
  // Step 4: Poll for completion
  await job.poll();
  console.log('Job completed');
  
  // Step 5: Get results
  const results = await job.getAllResults();
  console.log('Success:', results.successfulResults.length);
  console.log('Failed:', results.failedResults.length);
  
} catch (err) {
  console.error('Job failed:', err);
  
  // Clean up
  try {
    await job.abort();
    await job.delete();
  } catch (cleanupErr) {
    // Ignore cleanup errors
  }
}

Streaming Large Files

const fs = require('fs');
const csvStream = fs.createReadStream('large-dataset.csv');

const job = conn.bulk2.createJob({
  object: 'Account',
  operation: 'insert'
});

await job.open();
await job.uploadData(csvStream);
await job.close();

// Monitor progress (JobInfoV2 doesn't expose a total record count while
// the job is in progress, so report the running processed count)
job.on('inProgress', info => {
  console.log(`Records processed so far: ${info.numberRecordsProcessed}`);
});

await job.poll(5000, 600000); // Poll every 5s for up to 10 minutes

const results = await job.getAllResults();

Custom CSV Formatting

const job = conn.bulk2.createJob({
  object: 'Account',
  operation: 'insert',
  columnDelimiter: 'PIPE',  // Use | instead of comma
  lineEnding: 'CRLF'         // Windows line endings
});

const csv = `Name|Industry|NumberOfEmployees
Company A|Tech|100
Company B|Finance|200`;

await job.open();
await job.uploadData(csv);
await job.close();
await job.poll();

Query with All Records

// Include deleted and archived records
const soql = 'SELECT Id, Name, IsDeleted FROM Account';
const recordStream = await conn.bulk2.query(soql, {
  scanAll: true  // Use queryAll operation
});

let deletedCount = 0;
recordStream.on('record', record => {
  // Bulk query results are CSV-based, so boolean fields arrive as strings
  if (record.IsDeleted === 'true') deletedCount++;
});

recordStream.on('end', () => {
  console.log('Deleted records found:', deletedCount);
});

Retry Failed Records

const results = await conn.bulk2.loadAndWaitForResults({
  object: 'Account',
  operation: 'insert',
  input: records
});

if (results.failedResults.length > 0) {
  console.log('Retrying failed records...');
  
  // Extract failed records and fix issues
  const retryRecords = results.failedResults.map(record => {
    const { sf__Error, sf__Id, ...originalData } = record;
    
    // Fix common issues
    if (sf__Error.includes('REQUIRED_FIELD_MISSING')) {
      originalData.RequiredField = 'Default Value';
    }
    
    return originalData;
  });
  
  // Retry
  const retryResults = await conn.bulk2.loadAndWaitForResults({
    object: 'Account',
    operation: 'insert',
    input: retryRecords
  });
  
  console.log('Retry success:', retryResults.successfulResults.length);
}

Error Handling

try {
  const results = await conn.bulk2.loadAndWaitForResults({
    object: 'Account',
    operation: 'insert',
    input: records
  });
  
  // Check for failures
  if (results.failedResults.length > 0) {
    console.log('Some records failed:');
    results.failedResults.forEach(record => {
      console.log(`  Error: ${record.sf__Error}`);
      console.log(`  Data: ${JSON.stringify(record)}`);
    });
  }
  
} catch (err) {
  if (err.name === 'JobPollingTimeout') {
    console.log('Job timed out but may still be processing');
    console.log('Job ID:', err.jobId);
    
    // Reconnect to job later
    const job = conn.bulk2.job('ingest', { id: err.jobId });
    const info = await job.check();
    console.log('Current state:', info.state);
    
  } else {
    console.error('Bulk operation failed:', err.message);
  }
}

Limits and Considerations

Bulk API v2 Limits:
  • Maximum 150 MB of data uploaded per job (measured after base64 encoding)
  • Maximum 150,000,000 records uploaded per rolling 24-hour period
  • Records must not exceed 400 KB each
  • Maximum 24 hours of processing time per job
  • Jobs are automatically deleted after 7 days
  • Query results are available for 7 days
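
If a dataset risks exceeding the per-job limits, one approach is to split it across several jobs. A minimal sketch; CHUNK_SIZE is an arbitrary example value, not a Salesforce-prescribed number:

// Sketch: split a large record array into chunks, one job per chunk
const CHUNK_SIZE = 100000;  // arbitrary example value

for (let i = 0; i < records.length; i += CHUNK_SIZE) {
  const chunk = records.slice(i, i + CHUNK_SIZE);
  const results = await conn.bulk2.loadAndWaitForResults({
    object: 'Account',
    operation: 'insert',
    input: chunk
  });
  console.log(`Chunk ${i / CHUNK_SIZE + 1}: ${results.successfulResults.length} succeeded`);
}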

Comparing v1 and v2

Feature           | Bulk v1                   | Bulk v2
Batch Management  | Manual (multiple batches) | Automatic (single upload)
Data Upload       | Multiple batch uploads    | Single upload
Job Closure       | Manual close required     | Manual close required
Result Retrieval  | Single result call        | Separate success/failure/unprocessed endpoints
Error Details     | Limited                   | Detailed, with original data
Performance       | Good                      | Better (optimized engine)
Max File Size     | 10 MB per batch           | 150 MB per job
API Complexity    | More complex              | Simpler workflow

When to Use v1

  • Need batch-level control
  • Processing extremely large datasets that exceed v2 limits
  • Using serial concurrency mode for record locking
  • Working with legacy systems

When to Use v2

  • Most modern use cases (recommended)
  • Need better error handling
  • Want simpler integration
  • Processing up to 150 MB per job
  • Need detailed success/failure reporting
