Streaming Compression
The streaming API uses ZSTD_compressStream2() to compress data incrementally.
Create buffers and context
Allocate input/output buffers and create a compression context:
/* Input buffer, sized with the value zstd recommends for streaming compression. */
size_t const buffInSize = ZSTD_CStreamInSize(); // Recommended input size
void* const buffIn = malloc_orDie(buffInSize);
/* Output buffer, likewise sized with the recommended value. */
size_t const buffOutSize = ZSTD_CStreamOutSize(); // Recommended output size
void* const buffOut = malloc_orDie(buffOutSize);
/* Compression context used by every streaming call that follows;
 * a NULL result is treated as a failure by CHECK. */
ZSTD_CCtx* const cctx = ZSTD_createCCtx();
CHECK(cctx != NULL, "ZSTD_createCCtx() failed!");
The ZSTD_CStreamInSize() and ZSTD_CStreamOutSize() functions return recommended buffer sizes for optimal performance.
Set compression parameters
Configure the compression level and other parameters:
/* Set the compression level from cLevel. */
CHECK_ZSTD(ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, cLevel));
/* Enable the checksum (value 1). */
CHECK_ZSTD(ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1));
Compress in a loop
Read chunks of data and compress them:
/* Each iteration reads one chunk from fin, compresses that entire chunk,
 * and writes all output produced to fout.
 */
size_t const toRead = buffInSize;
for (;;) {
size_t read = fread_orDie(buffIn, toRead, fin);
/* Select the flush mode.
* If the read may not be finished (read == toRead) we use
* ZSTD_e_continue. If this is the last chunk, we use ZSTD_e_end.
* Zstd optimizes the case where the first flush mode is ZSTD_e_end,
* since it knows it is compressing the entire source in one pass.
*/
int const lastChunk = (read < toRead);
ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
/* Point the input buffer at what was just read; pos starts at 0. */
ZSTD_inBuffer input = { buffIn, read, 0 };
int finished;
do {
/* A fresh output descriptor each pass lets buffOut be reused once its
 * contents have been written to the file. */
ZSTD_outBuffer output = { buffOut, buffOutSize, 0 };
size_t const remaining = ZSTD_compressStream2(cctx, &output, &input, mode);
CHECK_ZSTD(remaining);
fwrite_orDie(buffOut, output.pos, fout);
/* On the last chunk we are finished when zstd returns 0; otherwise we
 * are finished once all of the input has been consumed. */
finished = lastChunk ? (remaining == 0) : (input.pos == input.size);
} while (!finished);
/* Sanity check: the whole chunk must have been consumed at this point. */
CHECK(input.pos == input.size, "Input not fully consumed!");
if (lastChunk) {
break;
}
}
Complete Example
From examples/streaming_compression.c:
static void compressFile_orDie(const char* fname, const char* outName, int cLevel,
int nbThreads)
{
/* Open the input and output files. */
FILE* const fin = fopen_orDie(fname, "rb");
FILE* const fout = fopen_orDie(outName, "wb");
/* Create the input and output buffers.
* They may be any size, but we recommend using these functions to size them.
* Performance will only suffer significantly for very tiny buffers.
*/
size_t const buffInSize = ZSTD_CStreamInSize();
void* const buffIn = malloc_orDie(buffInSize);
size_t const buffOutSize = ZSTD_CStreamOutSize();
void* const buffOut = malloc_orDie(buffOutSize);
/* Create the context. */
ZSTD_CCtx* const cctx = ZSTD_createCCtx();
CHECK(cctx != NULL, "ZSTD_createCCtx() failed!");
/* Set any parameters you want.
* Here we set the compression level, and enable the checksum.
*/
CHECK_ZSTD( ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, cLevel) );
CHECK_ZSTD( ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1) );
/* This loop read from the input file, compresses that entire chunk,
* and writes all output produced to the output file.
*/
size_t const toRead = buffInSize;
for (;;) {
size_t read = fread_orDie(buffIn, toRead, fin);
/* Select the flush mode.
* If the read may not be finished (read == toRead) we use
* ZSTD_e_continue. If this is the last chunk, we use ZSTD_e_end.
* Zstd optimizes the case where the first flush mode is ZSTD_e_end,
* since it knows it is compressing the entire source in one pass.
*/
int const lastChunk = (read < toRead);
ZSTD_EndDirective const mode = lastChunk ? ZSTD_e_end : ZSTD_e_continue;
/* Set the input buffer to what we just read.
* We compress until the input buffer is empty, each time flushing the
* output.
*/
ZSTD_inBuffer input = { buffIn, read, 0 };
int finished;
do {
/* Compress into the output buffer and write all of the output to
* the file so we can reuse the buffer next iteration.
*/
ZSTD_outBuffer output = { buffOut, buffOutSize, 0 };
size_t const remaining = ZSTD_compressStream2(cctx, &output , &input, mode);
CHECK_ZSTD(remaining);
fwrite_orDie(buffOut, output.pos, fout);
/* If we're on the last chunk we're finished when zstd returns 0,
* which means its consumed all the input AND finished the frame.
* Otherwise, we're finished when we've consumed all the input.
*/
finished = lastChunk ? (remaining == 0) : (input.pos == input.size);
} while (!finished);
CHECK(input.pos == input.size,
"Impossible: zstd only returns 0 when the input is completely consumed!");
if (lastChunk) {
break;
}
}
ZSTD_freeCCtx(cctx);
fclose_orDie(fout);
fclose_orDie(fin);
free(buffIn);
free(buffOut);
}
Streaming Decompression
Decompression follows a similar pattern using ZSTD_decompressStream().
Create buffers and context
/* Input buffer for compressed data, sized by ZSTD_DStreamInSize(). */
size_t const buffInSize = ZSTD_DStreamInSize();
void* const buffIn = malloc_orDie(buffInSize);
/* Output buffer for decompressed data, sized by ZSTD_DStreamOutSize(). */
size_t const buffOutSize = ZSTD_DStreamOutSize();
void* const buffOut = malloc_orDie(buffOutSize);
/* Decompression context used by the streaming calls that follow;
 * a NULL result is treated as a failure by CHECK. */
ZSTD_DCtx* const dctx = ZSTD_createDCtx();
CHECK(dctx != NULL, "ZSTD_createDCtx() failed!");
Decompress in a loop
size_t const toRead = buffInSize;
size_t read;
/* Return value of the most recent ZSTD_decompressStream() call. The complete
 * example checks it after the loop: a non-zero value at end of file means the
 * input stopped in the middle of a frame. */
size_t lastRet = 0;
/* Keep reading chunks until fread_orDie() returns 0. */
while ((read = fread_orDie(buffIn, toRead, fin))) {
ZSTD_inBuffer input = { buffIn, read, 0 };
/* Decompress until the whole chunk has been consumed. */
while (input.pos < input.size) {
ZSTD_outBuffer output = { buffOut, buffOutSize, 0 };
/* The return code is zero if the frame is complete, but there may
* be multiple frames concatenated together. Zstd will automatically
* reset the context when a frame is complete.
*/
size_t const ret = ZSTD_decompressStream(dctx, &output, &input);
CHECK_ZSTD(ret);
/* Write out whatever this call produced so buffOut can be reused. */
fwrite_orDie(buffOut, output.pos, fout);
lastRet = ret;
}
}
Complete Example
From examples/streaming_decompression.c:
/* Decompress the zstd file `fname` and write the decompressed bytes to stdout.
 *
 * The input is expected to be one or more concatenated zstd frames. The
 * program exits with status 1 when the input is empty or when the file ends
 * before the current frame is complete.
 */
static void decompressFile_orDie(const char* fname)
{
    FILE* const fin = fopen_orDie(fname, "rb");
    size_t const inCapacity = ZSTD_DStreamInSize();
    void* const inBuf = malloc_orDie(inCapacity);
    FILE* const fout = stdout;
    size_t const outCapacity = ZSTD_DStreamOutSize();
    void* const outBuf = malloc_orDie(outCapacity);

    ZSTD_DCtx* const dctx = ZSTD_createDCtx();
    CHECK(dctx != NULL, "ZSTD_createDCtx() failed!");

    /* The input is assumed to hold only zstd frames, back to back. Trailing
     * non-zstd data is not handled by this example, although streaming
     * decompression in general can cope with it: ZSTD_decompressStream()
     * returns 0 exactly when a frame is completed and does not consume input
     * past the end of that frame.
     */
    size_t frameStatus = 0;   /* result of the most recent decompress call */
    int gotInput = 0;         /* set once any bytes have been read */

    for (;;) {
        size_t const nbRead = fread_orDie(inBuf, inCapacity, fin);
        if (nbRead == 0) {
            break;
        }
        gotInput = 1;

        ZSTD_inBuffer input = { inBuf, nbRead, 0 };
        /* Given a valid frame, zstd does not consume the last byte of the
         * frame until all of the frame's decompressed data has been flushed.
         * So rather than testing the return code against 0, it is enough to
         * loop while input.pos < input.size.
         */
        while (input.pos < input.size) {
            ZSTD_outBuffer output = { outBuf, outCapacity, 0 };
            /* A zero result means the frame is complete; more frames may
             * follow, and zstd resets the context by itself at that point.
             * ZSTD_DCtx_reset() can still be useful to return the context to
             * a clean state, e.g. after a decompression call reported an
             * error.
             */
            frameStatus = ZSTD_decompressStream(dctx, &output, &input);
            CHECK_ZSTD(frameStatus);
            fwrite_orDie(outBuf, output.pos, fout);
        }
    }

    if (!gotInput) {
        fprintf(stderr, "input is empty\n");
        exit(1);
    }

    if (frameStatus != 0) {
        /* End of file was reached while the decoder was still inside a
         * frame. Treat this as an error: the input was most likely truncated.
         */
        fprintf(stderr, "EOF before end of stream: %zu\n", frameStatus);
        exit(1);
    }

    ZSTD_freeDCtx(dctx);
    fclose_orDie(fin);
    fclose_orDie(fout);
    free(inBuf);
    free(outBuf);
}
Flush Modes
The ZSTD_EndDirective parameter controls how compression behaves:
// Continue compressing, more data may follow
// (the last chunk of a stream is compressed with ZSTD_e_end instead)
ZSTD_EndDirective mode = ZSTD_e_continue;
ZSTD_compressStream2(cctx, &output, &input, mode);
Buffer Management
The ZSTD_inBuffer and ZSTD_outBuffer structures track buffer positions:
/* Describes the input handed to a streaming call.
 * Field names and types follow the declaration in zstd.h: the buffer pointer
 * is `src` and is const, because zstd only reads from it.
 */
typedef struct ZSTD_inBuffer_s {
    const void* src; // Start of the input buffer
    size_t size;     // Size of the input buffer
    size_t pos;      // Position where reading stopped (updated by zstd); 0 <= pos <= size
} ZSTD_inBuffer;
/* Describes the output area handed to a streaming call.
 * Field names follow the declaration in zstd.h: the buffer pointer is `dst`.
 */
typedef struct ZSTD_outBuffer_s {
    void* dst;   // Start of the output buffer
    size_t size; // Size of the output buffer
    size_t pos;  // Position where writing stopped (updated by zstd); 0 <= pos <= size
} ZSTD_outBuffer;
Zstd updates the pos field to indicate how much data has been consumed (input) or produced (output).