The S3 class provides a high-level interface for interacting with AWS S3 storage in Metaflow. It handles downloads, uploads, and listings of S3 objects with automatic retry logic and parallel operations.

Overview

The S3 client manages connections to S3 and temporary directories for downloaded objects. It supports three initialization modes:
  1. Run-based: Use S3(run=self) to automatically prefix paths with the current run ID
  2. Explicit prefix: Use S3(s3root='s3://mybucket/path') to set a custom S3 prefix
  3. Full URLs: Use S3() with complete S3 URLs for each operation

Usage

The recommended way to use the S3 client is as a context manager:
from metaflow import S3

with S3() as s3:
    data = [obj.blob for obj in s3.get_many(urls)]
print(data)
The context manager automatically creates and cleans up temporary directories. Without a context manager, call .close() explicitly:
s3 = S3()
data = [obj.blob for obj in s3.get_many(urls)]
s3.close()

Constructor

S3

S3(
    tmproot: str = TEMPDIR,
    bucket: Optional[str] = None,
    prefix: Optional[str] = None,
    run: Optional[Union[FlowSpec, Run]] = None,
    s3root: Optional[str] = None,
    encryption: Optional[str] = S3_SERVER_SIDE_ENCRYPTION,
    **kwargs
)

Parameters

tmproot
str
default: TEMPDIR
Directory for storing temporary files during downloads.
bucket
str
Override the bucket from DATATOOLS_S3ROOT when run is specified.
prefix
str
Override the path from DATATOOLS_S3ROOT when run is specified.
run
FlowSpec | Run
Derive path prefix from the current or a past run ID. Use S3(run=self) inside a flow.
s3root
str
S3 prefix to use if run is not specified. Must start with s3://.
encryption
str
Server-side encryption to use when uploading objects to S3.

Methods

get

Download a single object from S3.
get(
    key: Optional[Union[str, S3GetObject]] = None,
    return_missing: bool = False,
    return_info: bool = True
) -> S3Object
Parameters:
  • key: Object to download (S3 URL, path suffix, or S3GetObject for range downloads)
  • return_missing: If True, return S3Object with .exists == False instead of raising exception
  • return_info: If True, fetch content-type and user metadata
Returns: S3Object with downloaded content

get_many

Download many objects from S3 in parallel.
get_many(
    keys: Iterable[Union[str, S3GetObject]],
    return_missing: bool = False,
    return_info: bool = True
) -> List[S3Object]
Parameters:
  • keys: Objects to download (S3 URLs, path suffixes, or S3GetObject instances)
  • return_missing: If True, include missing objects with .exists == False
  • return_info: If True, fetch metadata for each object
Returns: List of S3Object instances

get_recursive

Download all objects under given prefixes recursively in parallel.
get_recursive(
    keys: Iterable[str],
    return_info: bool = False
) -> List[S3Object]
Parameters:
  • keys: Prefixes to download recursively
  • return_info: If True, fetch metadata for each object
Returns: List of S3Object instances for all objects under the prefixes

get_all

Download all objects under the prefix set in the constructor.
get_all(return_info: bool = False) -> List[S3Object]
Requires that the S3 object was initialized with run or s3root.

put

Upload a single object to S3.
put(
    key: Union[str, S3PutObject],
    obj: PutValue,
    overwrite: bool = True,
    content_type: Optional[str] = None,
    metadata: Optional[Dict[str, str]] = None
) -> str
Parameters:
  • key: Object path (S3 URL or path suffix)
  • obj: String, bytes, or file-like object to upload
  • overwrite: If False, skip upload if key already exists
  • content_type: MIME type for the object
  • metadata: JSON-encodable dictionary of metadata
Returns: URL of the uploaded object

put_many

Upload many objects to S3 in parallel.
put_many(
    key_objs: List[Union[Tuple[str, PutValue], S3PutObject]],
    overwrite: bool = True
) -> List[Tuple[str, str]]
Parameters:
  • key_objs: List of (key, obj) tuples or S3PutObject instances
  • overwrite: If False, skip uploads for existing keys
Returns: List of (key, url) pairs for uploaded objects

put_files

Upload many local files to S3 in parallel.
put_files(
    key_paths: List[Union[Tuple[str, str], S3PutObject]],
    overwrite: bool = True
) -> List[Tuple[str, str]]
Parameters:
  • key_paths: List of (key, local_path) tuples or S3PutObject instances
  • overwrite: If False, skip uploads for existing keys
Returns: List of (key, url) pairs for uploaded files

info

Get metadata about a single object without downloading it.
info(
    key: Optional[str] = None,
    return_missing: bool = False
) -> S3Object
Makes a HEAD request to S3, which is much faster than downloading the object itself.

info_many

Get metadata about many objects in parallel without downloading them.
info_many(
    keys: Iterable[str],
    return_missing: bool = False
) -> List[S3Object]

list_paths

List the next level of paths in S3 (non-recursive).
list_paths(keys: Optional[Iterable[str]] = None) -> List[S3Object]
Returns both files and prefixes (directories). Prefixes have .exists == False.

list_recursive

List all objects recursively under given prefixes.
list_recursive(keys: Optional[Iterable[str]] = None) -> List[S3Object]
Returns only leaf objects (all have .exists == True).

close

Delete all temporary files downloaded in this context.
close()

S3Object

An S3Object represents a path or object in S3. It is returned by S3 client methods and provides access to both the S3 location and downloaded content.

Properties

  • exists (bool): True if the key corresponds to an existing S3 object
  • downloaded (bool): True if the object has been downloaded
  • url (str): S3 location of the object
  • key (str): Key used in the request that produced this object
  • path (Optional[str]): Local path to downloaded file (None if not downloaded)
  • blob (Optional[bytes]): Contents as bytes (None if not downloaded)
  • text (Optional[str]): Contents as UTF-8 string (None if not downloaded)
  • size (Optional[int]): Size in bytes (None if object doesn't exist)
  • content_type (Optional[str]): MIME type of the object
  • metadata (Optional[Dict]): User-defined metadata dictionary
  • encryption (Optional[str]): Server-side encryption type
  • range_info (Optional[RangeInfo]): Information about partial downloads
  • last_modified (Optional[int]): Unix timestamp of last modification

Helper Classes

S3GetObject

Specifies a range download request:
S3GetObject(key: str, offset: int = 0, length: int = -1)
  • key: S3 path
  • offset: Starting byte offset
  • length: Number of bytes to download (negative for “from offset to end”)

S3PutObject

Specifies an upload with metadata:
S3PutObject(
    key: str,
    value: Optional[PutValue] = None,
    path: Optional[str] = None,
    content_type: Optional[str] = None,
    encryption: Optional[str] = None,
    metadata: Optional[Dict[str, str]] = None
)

Examples

Download files in a flow

from metaflow import FlowSpec, step, S3

class MyFlow(FlowSpec):
    @step
    def start(self):
        with S3(run=self) as s3:
            # Upload data
            s3.put('data.json', '{"key": "value"}')
            
            # Download it back
            obj = s3.get('data.json')
            print(obj.text)
        
        self.next(self.end)
    
    @step
    def end(self):
        pass

Download multiple files

with S3() as s3:
    urls = [
        's3://mybucket/file1.txt',
        's3://mybucket/file2.txt',
        's3://mybucket/file3.txt'
    ]
    objects = s3.get_many(urls)
    
    for obj in objects:
        if obj.exists:
            print(f"{obj.url}: {len(obj.blob)} bytes")

Partial downloads

from metaflow import S3, S3GetObject

with S3() as s3:
    # Download first 1000 bytes
    obj = s3.get(S3GetObject('s3://mybucket/large-file.bin', offset=0, length=1000))
    print(f"Downloaded {obj.size} bytes")
    print(f"Total file size: {obj.range_info.total_size} bytes")

Upload with metadata

from metaflow import S3, S3PutObject

with S3(s3root='s3://mybucket/data') as s3:
    objects = [
        S3PutObject(
            key='report.html',
            value='<html>...</html>',
            content_type='text/html',
            metadata={'author': 'workflow', 'version': '1.0'}
        )
    ]
    results = s3.put_many(objects)
