A Task represents an execution of a Step.
It contains all DataArtifact objects produced by the task as well as metadata related to execution.
Note that the @retry decorator may cause multiple attempts of the task to be present. Usually you want the latest attempt, which is what instantiating a Task object returns by default. If you need to, for example, retrieve logs from a failed attempt, you can request a specific attempt explicitly with Task('flow/run/step/task', attempt=<attempt>), where attempt=0 corresponds to the first attempt.
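As an illustration of per-attempt access, the sketch below collects the standard error of every attempt. It assumes attempts are numbered consecutively from 0 up to the latest started attempt reported by current_attempt. The helper name stderr_by_attempt is hypothetical, and the Task class is passed in as a parameter so the function can also be exercised with a stand-in object.

```python
from typing import Callable, Dict


def stderr_by_attempt(pathspec: str, task_cls: Callable) -> Dict[int, str]:
    """Map each attempt number to its stderr (hypothetical helper).

    task_cls is expected to be metaflow.Task, or any callable accepting
    (pathspec, attempt=None) and returning an object with the same
    current_attempt and stderr properties.
    """
    # Without an explicit attempt, the latest started attempt is used.
    latest = task_cls(pathspec)
    # Assumption: attempts are numbered 0, 1, ..., latest.current_attempt.
    return {
        attempt: task_cls(pathspec, attempt=attempt).stderr
        for attempt in range(latest.current_attempt + 1)
    }
```

With Metaflow installed, this could be called as stderr_by_attempt('MyFlow/123/train/101', Task) after from metaflow import Task.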
Constructor
Task(pathspec: Optional[str] = None,
attempt: Optional[int] = None)
Parameters
pathspec (str, optional): Path to the task (e.g., ‘FlowName/RunID/StepName/TaskID’).
attempt (int, optional): Specific attempt number to access. If not specified, the latest attempt is used.
Properties
id
The task ID.
data
Returns a container of data artifacts produced by this task.
You can access data produced by this task as follows:
print(task.data.my_var)
Returns: Container of all artifacts produced by this task.
artifacts
artifacts: MetaflowArtifacts
Returns a container of DataArtifact objects produced by this task.
You can access each DataArtifact by name like so:
print(task.artifacts.my_var)
This property differs from data because it returns DataArtifact objects (which contain additional metadata) rather than just the data values.
Returns: Container of all DataArtifacts produced by this task.
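Because a DataArtifact carries metadata such as its sha alongside the value, a task's artifacts can be compared without reading their values through .data. The sketch below assumes that iterating over a Task yields DataArtifact objects exposing id and sha, and that equal SHAs indicate equal content; both helper names are hypothetical.

```python
from typing import Any, Dict, Iterable, Set


def artifact_shas(task: Iterable[Any]) -> Dict[str, str]:
    """Return {artifact name: sha} for every DataArtifact in a task."""
    # Only id and sha are read; artifact values are never requested via .data.
    return {artifact.id: artifact.sha for artifact in task}


def changed_artifacts(old_task: Iterable[Any], new_task: Iterable[Any]) -> Set[str]:
    """Names whose sha differs, or that exist in only one of the two tasks."""
    old, new = artifact_shas(old_task), artifact_shas(new_task)
    return {name for name in old.keys() | new.keys() if old.get(name) != new.get(name)}
```

A use case would be comparing the same step across two runs, e.g. changed_artifacts(Task('MyFlow/123/train/101'), Task('MyFlow/124/train/205')), where the second pathspec is illustrative.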
metadata
Metadata events produced by this task across all of its attempts, or only those of the selected attempt if a specific attempt was requested.
Note that Metadata is different from tags.
Returns: List of Metadata produced by this task.
metadata_dict
metadata_dict: Dict[str, str]
Dictionary mapping metadata names (keys) to their associated values.
Note that unlike the metadata property, this call will only return the latest metadata for a given name. For example, if a task executes multiple times (retries), the same metadata name will be generated multiple times (one for each execution of the task). The metadata property returns all those metadata elements whereas this call will return the metadata associated with the latest execution of the task.
Returns: Dictionary mapping metadata name with value.
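The difference between metadata and metadata_dict can be illustrated by reducing a list of events to the most recent value for each name. This is a sketch of the described behavior, not Metaflow's implementation; the Event tuple is a hypothetical stand-in for Metaflow's metadata objects and assumes each event has a name, a value, and a comparable created_at.

```python
from typing import Dict, Iterable, NamedTuple


class Event(NamedTuple):
    # Hypothetical stand-in for a metadata event.
    name: str
    value: str
    created_at: int  # any comparable timestamp; only ordering matters


def latest_metadata(events: Iterable[Event]) -> Dict[str, str]:
    """Keep only the most recent value for each metadata name."""
    latest: Dict[str, Event] = {}
    for event in events:
        current = latest.get(event.name)
        # Later events (e.g. from a retry) replace earlier ones with the same name.
        if current is None or event.created_at >= current.created_at:
            latest[event.name] = event
    return {name: event.value for name, event in latest.items()}
```

For a task retried once, an "attempt" event would appear twice in metadata, while the reduced dictionary keeps only the value from the later attempt.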
successful
Indicates whether or not the task completed successfully.
This information is always about the latest attempt of the task to have completed (in case of retries).
Returns: True if the task completed successfully and False otherwise.
finished
Indicates whether or not the task completed.
This information is always about the latest attempt of the task to have completed (in case of retries).
Returns: True if the task completed and False otherwise.
exception
Returns the exception that caused the task to fail, if any.
This information is always about the latest attempt of the task to have completed (in case of retries). If successful is False and finished is True, this property can help determine what went wrong.
Returns: Exception raised by the task or None if not applicable.
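The three status properties above are typically read together. A minimal sketch, assuming a Task (or any object with the same finished, successful, finished_at, and exception properties); the helper name describe_status is hypothetical.

```python
from typing import Any


def describe_status(task: Any) -> str:
    """Summarize the latest completed attempt of a task (hypothetical helper)."""
    if not task.finished:
        return "not finished"
    if task.successful:
        return f"succeeded at {task.finished_at}"
    # Finished but not successful: the exception may explain the failure.
    return f"failed: {task.exception!r}"
```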
finished_at
finished_at: Optional[datetime]
Returns the datetime object of when the task finished (successfully or not).
This information is always about the latest attempt of the task to have completed (in case of retries). This property returns None if the task is not finished.
Returns: Datetime of when the task finished, or None if it has not finished.
runtime_name
runtime_name: Optional[str]
Returns the name of the runtime this task executed on.
Returns: Name of the runtime this task executed on.
stdout
Returns the full standard output of this task.
If you specified an attempt when creating this Task, this property returns the standard output for that attempt. Otherwise, it returns the current standard output of the latest started attempt of the task. In both cases, each access returns the most up-to-date log (so if an attempt is still running, each access fetches the latest log).
Returns: Standard output of this task.
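Since every access to stdout fetches the full, up-to-date log, a polling loop can emit only the newly appended text. A minimal sketch with a hypothetical helper name; it assumes the log only grows (earlier text stays a prefix), that logs are fully persisted by the time the task reports finished, and that finished and stdout describe the same attempt, which is safest to ensure by creating the Task with an explicit attempt when @retry is in use. Note that each poll re-downloads the whole log.

```python
import time
from typing import Any, Callable, Iterator


def tail_stdout(
    task: Any,
    poll_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[str]:
    """Yield newly appended stdout text until the task finishes."""
    seen = 0
    while True:
        # Read the finished flag first: if it is already True, the stdout
        # read below is assumed to be the final, complete log.
        finished = task.finished
        log = task.stdout
        if len(log) > seen:
            yield log[seen:]
            seen = len(log)
        if finished:
            return
        sleep(poll_seconds)
```

For example, for chunk in tail_stdout(Task('MyFlow/123/train/101', attempt=0)): print(chunk, end='').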
stdout_size
Returns the size of the stdout log of this task.
Similar to stdout, the size returned is the latest size of the log (so for a running attempt, this value will increase as the task produces more output).
Returns: Size of the stdout log content (in bytes).
stderr
Returns the full standard error of this task.
If you specified an attempt when creating this Task, this property returns the standard error for that attempt. Otherwise, it returns the current standard error of the latest started attempt. In both cases, each access returns the most up-to-date log (so if an attempt is still running, each access fetches the latest log).
Returns: Standard error of this task.
stderr_size
Returns the size of the stderr log of this task.
Similar to stderr, the size returned is the latest size of the log (so for a running attempt, this value will increase as the task produces more output).
Returns: Size of the stderr log content (in bytes).
current_attempt
Get the relevant attempt for this Task.
Returns the specific attempt used when initializing the instance, or the latest started attempt for the Task.
Returns: Attempt id for this task object.
code
code: Optional[MetaflowCode]
Returns the MetaflowCode object for this task, if present.
Not all tasks save their code so this call may return None in those cases.
Returns: Code package for this task.
environment_info
environment_info: Dict[str, Any]
Returns information about the environment that was used to execute this task.
As an example, if the Conda environment is selected, this will return information about the dependencies that were used in the environment.
This environment information is only available for tasks that have a code package.
Returns: Dictionary describing the environment.
index
Returns the index of the innermost foreach loop if this task ran inside at least one foreach.
The index is what distinguishes the various tasks inside a given step. This property is None if this task did not run in a foreach loop.
Returns: Index in the innermost loop for this task.
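Because index distinguishes the tasks of a foreach step, it can restore the foreach order when collecting results. A minimal sketch, assuming an iterable of Task-like objects such as the tasks obtained by iterating over a Step; the helper name is hypothetical, and tasks outside a foreach (index None) are skipped.

```python
from typing import Any, Iterable, List


def in_foreach_order(tasks: Iterable[Any]) -> List[Any]:
    """Return foreach tasks sorted by their innermost foreach index."""
    foreach_tasks = [t for t in tasks if t.index is not None]
    return sorted(foreach_tasks, key=lambda t: t.index)
```

For example, [t.data.result for t in in_foreach_order(Step('MyFlow/123/process'))], where result stands for whatever artifact the step produces.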
parent_tasks
parent_tasks: Iterator[Task]
Yields all parent tasks of the current task, if any.
Yields: Parent Task objects of the current task.
child_tasks
child_tasks: Iterator[Task]
Yields all child tasks of the current task, if any.
Yields: Child Task objects of the current task.
parent_task_pathspecs
parent_task_pathspecs: Iterator[str]
Yields pathspecs of all parent tasks of the current task.
Yields: Pathspec of each parent task of the current task.
child_task_pathspecs
child_task_pathspecs: Iterator[str]
Yields pathspecs of all child tasks of the current task.
Yields: Pathspec of each child task of the current task.
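Following parent_tasks repeatedly walks the task graph upstream. A minimal breadth-first sketch, assuming Task-like objects with parent_tasks and pathspec; the helper name is hypothetical. Deduplicating by pathspec matters after a join, where several branches share the same ancestors.

```python
from collections import deque
from typing import Any, List


def ancestor_pathspecs(task: Any) -> List[str]:
    """Breadth-first walk over parent_tasks, returning each ancestor once."""
    seen = set()
    order: List[str] = []
    queue = deque([task])
    while queue:
        current = queue.popleft()
        for parent in current.parent_tasks:
            if parent.pathspec not in seen:
                seen.add(parent.pathspec)
                order.append(parent.pathspec)
                queue.append(parent)
    return order
```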
tags
tags: FrozenSet[str]
Tags associated with the run this object belongs to (user and system tags).
user_tags
user_tags: FrozenSet[str]
User tags associated with the run this object belongs to.
system_tags
system_tags: FrozenSet[str]
System tags associated with the run this object belongs to.
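Since user_tags is a frozenset, selecting tasks by tag reduces to a subset test. A minimal sketch with a hypothetical helper name, assuming Task-like objects exposing user_tags; note that the tags belong to the run, so all tasks of one run share them.

```python
from typing import Any, Iterable, List, Set


def tasks_with_user_tags(tasks: Iterable[Any], required: Set[str]) -> List[Any]:
    """Keep tasks whose run carries every tag in `required` as a user tag."""
    return [t for t in tasks if required <= t.user_tags]
```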
created_at
Date and time this object was first created.
parent
parent: Optional[MetaflowObject]
Returns the parent object (Step) of this task.
pathspec
Pathspec of this object (e.g., ‘FlowName/RunID/StepName/TaskID’).
path_components
path_components: List[str]
Components of the pathspec.
origin_pathspec
origin_pathspec: Optional[str]
Pathspec of the original object this object was cloned from (in the case of a resume).
Returns None if not applicable.
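After a resume, origin_pathspec tells cloned tasks apart from tasks that actually executed in the resumed run. A minimal sketch with a hypothetical helper name, assuming Task-like objects whose origin_pathspec is None when they were not cloned.

```python
from typing import Any, Iterable, List, Tuple


def split_by_origin(tasks: Iterable[Any]) -> Tuple[List[Any], List[Any]]:
    """Separate tasks cloned by a resume from tasks that actually executed."""
    cloned: List[Any] = []
    executed: List[Any] = []
    for task in tasks:
        # origin_pathspec is None when the task was not cloned.
        (cloned if task.origin_pathspec is not None else executed).append(task)
    return cloned, executed
```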
Methods
loglines()
loglines(stream: str,
as_unicode: bool = True,
meta_dict: Optional[Dict[str, Any]] = None) -> Iterator[Tuple[datetime, str]]
Return an iterator over (utc_timestamp, logline) tuples.
Parameters:
stream (str): Either ‘stdout’ or ‘stderr’
as_unicode (bool): If False, each logline is returned as a byte object. Otherwise, it is returned as a (unicode) string. Default: True
meta_dict (Dict[str, Any], optional): Optional metadata dictionary
Yields: Tuple of timestamp, logline pairs.
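Because loglines yields timestamped pairs, it is convenient for pulling specific lines out of a log. A minimal sketch with a hypothetical helper name, assuming a Task-like object whose loglines(stream) yields (timestamp, line) tuples with unicode lines (the default as_unicode=True).

```python
import re
from datetime import datetime
from typing import Any, List, Tuple


def grep_loglines(task: Any, pattern: str, stream: str = "stderr") -> List[Tuple[datetime, str]]:
    """Return (timestamp, line) pairs whose line matches a regular expression."""
    regex = re.compile(pattern)
    return [(ts, line) for ts, line in task.loglines(stream) if regex.search(line)]
```

For example, grep_loglines(Task('MyFlow/123/train/101'), r'Traceback|Error') lists error lines along with when they were written.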
__iter__()
__iter__() -> Iterator[DataArtifact]
Iterate over all child DataArtifact objects of this Task.
Yields: DataArtifact objects in this Task.
__getitem__()
__getitem__(name: str) -> DataArtifact
Returns the DataArtifact object with the artifact name ‘name’.
Parameters:
name (str): Data artifact name
Returns: DataArtifact for this artifact name in this task.
Raises: KeyError if the name does not identify a valid DataArtifact object.
__contains__()
__contains__(id: str) -> bool
Tests whether a child named ‘id’ exists.
Parameters:
id (str): Name of the child object
Returns: True if the child exists or False otherwise.
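Combining __contains__ with __getitem__ avoids the KeyError raised for a missing artifact. A minimal sketch with a hypothetical helper name, assuming a Task-like object that supports the in operator and indexing by artifact name, returning objects with a data attribute.

```python
from typing import Any


def artifact_value(task: Any, name: str, default: Any = None) -> Any:
    """Return an artifact's value, or `default` if the task has no such artifact."""
    if name in task:  # __contains__: is there a child DataArtifact with this name?
        return task[name].data  # __getitem__ returns the DataArtifact
    return default
```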
is_in_namespace()
is_in_namespace() -> bool
Returns whether this object is in the current namespace.
Returns: Whether or not the object is in the current namespace.
Usage Examples
Access task data
from metaflow import Task
task = Task('MyFlow/123/start/456')
if task.successful:
    print(f"Task finished at: {task.finished_at}")
    print(f"Result: {task.data.result}")
    print(f"Model accuracy: {task.data.accuracy}")
else:
    if task.exception:
        print(f"Task failed with: {task.exception}")
Iterate over artifacts
from metaflow import Task
task = Task('MyFlow/123/process/789')
for artifact in task:
    print(f"Artifact: {artifact.id}")
    print(f"  Size: {artifact.size} bytes")
    print(f"  SHA: {artifact.sha}")
Access specific artifact
from metaflow import Task
task = Task('MyFlow/123/train/101')
model_artifact = task['model']
print(f"Model SHA: {model_artifact.sha}")
model = model_artifact.data
View task logs
from metaflow import Task
task = Task('MyFlow/123/train/101')
# Print stdout
print("STDOUT:")
print(task.stdout)
# Print stderr
print("STDERR:")
print(task.stderr)
# Iterate over log lines with timestamps
for timestamp, line in task.loglines('stdout'):
    print(f"[{timestamp}] {line}")
Access specific attempt
from metaflow import Task
# Access first attempt
task_attempt_0 = Task('MyFlow/123/train/101', attempt=0)
print(f"First attempt stdout: {task_attempt_0.stdout}")
# Access latest attempt (default)
task_latest = Task('MyFlow/123/train/101')
print(f"Latest attempt: {task_latest.current_attempt}")
Navigate task graph
from metaflow import Task
task = Task('MyFlow/123/join/202')
print("Parent tasks:")
for parent in task.parent_tasks:
    print(f"  {parent.pathspec}")
    print(f"  Index: {parent.index}")
print("Child tasks:")
for child in task.child_tasks:
    print(f"  {child.pathspec}")
Access code and environment
from metaflow import Task
task = Task('MyFlow/123/train/101')
if task.code:
    print(f"Code package: {task.code.path}")
    print(f"Script: {task.code.script_name}")
if task.environment_info:
    print(f"Environment: {task.environment_info}")
print(f"Runtime: {task.runtime_name}")
Access metadata
from metaflow import Task
task = Task('MyFlow/123/train/101')
# Get all metadata
for meta in task.metadata:
    print(f"{meta.name}: {meta.value}")
# Get condensed metadata dict
ds_root = task.metadata_dict.get('ds-root')
print(f"Datastore root: {ds_root}")
Check foreach index
from metaflow import Task
task = Task('MyFlow/123/process/789')
if task.index is not None:
    print(f"Task is in a foreach with index: {task.index}")
else:
    print("Task is not in a foreach")
See Also
- Step - Parent Step object
- DataArtifact - Child DataArtifact objects
- Run - Access run-level information