I needed to periodically back up the content of an S3 bucket into a large archive (on another S3 bucket). The bucket is far too big to copy onto the pod that creates the archive, so my goal was to build the archive on the fly and push it continuously. I did not find a way to do this from the command line, so I used a small Node.js script, which was also a good occasion to review a few things about streams.

Process

The process I intended to use:

  1. list the keys in the bucket to include, as a readable object stream whose items are S3 object descriptions
  2. fetch the objects, as a transform object stream
  3. add the content of each object to a zipped binary stream
  4. split the big stream into smaller streams in order to upload them as S3 parts, as a transform stream that takes binary input and emits readable streams
  5. start a multipart upload, upload the parts and complete the multipart upload, as a writable stream

I had to change steps 4 and 5 because it is not possible to upload S3 parts without giving their size in advance. It would be possible to predict the size of the intermediate parts, but not of the last one. Instead I had the transform stream create a file for each part, append to that file until it reached the maximum size I wanted for a part, and then emit the file name. The last step deletes the file once it has been uploaded. Even though I did not implement it, having files has one more advantage: each part upload can be retried on error (a sketch of what such a retry could look like follows the list below). The final process is thus:

  1. list the keys in the bucket to include, as a readable object stream whose items are S3 object descriptions
  2. fetch the objects, as a transform object stream
  3. add the content of each object to a zipped binary stream
  4. split the big stream into files, as a transform stream that takes binary input and emits file paths
  5. start a multipart upload, upload the parts and delete the files, and complete the multipart upload, as a writable stream
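
For reference, here is what such a retry could look like. This is a hypothetical helper that is not part of the final code; it relies on the UploadPartCommand used in the last step, and the important point is that, since each part lives in a file, the body stream can simply be recreated for every attempt.

import {createReadStream} from 'node:fs';
import {UploadPartCommand} from "@aws-sdk/client-s3";

// Hypothetical helper (not implemented in the final code): retry a part upload
// a few times, recreating the file read stream for each attempt.
async function uploadPartWithRetry(client, params, file, attempts = 3) {
  for (let attempt = 1; ; attempt += 1) {
    try {
      // params holds Bucket, Key, UploadId and PartNumber
      return await client.send(new UploadPartCommand({...params, Body: createReadStream(file)}));
    } catch (err) {
      if (attempt >= attempts) {
        throw err;
      }
    }
  }
}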

List keys

It is a readable stream: nothing pushes data into it; the application pulls objects out of it. It works by fetching a batch of keys every time its buffer usage is low, and saving the continuation token for the next batch. The objects it emits are rather small, containing only the object description and not its content, so to reduce the number of calls made to the S3 API this stream gets a large highWaterMark. Its _read method is marked async, and since streams do not handle the resulting promise, it is wrapped in a big try/catch that destroys the stream on error.

For S3 interaction I used the official client: @aws-sdk/client-s3.
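
The client is created once and shared by every stream. A minimal setup, assuming credentials and region are resolved from the environment:

import {S3Client} from "@aws-sdk/client-s3";

// Region and credentials come from the environment (or instance profile).
const client = new S3Client({});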

import {Readable} from 'node:stream';
import {ListObjectsV2Command} from "@aws-sdk/client-s3";

class BucketObjectsList extends Readable {
  #client;
  #bucket;
  #continuationToken;

  constructor(options) {
    super({...options, objectMode: true});
    this.#client = options.client;
    this.#bucket = options.bucket;
  }

  async _read(n) {
    try {
      const list = await this.#client.send(new ListObjectsV2Command({
        Bucket: this.#bucket,
        ContinuationToken: this.#continuationToken,
        MaxKeys: n,
      }));
      this.#continuationToken = list.NextContinuationToken;

      // push() returns false once the internal buffer is full
      const expectsMore = (list.Contents ?? []).map((object) => {
        return this.push(object);
      }).every((status) => !!status);

      if (!list.NextContinuationToken) {
        // no more keys in the bucket: signal the end of the stream
        this.push(null);
      } else if (expectsMore) {
        // the buffer still has room: immediately fetch the next batch
        await this._read(n);
      }
    } catch (err) {
      this.destroy(err);
    }
  }
}

Fetch objects

This is a transform stream: for each key coming from the list, it fetches the object and emits it. The object's content is not actually downloaded at this point: what the object carries is a readable stream ready to deliver the content. Doing this operation in a stream means there will be a buffer holding a number of objects ready to be read. Objects are handled one by one when building the archive and there is no need to have tens of them ready, hence this stream's highWaterMark is small.

import {Transform} from 'node:stream';
import {GetObjectCommand} from "@aws-sdk/client-s3";

class FetchObject extends Transform {
  #client;
  #bucket;

  constructor(options) {
    super({...options, objectMode: true});
    this.#client = options.client;
    this.#bucket = options.bucket;
  }

  async _transform(data, encoding, callback) {
    try {
      const object = await this.#client.send(new GetObjectCommand({
        Bucket: this.#bucket,
        Key: data.Key,
      }));
      this.push({
        Key: data.Key,
        Body: object.Body,
      });
      callback();
    } catch (err) {
      this.destroy(err);
    }
  }
}

Build the archive

For building the archive I used zip-stream. It requires its entry method to be called for every file, sequentially, only after the previous file has been completely added. To keep the adds sequential, the upstream stream is paused each time it emits an object, and resumed once the object has been fully ingested.

import ZipStream from 'zip-stream';

const zip = new ZipStream();
// objects is the previous stream emitting files
objects.on('data', async (object) => {
  objects.pause();
  console.log(`Handling ${object.Key}`);
  zip.entry(object.Body, {name: object.Key}, (err) => {
    if (err) {
      zip.destroy(err);
    } else {
      objects.resume();
    }
  });
});

Split the archive into files

Another transform stream, which this time receives chunks of bytes and emits objects describing the file that was just assembled. The stream works by tracking how much data has been written to the current file. I did not split on an exact size limit: the first chunk of each file is always written without checking the limit. If the file stream's buffer is full, the _transform method waits for the drain event before accepting more data. Before emitting a file, it must wait for the data to actually be written to disk. At the end, the _flush method emits the last file that was being built.

import {Transform} from 'node:stream';
import {createWriteStream} from 'node:fs';

class SplitInFiles extends Transform {
  #maxSizeBytes;

  #currentBytes;
  #currentStream;
  #currentFile;
  #currentPartNumber;

  #nextPartNumber;

  constructor(options) {
    super({...options, objectMode: true});
    this.#maxSizeBytes = options.maxSizeBytes;
    this.#nextPartNumber = 1;
  }

  async _transform(data, encoding, callback) {
    try {
      if (!this.#currentStream) {
        this.#initStream();
      } else if (this.#currentBytes + data.length > this.#maxSizeBytes) {
        await this.#pushStream();
        this.#initStream();
      }
      this.#currentBytes += data.length;
      if (!this.#currentStream.write(data)) {
        this.#currentStream.once('drain', callback);
      } else {
        callback();
      }
    } catch (err) {
      callback(err);
    }
  }

  async _flush(callback) {
    try {
      if (this.#currentStream) {
        await this.#pushStream();
      }
      callback();
    } catch (err) {
      callback(err);
    }
  }

  async #pushStream() {
    await new Promise((resolve, reject) => {
      this.#currentStream.on('error', (err) => {
        reject(err);
      });
      this.#currentStream.on('finish', () => {
        resolve();
      });
      this.#currentStream.end();
    });
    this.push({file: this.#currentFile, sizeBytes: this.#currentBytes, partNumber: this.#currentPartNumber});
  }

  #initStream() {
    this.#currentPartNumber = this.#nextPartNumber;
    this.#nextPartNumber += 1;
    this.#currentFile = `part_${this.#currentPartNumber}`;
    this.#currentStream = createWriteStream(this.#currentFile);
    this.#currentBytes = 0;
  }
}

Multipart upload

This is the last stream: a writable one that processes each incoming file until everything has been uploaded.

A multipart upload is initiated at the beginning in _construct.

Then each file is uploaded separately, the part number being provided by the previous stream. There is probably some room for optimization here, because each part is fully uploaded before the next one starts: the next part upload could be initiated while the previous one is finishing (a sketch of the idea follows the class below). Once all parts have been processed, the stream completes the upload in _final.

In case of error, _destroy aborts the upload. In any case, to avoid lingering storage costs, I have a lifecycle configuration on the bucket that automatically aborts incomplete multipart uploads after one week.
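
That lifecycle rule can also be set up once with the SDK; a minimal sketch, assuming the backup bucket name and a rule ID of my choosing:

import {S3Client, PutBucketLifecycleConfigurationCommand} from "@aws-sdk/client-s3";

// One-off setup sketch: abort incomplete multipart uploads after 7 days.
// The rule ID and the empty prefix (whole bucket) are assumptions.
const client = new S3Client({});
await client.send(new PutBucketLifecycleConfigurationCommand({
  Bucket: process.env.AWS_S3_BACKUP_BUCKET,
  LifecycleConfiguration: {
    Rules: [{
      ID: 'abort-incomplete-multipart-uploads',
      Status: 'Enabled',
      Filter: {Prefix: ''},
      AbortIncompleteMultipartUpload: {DaysAfterInitiation: 7},
    }],
  },
}));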

This stream has a very low highWaterMark: if the upload is slow, we don’t want temporary files to pile up while waiting.

import {Writable} from 'node:stream';
import {createReadStream} from 'node:fs';
import {unlink} from 'node:fs/promises';

import {CreateMultipartUploadCommand, CompleteMultipartUploadCommand, AbortMultipartUploadCommand, UploadPartCommand} from "@aws-sdk/client-s3";

class UploadFilesToS3AsMultipart extends Writable {
  #client;
  #bucket;
  #key;
  #parts;
  #uploadId;

  constructor(options) {
    super({...options, objectMode: true});
    this.#client = options.client;
    this.#bucket = options.bucket;
    this.#key = options.key;
    this.#parts = [];
  }

  async _construct(callback) {
    try {
      console.log(`Starting upload to ${this.#key}`);
      const multipart = await this.#client.send(new CreateMultipartUploadCommand({
        Bucket: this.#bucket,
        Key: this.#key,
      }));
      this.#uploadId = multipart.UploadId;
      callback();
    } catch (err) {
      callback(err);
    }
  }

  async _write(part, encoding, callback) {
    try {
      console.log(`Uploading part ${part.partNumber} (size: ${part.sizeBytes} bytes)`);
      const completedPart = await this.#client.send(new UploadPartCommand({
        Bucket: this.#bucket,
        Key: this.#key,
        UploadId: this.#uploadId,
        PartNumber: part.partNumber,
        Body: createReadStream(part.file),
      }));
      await unlink(part.file);
      this.#parts.push({
        PartNumber: part.partNumber,
        ETag: completedPart.ETag,
      });
      callback();
    } catch (err) {
      callback(err);
    }
  }

  async _final(callback) {
    try {
      await this.#client.send(new CompleteMultipartUploadCommand({
        Bucket: this.#bucket,
        Key: this.#key,
        UploadId: this.#uploadId,
        MultipartUpload: {
          Parts: this.#parts,
        },
      }));
      console.log(`Completed upload to ${this.#key}`);
      callback();
    } catch (err) {
      callback(err);
    }
  }

  async _destroy(err, callback) {
    try {
      if (err) {
        console.log(`Aborting upload to ${this.#key}`);
        await this.#client.send(new AbortMultipartUploadCommand({
          Bucket: this.#bucket,
          Key: this.#key,
          UploadId: this.#uploadId,
        }));
      }
    } catch (newErr) {
      console.log('Error while aborting upload', newErr);
    } finally {
      callback(err);
    }
  }
}
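
As a rough illustration of that overlap (a hypothetical helper, not part of the final code), keeping a bounded number of part uploads in flight could look like this, with uploadPart standing for the UploadPartCommand call performed in _write above:

// Sketch only: start the next upload while at most one other is still running.
async function uploadWithOverlap(partFiles, uploadPart, maxInFlight = 2) {
  const pending = [];
  for (const part of partFiles) {
    pending.push(uploadPart(part));
    if (pending.length >= maxInFlight) {
      // wait for the oldest upload before starting yet another one
      await pending.shift();
    }
  }
  // wait for whatever is still in flight before completing the multipart upload
  await Promise.all(pending);
}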

Wiring

Now that all the pieces are available, it is time to write the glue code that pipes all the streams together. One thing that surprised me is that piped streams do not propagate errors. I made every step destroy the next stream on error, so that errors only have to be handled on the final one. (A possible alternative based on stream.pipeline is sketched after the code.)

import {S3Client} from "@aws-sdk/client-s3";

(async function main() {
  try {
    const client = new S3Client({});

    const list = new BucketObjectsList({client, bucket: process.env.AWS_S3_SOURCE_BUCKET, highWaterMark: 1000});

    const objects = list.pipe(new FetchObject({client, bucket: process.env.AWS_S3_SOURCE_BUCKET, highWaterMark: 5}));
    list.on('error', (err) => {
      objects.destroy(err);
    });

    const zip = new ZipStream();
    objects.on('data', async (object) => {
      objects.pause();
      console.log(`Handling ${object.Key}`);
      zip.entry(object.Body, {name: object.Key}, (err) => {
        if (err) {
          zip.destroy(err);
        } else {
          objects.resume();
        }
      });
    });
    objects.on('end', () => {
      zip.finalize();
    });
    objects.on('error', (err) => {
      zip.destroy(err);
    });

    const parts = new SplitInFiles({maxSizeBytes: 1024 * 1024 * 1024});
    zip.pipe(parts);
    zip.on('error', (err) => {
      parts.destroy(err);
    });

    const key = 'xyz';
    const upload = new UploadFilesToS3AsMultipart({client, bucket: process.env.AWS_S3_BACKUP_BUCKET, key, highWaterMark: 2});
    parts.pipe(upload);
    parts.on('error', (err) => {
      upload.destroy(err);
    });

    await new Promise((resolve, reject) => {
      upload.on('error', (err) => {
        reject(err);
      });
      upload.on('finish', () => {
        resolve();
      });
    });
  } catch (err) {
    console.log(err);
    process.exit(1);
  }
})();
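
As a side note, part of that manual error wiring could probably be replaced by stream.pipeline, which destroys every stream in the chain and surfaces the first error. The list-to-zip portion would still need the manual entry() feeding, but the binary half could be wired like this (untested sketch reusing the variables above):

import {pipeline} from 'node:stream/promises';

// Sketch only: zip → parts → upload, with error propagation handled by pipeline.
await pipeline(
  zip,
  new SplitInFiles({maxSizeBytes: 1024 * 1024 * 1024}),
  new UploadFilesToS3AsMultipart({client, bucket: process.env.AWS_S3_BACKUP_BUCKET, key, highWaterMark: 2}),
);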