Skip to content

Sequential

Overview

The Runner executes tasks in a sequential order. It is useful for debugging and testing the workflow. Use this for simple workflows or for testing out functionality before you scale compute it with the QueueRunner.

Basic Usage

The basic usage pattern of the Runner follows three steps:

  1. Initialize the runner
  2. Implement the logic for each stage in your workflow you want to capture with the runner.
  3. Execute the runner

The following example shows how to initialize the runner and implement the logic for each stage in your workflow you want to capture with the runner.

example_agent.py
from encord.objects.ontology_labels_impl import LabelRowV2
from encord_agents.tasks import Runner

# Step 1: Initialization
# Initialize the runner
# project hash is optional but allows you to "fail fast"
# if you misconfigure the stages.
runner = Runner(project_hash="<your_project_hash>")

# Step 2: Definition
# Define agent logic for a specific stage
@runner.stage(stage="my_stage_name")  # or stage="<stage_uuid>"
def process_task(lr: LabelRowV2) -> str | None:
    # Modify the label row as needed
    lr.set_priority(0.5)

    # Return the pathway name or UUID where the task should go next
    return "next_stage"

# Step 3: Execution
if __name__ == "__main__":
    # via the CLI
    runner.run()

    # or via code
    runner(
        project_hash="<your_project_hash">,
        refresh_every=3600,  # seconds
        num_retries = 1,
        task_batch_size = 1,
    )

To execute the runner via the CLI, you can do:

# simple
python example_agent.py --project-hash <your_project_hash>
# use help for additional configurations
python example_agent.py --help

Running Agents

Basic Execution

runner.run()  # will run the runner as CLI tool
runner()      # will run the runner directly

Both options:

  1. Connect to your Encord project
  2. Poll for tasks in the configured stages
  3. Execute your agent functions on each task
  4. Move tasks according to returned pathway
  5. Retry failed tasks up to num_retries times

See the configuration options below.

Command Line Interface

The runner exposes configuration via CLI:

python my_agent.py \
    --project-hash "<project_hash>" \
    --task-batch-size 1 \
    --num-retries 3
    --refresh-every 3600 # seconds

Order of execution

The queue for "stage_1" is emptied first, and successively the queue for "stage_2". If you set the refresh_every argument, the runner polls both queues again after emptying the initial queues. In turn, data that came into the queue after the initial poll by the runner is picked up in the second iteration. In the case where the time of an execution has already exceeded the refresh_every threshold, the agent polls for new tasks instantly.

To give you an idea about the order of execution, please find the pseudo code below.

# ⚠️  PSEUDO CODE - not intended for copying ⚠️
def execute(self, refresh_every = None):
    timestamp = datetime.now()
    while True:
        # self.agents ≈ [stage_1, stage_2]
        for agent in self.agents:  
            for task in agent.get_tasks():
                # Inject params based on task
                stage.execute(solve_dependencies(task, agent))  

        if refresh_every is None:
            break
        else:
            # repeat after timestamp + timedelta(seconds=refresh_every)
            # or straight away if already exceeded
            ...

Error Handling

The runner:

  • Retries failed tasks up to num_retries times (default: 3). Changes to the label row are not rolled back.
  • Logs errors for debugging
  • Continues processing other tasks if a task fails
  • Bundles updates for better performance (configurable via task_batch_size)

Configuration

Initialization

Initialize the runner with an optional project hash.

The project_hash will allow stricter stage validation. If left unspecified, errors will first be raised during execution of the runner.

Parameters:

  • project_hash (str | None, default: None ) –

    The project hash that the runner applies to.

    Can be left unspecified to be able to reuse same runner on multiple projects.

Source code in encord_agents/tasks/runner.py
def __init__(self, project_hash: str | None = None):
    """
    Initialize the runner with an optional project hash.

    The `project_hash` will allow stricter stage validation.
    If left unspecified, errors will first be raised during execution of the runner.

    Args:
        project_hash: The project hash that the runner applies to.

            Can be left unspecified to be able to reuse same runner on multiple projects.
    """
    super().__init__(project_hash)
    self.agents: list[RunnerAgent] = []
    self.was_called_from_cli = False

Runtime Configuration

There are two ways to execute the runner. You can run the runner directly from your code:

...
runner = Runner()
...
runner(project_hash="<your_project_hash>")  # See all params below 👇

Or you can run it via the command-line interface (CLI) by employing the runner.run() function. Suppose you have an example.py file that looks like this:

example.py
...
runner = Runner()
...
if __name__ == "__main__":
    runner.run()

Then, the runner functions as a CLI tool, accepting the same arguments as when executed in code.

$ python example.py --help

 Usage: example.py [OPTIONS]

 Execute the runner.
 Full documentation here: https://agents-docs.encord.com/task_agents/runner

╭─ Options ──────────────────────────────────────────────────────────╮
 --refresh-every   INTEGER  Fetch task statuses from the Encord                                 Project every `refresh_every` seconds.                              If `None`, the runner will exit once                                task queue is empty.                                                [default: None]                          --num-retries     INTEGER  If an agent fails on a task, how many                               times should the runner retry it?                                   [default: 3]                             --task-batch-size INTEGER  Number of tasks for which labels are                                loaded into memory at once.                                         [default: 300]                           --project-hash    TEXT     The project hash if not defined at                                  runner instantiation.                                               [default: None]                          --help                     Show this message and exit.             ╰────────────────────────────────────────────────────────────────────╯

Performance Considerations

By default, the Runner bundles task updates for better performance with a batch size of 300. For debugging or when immediate updates are needed, you can set task_batch_size=1:

# Via CLI
python my_agent.py --task-batch-size 1

Or in code

runner(task_batch_size=1)

Scaling with the QueueRunner

The QueueRunner is a more advanced runner that will allow you to process multiple tasks in parallel. It is useful when you have a lot of tasks to process and you want to speed up the processing time via parallel execution.

Both the Runner and the QueueRunner share the same interface. The difference lies in how you execute them.

The Runner executes tasks in a sequential order with the run() function. The QueueRunner translates your implementations into functions that take in a task specification as a JSON string and returns a encord_agents.tasks.models.TaskCompletionResult as a JSON string. Stringified JSON tasks are used to pass messages over queues that typically don't allow for custom object types.

Here's an example of how the difference manifests:

runner = Runner()
@runner.stage("my_stage")
def my_agent(task: AgentTask, label_row: LabelRowV2):
    ...
runner()
queue_runner = QueueRunner()  # Change the runner to the queue runner

# The implementation stays the same
@queue_runner.stage("my_stage")
def my_agent(task: AgentTask, label_row: LabelRowV2):
    ...

# Change the execution to use the queue runner
for agent in queue_runner.get_agent_stages():
    your_task_queue = []
    for task in agent.get_tasks():
        your_task_queue.append(task)

    for task in your_queue:
        result = my_agent(task)

Please refer to the Celery example or Modal example for more information.

Comparison with Queue Runner

The key differences between QueueRunner and the sequential Runner are:

Feature Runner QueueRunner
Execution Model Executes tasks sequentially in a single process Designed for distributed execution across multiple processes
Project Hash Optional at initialization Required at initialization
Function Wrapping Executes your function directly with injected dependencies Additionally wraps your function to handle JSON task specifications
Execution Control Handles task polling and execution You control task distribution and execution through your queue system
Scaling Not suitable for scaling Suitable for scaling