Skip to content

Task agents reference

Task agents

encord_agents.tasks.QueueRunner

Bases: RunnerBase

This class is intended to hold agent implementations. It makes it easy to put agent task specifications into a queue and then execute them in a distributed fashion.

Below is a template for how that would work.

Example:

runner = QueueRunner(project_hash="...")

@runner.stage("Agent 1")
def my_agent_implementation() -> str:
    # ... do your thing
    return "<pathway_name>"

# Populate the queue
my_queue = ...
for stage in runner.get_agent_stages():
    for task in stage.get_tasks():
        my_queue.append(task.model_dump_json())

# Execute on the queue
while my_queue:
    task_spec = my_queue.pop()
    result_json = my_agent_implementation(task_spec)
    result = TaskCompletionResult.model_validate_json(result_json)

Source code in encord_agents/tasks/runner.py
class QueueRunner(RunnerBase):
    """
    Container for agent implementations that are executed via an external queue.

    Agents registered with `stage` are wrapped into functions that take a JSON
    task specification and return a JSON result. That makes it easy to put
    agent task specifications into a queue and then execute them in a
    distributed fashion.

    Below is a template for how that would work.

    *Example:*
    ```python
    runner = QueueRunner(project_hash="...")

    @runner.stage("Agent 1")
    def my_agent_implementation() -> str:
        # ... do your thing
        return "<pathway_name>"

    # Populate the queue
    my_queue = ...
    for stage in runner.get_agent_stages():
        for task in stage.get_tasks():
            my_queue.append(task.model_dump_json())

    # Execute on the queue
    while my_queue:
        task_spec = my_queue.pop()
        result_json = my_agent_implementation(task_spec)
        result = TaskCompletionResult.model_validate_json(result_json)
    ```
    """

    def __init__(self, project_hash: str | UUID):
        """
        Initialize the runner for one specific project.

        Args:
            project_hash: Hash of the project that the runner operates on.

        Raises:
            AssertionError: If the base class did not resolve a project
                from `project_hash`.
        """
        super().__init__(project_hash)
        # The queue runner always needs a concrete project; keep a non-optional
        # reference so the rest of the class does not have to re-check for None.
        assert self.project is not None
        self._project: Project = self.project

    def __call__(self, *args: Any, **kwds: Any) -> Any:
        """
        Always raise: a `QueueRunner` is not meant to be called directly.

        Raises:
            NotImplementedError: Unconditionally.
        """
        raise NotImplementedError(
            "Calling the QueueRunner is not intended. "
            "Prefer using wrapped functions with, e.g., modal or Celery. "
            "For more documentation, please see the `QueueRunner.stage` documentation below."
        )

    def stage(
        self,
        stage: str | UUID,
        *,
        label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
        label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None,
    ) -> Callable[[Callable[..., str | UUID | None]], Callable[[str], str]]:
        """
        Agent wrapper intended for queueing systems and distributed workloads.

        Define your agent as you are used to with dependencies in the method declaration and
        return the pathway from the project workflow that the task should follow upon completion.
        The function will be wrapped in logic that does the following (in pseudo code):

        ```
        @runner.stage("stage_name")
        def my_function(...)
            ...

        # is equivalent to

        def wrapped_function(task_json_spec: str) -> str (result_json):
            task = fetch_task(task_json_spec)
            resources = load_resources(task)
            pathway = my_function(resources)  # <- this is where your code goes
            task.proceed(pathway)
            return TaskCompletionResult.model_dump_json()
        ```

        When you have an `encord.workflow.stages.agent.AgentTask` instance at hand, let's call
        it `task`, then you can call your `wrapped_function` with `task.model_dump_json()`.
        Similarly, you can put `task.model_dump_json()` into a queue and read from that queue, e.g.,
        from another instance/process, to execute `wrapped_function` there.

        As the pseudo code indicates, `wrapped_function` understands how to take that string from
        the queue and resolve all your defined dependencies before calling `my_function`.

        Args:
            stage: The name or uuid of the agent stage that the function should be associated with.
            label_row_metadata_include_args: Arguments to be passed to
                `project.list_label_rows_v2(...)` when the agent needs a label row.
            label_row_initialise_labels_args: Arguments to be passed to
                `label_row.initialise_labels(...)` when the agent needs a label row.

        Returns:
            A decorator that registers the agent and replaces it with a function taking the
            JSON task specification and returning a JSON serialised `TaskCompletionResult`.
        """
        stage_uuid, printable_name = self.validate_stage(stage)

        def decorator(func: Callable[..., str | UUID | None]) -> Callable[[str], str]:
            runner_agent = self._add_stage_agent(
                stage_uuid, func, printable_name, label_row_metadata_include_args, label_row_initialise_labels_args
            )
            # Fall back to default argument objects when none were configured.
            include_args = runner_agent.label_row_metadata_include_args or LabelRowMetadataIncludeArgs()
            init_args = runner_agent.label_row_initialise_labels_args or LabelRowInitialiseLabelsArgs()

            @wraps(func)
            def wrapper(json_str: str) -> str:
                # Parsing happens outside any try block, so a malformed spec raises to the caller.
                conf = AgentTaskConfig.model_validate_json(json_str)
                try:
                    # NOTE(review): the lookup is by uuid only; this assumes `runner_agent.identity`
                    # is always a uuid for a QueueRunner - confirm against `validate_stage`.
                    agent_stage = self._project.workflow.get_stage(uuid=runner_agent.identity, type_=AgentStage)
                except ValueError as e:
                    return TaskCompletionResult(
                        task_uuid=conf.task_uuid,
                        success=False,
                        error=str(e),
                    ).model_dump_json()

                # Only the first task matching the data hash is considered.
                task = next(iter(agent_stage.get_tasks(data_hash=conf.data_hash)), None)
                if task is None:
                    # TODO logging?
                    return TaskCompletionResult(
                        task_uuid=conf.task_uuid,
                        stage_uuid=agent_stage.uuid,
                        success=False,
                        error="Failed to obtain task from Encord",
                    ).model_dump_json()

                label_row: LabelRowV2 | None = None
                try:
                    # Label rows are only fetched and initialised when the agent asks for one.
                    if runner_agent.dependant.needs_label_row:
                        label_row = self._project.list_label_rows_v2(
                            data_hashes=[task.data_hash], **include_args.model_dump()
                        )[0]
                        label_row.initialise_labels(**init_args.model_dump())

                    next_stage: TaskAgentReturn = None
                    with ExitStack() as stack:
                        context = Context(project=self._project, task=task, label_row=label_row)
                        dependencies = solve_dependencies(
                            context=context, dependant=runner_agent.dependant, stack=stack
                        )
                        next_stage = runner_agent.callable(**dependencies.values)

                    if next_stage is None:
                        # TODO: Should we log that task didn't continue?
                        pass
                    elif isinstance(next_stage, UUID):
                        task.proceed(pathway_uuid=str(next_stage))
                    else:
                        # Only the parsing decides between uuid and name. `proceed` is kept out
                        # of the `try` so that a ValueError raised by it is reported as a failure
                        # instead of triggering a second attempt with the uuid used as a name.
                        try:
                            next_stage = UUID(next_stage)
                        except ValueError:
                            task.proceed(pathway_name=str(next_stage))
                        else:
                            task.proceed(pathway_uuid=str(next_stage))
                    return TaskCompletionResult(
                        task_uuid=task.uuid, stage_uuid=agent_stage.uuid, success=True, pathway=next_stage
                    ).model_dump_json()
                except Exception:
                    # Any failure from here on is reported through the result, not raised.
                    # TODO logging?
                    return TaskCompletionResult(
                        task_uuid=task.uuid, stage_uuid=agent_stage.uuid, success=False, error=traceback.format_exc()
                    ).model_dump_json()

            return wrapper

        return decorator

    def get_agent_stages(self) -> Iterable[AgentStage]:
        """
        Get the agent stages for which there exists an agent implementation.

        This function is intended to make it easy to iterate through all current
        agent tasks and put the task specs into external queueing systems like
        Celery or Modal.

        For a concrete example, please see the docstring of the class itself.

        Note that if you didn't specify an implementation (by decorating your
        function with `@runner.stage`) for a given agent stage, the stage will
        not show up by calling this function.

        Returns:
            An iterable over `encord.workflow.stages.agent.AgentStage` objects
            where the runner contains an agent implementation. Stages are
            looked up lazily, one per registered agent, as the iterable is consumed.
        """

        def _is_uuid(value: object) -> bool:
            # An identity counts as a uuid if its string form parses as one.
            try:
                UUID(str(value))
            except ValueError:
                return False
            return True

        for runner_agent in self.agents:
            identity = runner_agent.identity
            if _is_uuid(identity):
                yield self._project.workflow.get_stage(uuid=identity, type_=AgentStage)
            else:
                yield self._project.workflow.get_stage(name=str(identity), type_=AgentStage)

get_agent_stages

get_agent_stages() -> Iterable[AgentStage]

Get the agent stages for which there exist an agent implementation.

This function is intended to make it easy to iterate through all current agent tasks and put the task specs into external queueing systems like Celery or Modal.

For a concrete example, please see the docstring of the class itself.

Note that if you didn't specify an implementation (by decorating your function with @runner.stage) for a given agent stage, the stage will not show up by calling this function.

Returns:

  • Iterable[AgentStage] –

    An iterable over encord.workflow.stages.agent.AgentStage objects where the runner contains an agent implementation.

Source code in encord_agents/tasks/runner.py
def get_agent_stages(self) -> Iterable[AgentStage]:
    """
    Yield the workflow agent stage behind every registered agent.

    Use this to walk through all current agent tasks and push the task
    specs into an external queueing system such as Celery or Modal.

    Agent stages without an implementation (i.e., no function decorated
    with `@runner.stage` for them) are not part of the result.

    Returns:
        An iterable over `encord.workflow.stages.agent.AgentStage` objects,
        one per agent registered on the runner. Stages are resolved lazily
        while the iterable is consumed.

    Raises:
        Whatever the workflow's `get_stage` raises when a stage cannot be
        resolved (not visible from this function).
    """

    def _parses_as_uuid(candidate: object) -> bool:
        # The string form of the identity decides how the stage is looked up.
        try:
            UUID(str(candidate))
        except ValueError:
            return False
        return True

    for agent in self.agents:
        identity = agent.identity
        workflow = self._project.workflow
        if _parses_as_uuid(identity):
            # Identity is a uuid: look the stage up by uuid.
            yield workflow.get_stage(uuid=identity, type_=AgentStage)
        else:
            # Anything else is treated as the stage's name.
            yield workflow.get_stage(name=str(identity), type_=AgentStage)

stage

stage(stage: str | UUID, *, label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None, label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None) -> Callable[[Callable[..., str | UUID | None]], Callable[[str], str]]

Agent wrapper intended for queueing systems and distributed workloads.

Define your agent as you are used to with dependencies in the method declaration and return the pathway from the project workflow that the task should follow upon completion. The function will be wrapped in logic that does the following (in pseudo code):

@runner.stage("stage_name")
def my_function(...)
    ...

# is equivalent to

def wrapped_function(task_json_spec: str) -> str (result_json):
    task = fetch_task(task_json_spec)
    resources = load_resources(task)
    pathway = your_function(resources)  # <- this is where your code goes
    task.proceed(pathway)
    return TaskCompletionResult.model_dump_json()

When you have an encord.workflow.stages.agent.AgentTask instance at hand, let's call it task, then you can call your wrapped_function with task.model_dump_json(). Similarly, you can put task.model_dump_json() into a queue and read from that queue, e.g., from another instance/process, to execute wrapped_function there.

As the pseudo code indicates, wrapped_function understands how to take that string from the queue and resolve all your defined dependencies before calling your_function.

Source code in encord_agents/tasks/runner.py
def stage(
    self,
    stage: str | UUID,
    *,
    label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
    label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None,
) -> Callable[[Callable[..., str | UUID | None]], Callable[[str], str]]:
    """
    Agent wrapper intended for queueing systems and distributed workloads.

    Define your agent as you are used to with dependencies in the method declaration and
    return the pathway from the project workflow that the task should follow upon completion.
    The function will be wrapped in logic that does the following (in pseudo code):

    ```
    @runner.stage("stage_name")
    def my_function(...)
        ...

    # is equivalent to

    def wrapped_function(task_json_spec: str) -> str (result_json):
        task = fetch_task(task_json_spec)
        resources = load_resources(task)
        pathway = my_function(resources)  # <- this is where your code goes
        task.proceed(pathway)
        return TaskCompletionResult.model_dump_json()
    ```

    When you have an `encord.workflow.stages.agent.AgentTask` instance at hand, let's call
    it `task`, then you can call your `wrapped_function` with `task.model_dump_json()`.
    Similarly, you can put `task.model_dump_json()` into a queue and read from that queue, e.g.,
    from another instance/process, to execute `wrapped_function` there.

    As the pseudo code indicates, `wrapped_function` understands how to take that string from
    the queue and resolve all your defined dependencies before calling `my_function`.

    Args:
        stage: The name or uuid of the agent stage that the function should be associated with.
        label_row_metadata_include_args: Arguments to be passed to
            `project.list_label_rows_v2(...)` when the agent needs a label row.
        label_row_initialise_labels_args: Arguments to be passed to
            `label_row.initialise_labels(...)` when the agent needs a label row.

    Returns:
        A decorator that registers the agent and replaces it with a function taking the
        JSON task specification and returning a JSON serialised `TaskCompletionResult`.
    """
    stage_uuid, printable_name = self.validate_stage(stage)

    def decorator(func: Callable[..., str | UUID | None]) -> Callable[[str], str]:
        runner_agent = self._add_stage_agent(
            stage_uuid, func, printable_name, label_row_metadata_include_args, label_row_initialise_labels_args
        )
        # Fall back to default argument objects when none were configured.
        include_args = runner_agent.label_row_metadata_include_args or LabelRowMetadataIncludeArgs()
        init_args = runner_agent.label_row_initialise_labels_args or LabelRowInitialiseLabelsArgs()

        @wraps(func)
        def wrapper(json_str: str) -> str:
            # Parsing happens outside any try block, so a malformed spec raises to the caller.
            conf = AgentTaskConfig.model_validate_json(json_str)
            try:
                # NOTE(review): the lookup is by uuid only; this assumes `runner_agent.identity`
                # is always a uuid here - confirm against `validate_stage`.
                agent_stage = self._project.workflow.get_stage(uuid=runner_agent.identity, type_=AgentStage)
            except ValueError as e:
                return TaskCompletionResult(
                    task_uuid=conf.task_uuid,
                    success=False,
                    error=str(e),
                ).model_dump_json()

            # Only the first task matching the data hash is considered.
            task = next(iter(agent_stage.get_tasks(data_hash=conf.data_hash)), None)
            if task is None:
                # TODO logging?
                return TaskCompletionResult(
                    task_uuid=conf.task_uuid,
                    stage_uuid=agent_stage.uuid,
                    success=False,
                    error="Failed to obtain task from Encord",
                ).model_dump_json()

            label_row: LabelRowV2 | None = None
            try:
                # Label rows are only fetched and initialised when the agent asks for one.
                if runner_agent.dependant.needs_label_row:
                    label_row = self._project.list_label_rows_v2(
                        data_hashes=[task.data_hash], **include_args.model_dump()
                    )[0]
                    label_row.initialise_labels(**init_args.model_dump())

                next_stage: TaskAgentReturn = None
                with ExitStack() as stack:
                    context = Context(project=self._project, task=task, label_row=label_row)
                    dependencies = solve_dependencies(
                        context=context, dependant=runner_agent.dependant, stack=stack
                    )
                    next_stage = runner_agent.callable(**dependencies.values)

                if next_stage is None:
                    # TODO: Should we log that task didn't continue?
                    pass
                elif isinstance(next_stage, UUID):
                    task.proceed(pathway_uuid=str(next_stage))
                else:
                    # Only the parsing decides between uuid and name. `proceed` is kept out
                    # of the `try` so that a ValueError raised by it is reported as a failure
                    # instead of triggering a second attempt with the uuid used as a name.
                    try:
                        next_stage = UUID(next_stage)
                    except ValueError:
                        task.proceed(pathway_name=str(next_stage))
                    else:
                        task.proceed(pathway_uuid=str(next_stage))
                return TaskCompletionResult(
                    task_uuid=task.uuid, stage_uuid=agent_stage.uuid, success=True, pathway=next_stage
                ).model_dump_json()
            except Exception:
                # Any failure from here on is reported through the result, not raised.
                # TODO logging?
                return TaskCompletionResult(
                    task_uuid=task.uuid, stage_uuid=agent_stage.uuid, success=False, error=traceback.format_exc()
                ).model_dump_json()

        return wrapper

    return decorator

encord_agents.tasks.Runner

Bases: RunnerBase

Runs agents against Workflow projects.

When called, it will iteratively run agent stages till they are empty. By default, runner will exit after finishing the tasks identified at the point of trigger. To automatically re-run, you can use the refresh_every keyword.

Example:

example_agent.py
from uuid import UUID
from encord_agents.tasks import Runner
runner = Runner()

@runner.stage("<workflow_node_name>")
# or
@runner.stage("<workflow_node_uuid>")
def my_agent(task: AgentTask) -> str | UUID | None:
    ...
    return "pathway name"  # or pathway uuid


runner(project_hash="<project_hash>")  # (see __call__ for more arguments)
# or
if __name__ == "__main__":
    # for CLI usage: `python example_agent.py --project-hash "<project_hash>"`
    runner.run()
Source code in encord_agents/tasks/runner.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
class Runner(RunnerBase):
    """
    Runs agents against Workflow projects.

    When called, it will iteratively run agent stages till they are empty.
    By default, runner will exit after finishing the tasks identified at the point of trigger.
    To automatically re-run, you can use the `refresh_every` keyword.

    **Example:**

    ```python title="example_agent.py"
    from uuid import UUID
    from encord_agents.tasks import Runner
    runner = Runner()

    @runner.stage("<workflow_node_name>")
    # or
    @runner.stage("<workflow_node_uuid>")
    def my_agent(task: AgentTask) -> str | UUID | None:
        ...
        return "pathway name"  # or pathway uuid


    runner(project_hash="<project_hash>")  # (see __call__ for more arguments)
    # or
    if __name__ == "__main__":
        # for CLI usage: `python example_agent.py --project-hash "<project_hash>"`
        runner.run()
    ```

    """

    def __init__(self, project_hash: str | None = None) -> None:
        """
        Initialize the runner with an optional project hash.

        The `project_hash` will allow stricter stage validation.
        If left unspecified, errors will first be raised during execution of the runner.

        Args:
            project_hash: The project hash that the runner applies to.

                Can be left unspecified to be able to reuse same runner on multiple projects.
        """
        super().__init__(project_hash)
        # Start with no registered agents; this assignment runs after the base
        # initialiser, so it overrides anything the base class may have stored.
        self.agents: list[RunnerAgent] = []
        # Initialised to False here; presumably flipped when the runner is
        # started through its CLI entry point - TODO confirm.
        self.was_called_from_cli = False

    def stage(
        self,
        stage: str | UUID,
        *,
        label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
        label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None,
    ) -> Callable[[DecoratedCallable], DecoratedCallable]:
        r"""
        Register a function as the agent for a workflow agent stage.

        Decorating a function with a stage adds it to the list of agents
        handled by the runner. The runner will then call the function for
        every task which is in that stage.


        **Example:**

        ```python
        runner = Runner()

        @runner.stage("<stage_name_or_uuid>")
        def my_func() -> str | None:
            ...
            return "<pathway_name or pathway_uuid>"
        ```

        The decorated function may declare any parameters that are type
        annotated with one of the following types:

        * [Project][docs-project]{ target="\_blank", rel="noopener noreferrer" }: the `encord.project.Project`
            that the runner is operating on.
        * [LabelRowV2][docs-label-row]{ target="\_blank", rel="noopener noreferrer" }: the `encord.objects.LabelRowV2`
            that the task is associated with.
        * [AgentTask][docs-agent-task]{ target="\_blank", rel="noopener noreferrer" }: the `encord.workflow.stages.agent.AgentTask`
            that the task is associated with.
        * Any other type: which is annotated with a [dependency](/dependencies.md)

        All of these parameters are injected automatically when the agent is called.

        **Example:**

        ```python
        from typing import Iterator
        from typing_extensions import Annotated

        from encord.project import Project
        from encord_agents.tasks import Depends
        from encord_agents.tasks.dependencies import dep_video_iterator
        from encord.workflow.stages.agent import AgentTask

        runner = Runner()

        def random_value() -> float:
            import random
            return random.random()

        @runner.stage("<stage_name_or_uuid>")
        def my_func(
            project: Project,
            lr: LabelRowV2,
            task: AgentTask,
            video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
            custom: Annotated[float, Depends(random_value)]
        ) -> str | None:
            ...
            return "<pathway_name or pathway_uuid>"
        ```

        [docs-project]:    https://docs.encord.com/sdk-documentation/sdk-references/project
        [docs-label-row]:  https://docs.encord.com/sdk-documentation/sdk-references/LabelRowV2
        [docs-agent-task]: https://docs.encord.com/sdk-documentation/sdk-references/AgentTask

        Args:
            stage: Name or uuid of the stage to associate the function with.
            label_row_metadata_include_args: Arguments forwarded to
                `project.list_label_rows_v2(...)`
            label_row_initialise_labels_args: Arguments forwarded to
                `label_row.initialise_labels(...)`

        Returns:
            A decorator that registers the function and returns it unchanged.
        """
        stage_id, display_name = self.validate_stage(stage)

        def register(agent_fn: DecoratedCallable) -> DecoratedCallable:
            # Registration is the only effect; the function itself is handed back as-is.
            self._add_stage_agent(
                stage_id,
                agent_fn,
                display_name,
                label_row_metadata_include_args,
                label_row_initialise_labels_args,
            )
            return agent_fn

        return register

    @staticmethod
    def _execute_tasks(
        project: Project,
        tasks: Iterable[tuple[AgentTask, LabelRowV2 | None]],
        runner_agent: RunnerAgent,
        # num_threads: int,
        num_retries: int,
        pbar_update: Callable[[float | None], bool | None] | None = None,
    ) -> None:
        """
        Run one agent on a collection of tasks and move each task along its pathway.

        For every task, the agent's dependencies are resolved once and the agent is
        then called up to `num_retries + 1` times until a call succeeds. The value
        returned by the agent decides how the task proceeds:

        * `None`: the task is not moved.
        * a `UUID`, or a string that parses as one: proceed by pathway uuid.
        * any other string: proceed by pathway name.

        Failed attempts are printed together with their traceback. A task for which
        every attempt fails is skipped without raising and without a progress update.

        Args:
            project: The project the tasks belong to; made available to the agent's dependencies.
            tasks: Pairs of task and (optional) label row to run the agent on.
            runner_agent: The registered agent (callable and dependency description) to execute.
            num_retries: Number of additional attempts after a failed agent call.
            pbar_update: Optional callback that is called with `1.0` after each successful task.
        """
        # All `proceed` calls share one bundle for the whole collection of tasks
        # (presumably so that they are sent in batches - TODO confirm).
        with Bundle() as bundle:
            for task, label_row in tasks:
                with ExitStack() as stack:
                    context = Context(project=project, task=task, label_row=label_row)
                    # Dependencies are resolved once per task and reused across retries.
                    dependencies = solve_dependencies(context=context, dependant=runner_agent.dependant, stack=stack)
                    for attempt in range(num_retries + 1):
                        try:
                            next_stage = runner_agent.callable(**dependencies.values)
                            if next_stage is None:
                                pass
                            elif isinstance(next_stage, UUID):
                                task.proceed(pathway_uuid=str(next_stage), bundle=bundle)
                            else:
                                # Only the parsing decides between uuid and name. `proceed` is
                                # kept out of the `try` so that a ValueError raised by it counts
                                # as a failed attempt instead of triggering a second `proceed`
                                # call with the uuid string used as a pathway name.
                                try:
                                    _next_stage = UUID(next_stage)
                                except ValueError:
                                    task.proceed(pathway_name=next_stage, bundle=bundle)
                                else:
                                    task.proceed(pathway_uuid=str(_next_stage), bundle=bundle)

                            if pbar_update is not None:
                                pbar_update(1.0)
                            # Success: no further attempts for this task.
                            break

                        except KeyboardInterrupt:
                            # Make it explicit that user interrupts are never retried.
                            raise
                        except Exception:
                            print(f"[attempt {attempt+1}/{num_retries+1}] Agent failed with error: ")
                            traceback.print_exc()

    def __call__(
        self,
        refresh_every: Annotated[
            Optional[int],
            Option(
                help="Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty."
            ),
        ] = None,
        num_retries: Annotated[
            int, Option(help="If an agent fails on a task, how many times should the runner retry it?")
        ] = 3,
        task_batch_size: Annotated[
            int, Option(help="Number of tasks for which labels are loaded into memory at once.")
        ] = 300,
        project_hash: Annotated[
            Optional[str], Option(help="The project hash if not defined at runner instantiation.")
        ] = None,
    ) -> None:
        """
        Run your task agent `runner(...)`.

        ???+ info "Self-updating/Polling runner"
            The runner can continuously poll new tasks in the project and execute the defined stage agents.
            To do so, please set the `refresh_every` parameter.
            When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.

        Args:
            refresh_every: Fetch task statuses from the Encord Project every `refresh_every` seconds.
                If `None`, the runner will exit once task queue is empty.
            num_retries: If an agent fails on a task, how many times should the runner retry it?
            task_batch_size: Number of tasks for which labels are loaded into memory at once.
            project_hash: The project hash if not defined at runner instantiation.
        Returns:
            None
        """
        # Verify project
        if project_hash is not None:
            project_hash = self.verify_project_hash(project_hash)
            project = self.client.get_project(project_hash)
        elif self.project is not None:
            project = self.project
        else:
            # Should not happen. Validated above but mypy doesn't understand.
            raise ValueError("Have no project to execute the runner on. Please specify it.")

        if project is None:
            import sys

            raise PrintableError(
                f"""Please specify project hash in one of the following ways:  
* At instantiation: [blue]`runner = Runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called directly: [blue]`runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called from CLI: [blue]`python {sys.argv[0]} --project-hash [green]<project_hash>[/green]`[/blue]
"""
            )

        self.validate_project(project)

        # Verify stages
        valid_stages = [s for s in project.workflow.stages if s.stage_type == WorkflowStageType.AGENT]
        agent_stages: dict[str | UUID, WorkflowStage] = {
            **{s.title: s for s in valid_stages},
            **{s.uuid: s for s in valid_stages},
        }
        try:
            for runner_agent in self.agents:
                fn_name = getattr(runner_agent.callable, "__name__", "agent function")
                separator = f"{os.linesep}\t"
                agent_stage_names = separator + self.get_stage_names(valid_stages, join_str=separator) + os.linesep
                if runner_agent.identity not in agent_stages:
                    suggestion: str
                    if len(valid_stages) == 1:
                        suggestion = f'Did you mean to wrap [blue]`{fn_name}`[/blue] with{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].title}")[/magenta]{os.linesep}or{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].uuid}")[/magenta]'
                    else:
                        suggestion = f"""
Please use either name annoitations: 
[magenta]@runner.stage(stage="<exact_stage_name>")[/magenta] 

or uuid annotations:
[magenta]@runner.stage(stage="<exact_stage_uuid>")[/magenta] 

For example, if we use the first agent stage listed above, we can use:
[magenta]@runner.stage(stage="{valid_stages[0].title}")
def {fn_name}(...):
    ...
[/magenta]
# or
[magenta]@runner.stage(stage="{valid_stages[0].uuid}")
def {fn_name}(...):
    ...[/magenta]"""
                    raise PrintableError(
                        rf"""Your function [blue]`{fn_name}`[/blue] was annotated to match agent stage [blue]`{runner_agent.printable_name}`[/blue] but that stage is not present as an agent stage in your project workflow. The workflow has following agent stages:

[{agent_stage_names}]

{suggestion}
                        """
                    )

                stage = agent_stages[runner_agent.identity]
                if stage.stage_type != WorkflowStageType.AGENT:
                    raise PrintableError(
                        f"""You cannot use the stage of type `{stage.stage_type}` as an agent stage. It has to be one of the agent stages: 
[{agent_stage_names}]."""
                    )

            # Run
            delta = timedelta(seconds=refresh_every) if refresh_every else None
            next_execution = None

            while True:
                if isinstance(next_execution, datetime):
                    if next_execution > datetime.now():
                        duration = next_execution - datetime.now()
                        print(f"Sleeping {duration.total_seconds()} secs until next execution time.")
                        time.sleep(duration.total_seconds())
                elif next_execution is not None:
                    break

                next_execution = datetime.now() + delta if delta else False
                for runner_agent in self.agents:
                    include_args = runner_agent.label_row_metadata_include_args or LabelRowMetadataIncludeArgs()
                    init_args = runner_agent.label_row_initialise_labels_args or LabelRowInitialiseLabelsArgs()
                    stage = agent_stages[runner_agent.identity]

                    batch: list[AgentTask] = []
                    batch_lrs: list[LabelRowV2 | None] = []

                    tasks = list(stage.get_tasks())
                    pbar = tqdm(desc="Executing tasks", total=len(tasks))
                    for task in tasks:
                        if not isinstance(task, AgentTask):
                            continue
                        batch.append(task)
                        if len(batch) == task_batch_size:
                            batch_lrs = [None] * len(batch)
                            if runner_agent.dependant.needs_label_row:
                                label_rows = {
                                    UUID(lr.data_hash): lr
                                    for lr in project.list_label_rows_v2(
                                        data_hashes=[t.data_hash for t in batch], **include_args.model_dump()
                                    )
                                }
                                batch_lrs = [label_rows.get(t.data_hash) for t in batch]
                                with project.create_bundle() as lr_bundle:
                                    for lr in batch_lrs:
                                        if lr:
                                            lr.initialise_labels(bundle=lr_bundle, **init_args.model_dump())

                            self._execute_tasks(
                                project,
                                zip(batch, batch_lrs),
                                runner_agent,
                                num_retries,
                                pbar_update=pbar.update,
                            )

                            batch = []
                            batch_lrs = []

                    if len(batch) > 0:
                        batch_lrs = [None] * len(batch)
                        if runner_agent.dependant.needs_label_row:
                            label_rows = {
                                UUID(lr.data_hash): lr
                                for lr in project.list_label_rows_v2(
                                    data_hashes=[t.data_hash for t in batch],
                                    **include_args.model_dump(),
                                )
                            }
                            batch_lrs = [label_rows[t.data_hash] for t in batch]
                            with project.create_bundle() as lr_bundle:
                                for lr in batch_lrs:
                                    if lr:
                                        lr.initialise_labels(bundle=lr_bundle, **init_args.model_dump())
                        self._execute_tasks(
                            project, zip(batch, batch_lrs), runner_agent, num_retries, pbar_update=pbar.update
                        )
        except (PrintableError, AssertionError) as err:
            if self.was_called_from_cli:
                panel = Panel(err.args[0], width=None)
                rich.print(panel)
                raise Abort()
            else:
                if isinstance(err, PrintableError):
                    from rich.text import Text

                    plain_text = Text.from_markup(err.args[0]).plain
                    err.args = (plain_text,)
                raise

    def run(self) -> None:
        """
        Run the runner as a command line application.

        Call this from your "main file". It exposes `Runner.__call__` as a
        CLI command, so the agent(s) can be configured and started from
        your shell with command line arguments.

        **Example:**

        ```python title="example.py"
        runner = Runner(project_hash="<your_project_hash>")

        @runner.stage(stage="...")
        def your_func() -> str:
            ...

        if __name__ == "__main__":
            runner.run()
        ```

        You can then execute the runner with:

        ```shell
        python example.py --help
        ```

        to see the options it has (they are those of `Runner.__call__`).

        """
        from typer import Typer

        # Remember that we were started from the CLI; `__call__` reads this
        # flag to decide how errors are reported.
        self.was_called_from_cli = True

        cli = Typer(add_completion=False, rich_markup_mode="rich")
        register_command = cli.command(
            help=f"Execute the runner.{os.linesep * 2}Full documentation here: https://agents-docs.encord.com/task_agents/runner",
            short_help="Execute the runner as a CLI.",
        )
        register_command(self.__call__)
        cli()

__call__

__call__(refresh_every: Annotated[Optional[int], Option(help='Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty.')] = None, num_retries: Annotated[int, Option(help='If an agent fails on a task, how many times should the runner retry it?')] = 3, task_batch_size: Annotated[int, Option(help='Number of tasks for which labels are loaded into memory at once.')] = 300, project_hash: Annotated[Optional[str], Option(help='The project hash if not defined at runner instantiation.')] = None) -> None

Run your task agent runner(...).

Self-updating/Polling runner

The runner can continuously poll new tasks in the project and execute the defined stage agents. To do so, please set the refresh_every parameter. When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.

Parameters:

  • refresh_every (Annotated[Optional[int], Option(help='Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty.')], default: None ) –

    Fetch task statuses from the Encord Project every refresh_every seconds. If None, the runner will exit once task queue is empty.

  • num_retries (Annotated[int, Option(help='If an agent fails on a task, how many times should the runner retry it?')], default: 3 ) –

    If an agent fails on a task, how many times should the runner retry it?

  • task_batch_size (Annotated[int, Option(help='Number of tasks for which labels are loaded into memory at once.')], default: 300 ) –

    Number of tasks for which labels are loaded into memory at once.

  • project_hash (Annotated[Optional[str], Option(help='The project hash if not defined at runner instantiation.')], default: None ) –

    The project hash if not defined at runner instantiation.

Returns: None

Source code in encord_agents/tasks/runner.py
    def __call__(
        self,
        refresh_every: Annotated[
            Optional[int],
            Option(
                help="Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty."
            ),
        ] = None,
        num_retries: Annotated[
            int, Option(help="If an agent fails on a task, how many times should the runner retry it?")
        ] = 3,
        task_batch_size: Annotated[
            int, Option(help="Number of tasks for which labels are loaded into memory at once.")
        ] = 300,
        project_hash: Annotated[
            Optional[str], Option(help="The project hash if not defined at runner instantiation.")
        ] = None,
    ) -> None:
        """
        Run your task agent `runner(...)`.

        ???+ info "Self-updating/Polling runner"
            The runner can continuously poll new tasks in the project and execute the defined stage agents.
            To do so, please set the `refresh_every` parameter.
            When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.

        Args:
            refresh_every: Fetch task statuses from the Encord Project every `refresh_every` seconds.
                If `None`, the runner will exit once task queue is empty.
            num_retries: If an agent fails on a task, how many times should the runner retry it?
            task_batch_size: Number of tasks for which labels are loaded into memory at once.
            project_hash: The project hash if not defined at runner instantiation.
        Returns:
            None
        """
        # Verify project
        if project_hash is not None:
            project_hash = self.verify_project_hash(project_hash)
            project = self.client.get_project(project_hash)
        elif self.project is not None:
            project = self.project
        else:
            # Should not happen. Validated above but mypy doesn't understand.
            raise ValueError("Have no project to execute the runner on. Please specify it.")

        if project is None:
            import sys

            raise PrintableError(
                f"""Please specify project hash in one of the following ways:  
* At instantiation: [blue]`runner = Runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called directly: [blue]`runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called from CLI: [blue]`python {sys.argv[0]} --project-hash [green]<project_hash>[/green]`[/blue]
"""
            )

        self.validate_project(project)

        # Verify stages
        valid_stages = [s for s in project.workflow.stages if s.stage_type == WorkflowStageType.AGENT]
        agent_stages: dict[str | UUID, WorkflowStage] = {
            **{s.title: s for s in valid_stages},
            **{s.uuid: s for s in valid_stages},
        }
        try:
            for runner_agent in self.agents:
                fn_name = getattr(runner_agent.callable, "__name__", "agent function")
                separator = f"{os.linesep}\t"
                agent_stage_names = separator + self.get_stage_names(valid_stages, join_str=separator) + os.linesep
                if runner_agent.identity not in agent_stages:
                    suggestion: str
                    if len(valid_stages) == 1:
                        suggestion = f'Did you mean to wrap [blue]`{fn_name}`[/blue] with{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].title}")[/magenta]{os.linesep}or{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].uuid}")[/magenta]'
                    else:
                        suggestion = f"""
Please use either name annoitations: 
[magenta]@runner.stage(stage="<exact_stage_name>")[/magenta] 

or uuid annotations:
[magenta]@runner.stage(stage="<exact_stage_uuid>")[/magenta] 

For example, if we use the first agent stage listed above, we can use:
[magenta]@runner.stage(stage="{valid_stages[0].title}")
def {fn_name}(...):
    ...
[/magenta]
# or
[magenta]@runner.stage(stage="{valid_stages[0].uuid}")
def {fn_name}(...):
    ...[/magenta]"""
                    raise PrintableError(
                        rf"""Your function [blue]`{fn_name}`[/blue] was annotated to match agent stage [blue]`{runner_agent.printable_name}`[/blue] but that stage is not present as an agent stage in your project workflow. The workflow has following agent stages:

[{agent_stage_names}]

{suggestion}
                        """
                    )

                stage = agent_stages[runner_agent.identity]
                if stage.stage_type != WorkflowStageType.AGENT:
                    raise PrintableError(
                        f"""You cannot use the stage of type `{stage.stage_type}` as an agent stage. It has to be one of the agent stages: 
[{agent_stage_names}]."""
                    )

            # Run
            delta = timedelta(seconds=refresh_every) if refresh_every else None
            next_execution = None

            while True:
                if isinstance(next_execution, datetime):
                    if next_execution > datetime.now():
                        duration = next_execution - datetime.now()
                        print(f"Sleeping {duration.total_seconds()} secs until next execution time.")
                        time.sleep(duration.total_seconds())
                elif next_execution is not None:
                    break

                next_execution = datetime.now() + delta if delta else False
                for runner_agent in self.agents:
                    include_args = runner_agent.label_row_metadata_include_args or LabelRowMetadataIncludeArgs()
                    init_args = runner_agent.label_row_initialise_labels_args or LabelRowInitialiseLabelsArgs()
                    stage = agent_stages[runner_agent.identity]

                    batch: list[AgentTask] = []
                    batch_lrs: list[LabelRowV2 | None] = []

                    tasks = list(stage.get_tasks())
                    pbar = tqdm(desc="Executing tasks", total=len(tasks))
                    for task in tasks:
                        if not isinstance(task, AgentTask):
                            continue
                        batch.append(task)
                        if len(batch) == task_batch_size:
                            batch_lrs = [None] * len(batch)
                            if runner_agent.dependant.needs_label_row:
                                label_rows = {
                                    UUID(lr.data_hash): lr
                                    for lr in project.list_label_rows_v2(
                                        data_hashes=[t.data_hash for t in batch], **include_args.model_dump()
                                    )
                                }
                                batch_lrs = [label_rows.get(t.data_hash) for t in batch]
                                with project.create_bundle() as lr_bundle:
                                    for lr in batch_lrs:
                                        if lr:
                                            lr.initialise_labels(bundle=lr_bundle, **init_args.model_dump())

                            self._execute_tasks(
                                project,
                                zip(batch, batch_lrs),
                                runner_agent,
                                num_retries,
                                pbar_update=pbar.update,
                            )

                            batch = []
                            batch_lrs = []

                    if len(batch) > 0:
                        batch_lrs = [None] * len(batch)
                        if runner_agent.dependant.needs_label_row:
                            label_rows = {
                                UUID(lr.data_hash): lr
                                for lr in project.list_label_rows_v2(
                                    data_hashes=[t.data_hash for t in batch],
                                    **include_args.model_dump(),
                                )
                            }
                            batch_lrs = [label_rows[t.data_hash] for t in batch]
                            with project.create_bundle() as lr_bundle:
                                for lr in batch_lrs:
                                    if lr:
                                        lr.initialise_labels(bundle=lr_bundle, **init_args.model_dump())
                        self._execute_tasks(
                            project, zip(batch, batch_lrs), runner_agent, num_retries, pbar_update=pbar.update
                        )
        except (PrintableError, AssertionError) as err:
            if self.was_called_from_cli:
                panel = Panel(err.args[0], width=None)
                rich.print(panel)
                raise Abort()
            else:
                if isinstance(err, PrintableError):
                    from rich.text import Text

                    plain_text = Text.from_markup(err.args[0]).plain
                    err.args = (plain_text,)
                raise

__init__

__init__(project_hash: str | None = None)

Initialize the runner with an optional project hash.

The project_hash will allow stricter stage validation. If left unspecified, errors will first be raised during execution of the runner.

Parameters:

  • project_hash (str | None, default: None ) –

    The project hash that the runner applies to.

    Can be left unspecified to be able to reuse same runner on multiple projects.

Source code in encord_agents/tasks/runner.py
def __init__(self, project_hash: str | None = None):
    """
    Create a runner, optionally bound to a project.

    Supplying `project_hash` allows stricter stage validation.
    Without it, errors will first be raised when the runner is executed.

    Args:
        project_hash: Hash of the project the runner applies to.

            Leave it unspecified to reuse the same runner on multiple projects.
    """
    super().__init__(project_hash)
    # Flipped to True by `run()` when the runner is started from the CLI.
    self.was_called_from_cli = False
    # Agent registrations, filled in as functions are attached to the runner.
    self.agents: list[RunnerAgent] = []

run

run() -> None

Execute the runner.

This function is intended to be called from the "main file". It is an entry point to be able to run the agent(s) via your shell with command line arguments.

Example:

example.py
runner = Runner(project_hash="<your_project_hash>")

@runner.stage(stage="...")
def your_func() -> str:
    ...

if __name__ == "__main__":
    runner.run()

You can then execute the runner with:

python example.py --help

to see the options it has (they are those from Runner.__call__).

Source code in encord_agents/tasks/runner.py
def run(self) -> None:
    """
    Run the runner as a command line application.

    Call this from your "main file". It exposes `Runner.__call__` as a
    CLI command, so the agent(s) can be configured and started from
    your shell with command line arguments.

    **Example:**

    ```python title="example.py"
    runner = Runner(project_hash="<your_project_hash>")

    @runner.stage(stage="...")
    def your_func() -> str:
        ...

    if __name__ == "__main__":
        runner.run()
    ```

    You can then execute the runner with:

    ```shell
    python example.py --help
    ```

    to see the options it has (they are those of `Runner.__call__`).

    """
    from typer import Typer

    # Record that execution was started through the CLI entry point.
    self.was_called_from_cli = True

    cli = Typer(add_completion=False, rich_markup_mode="rich")
    as_command = cli.command(
        help=f"Execute the runner.{os.linesep * 2}Full documentation here: https://agents-docs.encord.com/task_agents/runner",
        short_help="Execute the runner as a CLI.",
    )
    as_command(self.__call__)
    cli()

stage

stage(stage: str | UUID, *, label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None, label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None) -> Callable[[DecoratedCallable], DecoratedCallable]

Decorator to associate a function with an agent stage.

A function decorated with a stage is added to the list of stages that will be handled by the runner. The runner will call the function for every task which is in that stage.

Example:

runner = Runner()

@runner.stage("<stage_name_or_uuid>")
def my_func() -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"

The function declaration can be any function that takes parameters that are type annotated with the following types:

  • Project: the encord.project.Project that the runner is operating on.
  • LabelRowV2: the encord.objects.LabelRowV2 that the task is associated with.
  • AgentTask: the encord.workflow.stages.agent.AgentTask that the task is associated with.
  • Any other type: which is annotated with a dependency

All those parameters will be automatically injected when the agent is called.

Example:

from typing import Iterator
from typing_extensions import Annotated

from encord.project import Project
from encord_agents.tasks import Depends
from encord_agents.tasks.dependencies import dep_video_iterator
from encord.workflow.stages.agent import AgentTask

runner = Runner()

def random_value() -> float:
    import random
    return random.random()

@runner.stage("<stage_name_or_uuid>")
def my_func(
    project: Project,
    lr: LabelRowV2,
    task: AgentTask,
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
    custom: Annotated[float, Depends(random_value)]
) -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"

Parameters:

  • stage (str | UUID) –

    The name or uuid of the stage that the function should be associated with.

  • label_row_metadata_include_args (LabelRowMetadataIncludeArgs | None, default: None ) –

    Arguments to be passed to project.list_label_rows_v2(...)

  • label_row_initialise_labels_args (LabelRowInitialiseLabelsArgs | None, default: None ) –

    Arguments to be passed to label_row.initialise_labels(...)

Returns:

  • Callable[[DecoratedCallable], DecoratedCallable] –

    The decorated function.

Source code in encord_agents/tasks/runner.py
def stage(
    self,
    stage: str | UUID,
    *,
    label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
    label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None,
) -> Callable[[DecoratedCallable], DecoratedCallable]:
    r"""
    Decorator that registers a function as the agent for a workflow stage.

    The decorated function is added to the stages handled by the runner,
    and the runner will call it for every task that is in that stage.
    The function itself is returned unchanged.


    **Example:**

    ```python
    runner = Runner()

    @runner.stage("<stage_name_or_uuid>")
    def my_func() -> str | None:
        ...
        return "<pathway_name or pathway_uuid>"
    ```

    The decorated function may declare parameters that are type annotated
    with any of the following types:

    * [Project][docs-project]{ target="\_blank", rel="noopener noreferrer" }: the `encord.project.Project`
        that the runner is operating on.
    * [LabelRowV2][docs-label-row]{ target="\_blank", rel="noopener noreferrer" }: the `encord.objects.LabelRowV2`
        that the task is associated with.
    * [AgentTask][docs-agent-task]{ target="\_blank", rel="noopener noreferrer" }: the `encord.workflow.stages.agent.AgentTask`
        that the task is associated with.
    * Any other type: which is annotated with a [dependency](/dependencies.md)

    All of these parameters are injected automatically when the agent is called.

    **Example:**

    ```python
    from typing import Iterator
    from typing_extensions import Annotated

    from encord.project import Project
    from encord_agents.tasks import Depends
    from encord_agents.tasks.dependencies import dep_video_iterator
    from encord.workflow.stages.agent import AgentTask

    runner = Runner()

    def random_value() -> float:
        import random
        return random.random()

    @runner.stage("<stage_name_or_uuid>")
    def my_func(
        project: Project,
        lr: LabelRowV2,
        task: AgentTask,
        video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
        custom: Annotated[float, Depends(random_value)]
    ) -> str | None:
        ...
        return "<pathway_name or pathway_uuid>"
    ```

    [docs-project]:    https://docs.encord.com/sdk-documentation/sdk-references/project
    [docs-label-row]:  https://docs.encord.com/sdk-documentation/sdk-references/LabelRowV2
    [docs-agent-task]: https://docs.encord.com/sdk-documentation/sdk-references/AgentTask

    Args:
        stage: The name or uuid of the stage that the function should be
            associated with.
        label_row_metadata_include_args: Arguments to be passed to
            `project.list_label_rows_v2(...)`
        label_row_initialise_labels_args: Arguments to be passed to
            `label_row.initialise_labels(...)`

    Returns:
        The decorated function.
    """
    # Validate once, when the decorator is created, not when it is applied.
    resolved_id, display_name = self.validate_stage(stage)

    def register(agent_fn: DecoratedCallable) -> DecoratedCallable:
        # Record the agent and hand the function back untouched.
        self._add_stage_agent(
            resolved_id,
            agent_fn,
            display_name,
            label_row_metadata_include_args,
            label_row_initialise_labels_args,
        )
        return agent_fn

    return register

encord_agents.tasks.dependencies

Twin dataclass

Dataclass to hold "label twin" information.

Source code in encord_agents/tasks/dependencies.py
@dataclass(frozen=True)
class Twin:
    """
    Immutable (frozen) dataclass to hold "label twin" information.

    A twin pairs a label row with an optional workflow task.
    """

    # The label row of the twin.
    label_row: LabelRowV2
    # The workflow task belonging to the twin; may be `None`
    # (presumably when no matching task exists — confirm against the code that builds `Twin`).
    task: WorkflowTask | None

dep_asset

dep_asset(lr: LabelRowV2) -> Generator[Path, None, None]

Get a local file path to data asset temporarily stored till end of task execution.

This dependency will fetch the underlying data asset based on a signed url. It will temporarily store the data on disk. Once the task is completed, the asset will be removed from disk again.

Example:

from encord_agents.tasks.dependencies import dep_asset
...
runner = Runner(project_hash="<project_hash_a>")

@runner.stage("<stage_name_or_uuid>")
def my_agent(
    asset: Annotated[Path, Depends(dep_asset)],
) -> str | None:
    asset.stat()  # read file stats
    ...

Returns:

  • Path –

    The path to the asset.

Source code in encord_agents/tasks/dependencies.py
def dep_asset(lr: LabelRowV2) -> Generator[Path, None, None]:
    """
    Provide a local file path to the data asset for the duration of the task.

    The underlying data asset is fetched based on a signed url and stored
    temporarily on disk. Once the task is completed, the asset is removed
    from disk again.

    **Example:**

    ```python
    from encord_agents.tasks.dependencies import dep_asset
    ...
    runner = Runner(project_hash="<project_hash_a>")

    @runner.stage("<stage_name_or_uuid>")
    def my_agent(
        asset: Annotated[Path, Depends(dep_asset)],
    ) -> str | None:
        asset.stat()  # read file stats
        ...
    ```

    Returns:
        The path to the asset.

    Raises:
        `ValueError` if the underlying assets are not videos, images, or audio.
        `EncordException` if data type not supported by SDK yet.
    """
    downloaded = download_asset(lr)
    # Yield from inside the context manager so it stays open while the
    # consumer uses the path and exits when the generator is resumed or closed.
    with downloaded as local_path:
        yield local_path

dep_client

dep_client() -> EncordUserClient

Dependency to provide an authenticated user client.

Example:

from encord.user_client import EncordUserClient
from encord_agents.tasks.dependencies import dep_client
...
@runner.stage("<my_stage_name>")
def my_agent(
    client: Annotated[EncordUserClient, Depends(dep_client)]
) -> str:
    # Client will be authenticated and ready to use.
    client.get_dataset("")
Source code in encord_agents/tasks/dependencies.py
def dep_client() -> EncordUserClient:
    """
    Dependency that supplies an authenticated user client.

    **Example:**

    ```python
    from encord.user_client import EncordUserClient
    from encord_agents.tasks.dependencies import dep_client
    ...
    @runner.stage("<my_stage_name>")
    def my_agent(
        client: Annotated[EncordUserClient, Depends(dep_client)]
    ) -> str:
        # Client will be authenticated and ready to use.
        client.get_dataset("")
    ```

    Returns:
        The client obtained from `get_user_client()`.

    """
    user_client = get_user_client()
    return user_client

dep_data_lookup

dep_data_lookup(lookup: Annotated[DataLookup, Depends(DataLookup.sharable)]) -> DataLookup

Get a lookup to easily retrieve data rows and storage items associated with the given task.

Info

If you're just looking to get the associated storage item to a task, consider using dep_storage_item instead.

The lookup can, e.g., be useful for

  • Updating client metadata
  • Downloading data from signed urls
  • Matching data to other projects

Example:

from encord.orm.dataset import DataRow
from encord.storage import StorageItem
from encord.workflow.stages.agent import AgentTask

@runner.stage(stage="Agent 1")
def my_agent(
    task: AgentTask,
    lookup: Annotated[DataLookup, Depends(dep_data_lookup)]
) -> str:
    # Data row from the underlying dataset
    data_row: DataRow = lookup.get_data_row(task.data_hash)

    # Storage item from Encord Index
    storage_item: StorageItem = lookup.get_storage_item(task.data_hash)

    # Current metadata
    client_metadata = storage_item.client_metadata

    # Update metadata
    storage_item.update(
        client_metadata={
            "new": "entry",
            **(client_metadata or {})
        }
    )  # metadata. Make sure not to update in place!
    ...

Parameters:

  • lookup (Annotated[DataLookup, Depends(sharable)]) –

    The object that you can use to lookup data rows and storage items. Automatically injected.

Returns:

  • DataLookup –

    The (shared) lookup object.

Source code in encord_agents/tasks/dependencies.py
def dep_data_lookup(lookup: Annotated[DataLookup, Depends(DataLookup.sharable)]) -> DataLookup:
    """
    Get a lookup to easily retrieve data rows and storage items associated with the given task.

    !!! warning "Deprecated"
        Calling this dependency emits a `DeprecationWarning`.
        Use `dep_storage_item` instead for accessing storage items.

    !!! info
        If you're just looking to get the associated storage item to a task, consider using `dep_storage_item` instead.


    The lookup can, e.g., be useful for

    * Updating client metadata
    * Downloading data from signed urls
    * Matching data to other projects

    **Example:**

    ```python
    from encord.orm.dataset import DataRow
    from encord.storage import StorageItem
    from encord.workflow.stages.agent import AgentTask

    @runner.stage(stage="Agent 1")
    def my_agent(
        task: AgentTask,
        lookup: Annotated[DataLookup, Depends(dep_data_lookup)]
    ) -> str:
        # Data row from the underlying dataset
        data_row: DataRow = lookup.get_data_row(task.data_hash)

        # Storage item from Encord Index
        storage_item: StorageItem = lookup.get_storage_item(task.data_hash)

        # Current metadata
        client_metadata = storage_item.client_metadata

        # Update metadata
        storage_item.update(
            client_metadata={
                "new": "entry",
                **(client_metadata or {})
            }
        )  # metadata. Make sure not to update in place!
        ...
    ```


    Args:
        lookup: The object that you can use to lookup data rows and storage items. Automatically injected.

    Returns:
        The (shared) lookup object.

    """
    import warnings

    deprecation_notice = (
        "dep_data_lookup is deprecated and will be removed in a future version. "
        "Use dep_storage_item instead for accessing storage items."
    )
    # stacklevel=2 attributes the warning to the caller rather than to this function.
    warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)

    return lookup

dep_single_frame

dep_single_frame(lr: LabelRowV2) -> NDArray[np.uint8]

Dependency to inject the first frame of the underlying asset.

The downloaded asset will be named lr.data_hash.{suffix}. When the function has finished, the downloaded file will be removed from the file system.

Example:

from encord_agents import FrameData
from encord_agents.tasks.dependencies import dep_single_frame
...

@runner.stage("<my_stage_name>")
def my_agent(
    lr: LabelRowV2,  # <- Automatically injected
    frame: Annotated[NDArray[np.uint8], Depends(dep_single_frame)]
) -> str:
    assert frame.ndim == 3, "Will work"

Parameters:

  • lr (LabelRowV2) –

    The label row. Automatically injected (see example above).

Returns:

  • NDArray[uint8] –

    Numpy array of shape [h, w, 3] RGB colors.

Source code in encord_agents/tasks/dependencies.py
def dep_single_frame(lr: LabelRowV2) -> NDArray[np.uint8]:
    """
    Dependency that injects frame 0 of the asset behind the label row as an RGB array.

    The downloaded asset will be named `lr.data_hash.{suffix}`.
    When the function has finished, the downloaded file will be removed from the file system.

    **Example:**

    ```python
    from encord_agents import FrameData
    from encord_agents.tasks.dependencies import dep_single_frame
    ...

    @runner.stage("<my_stage_name>")
    def my_agent(
        lr: LabelRowV2,  # <- Automatically injected
        frame: Annotated[NDArray[np.uint8], Depends(dep_single_frame)]
    ) -> str:
        assert frame.ndim == 3, "Will work"
    ```

    Args:
        lr: The label row. Automatically injected (see example above).

    Returns:
        Numpy array of shape [h, w, 3] RGB colors.

    """
    with download_asset(lr, frame=0) as frame_path:
        # OpenCV decodes images in BGR channel order, so convert to RGB
        # while the downloaded file is still available.
        bgr_pixels = cv2.imread(frame_path.as_posix())
        rgb_pixels = cv2.cvtColor(bgr_pixels, cv2.COLOR_BGR2RGB)

    return np.asarray(rgb_pixels, dtype=np.uint8)

dep_storage_item

dep_storage_item(user_client: Annotated[EncordUserClient, Depends(dep_client)], label_row: LabelRowV2) -> StorageItem

Get the storage item associated with the underlying agent task.

The StorageItem is useful for multiple things like

  • Updating client metadata
  • Reading file properties like storage location, fps, duration, DICOM tags, etc.

Example

from encord.storage import StorageItem
from encord_agents.tasks.dependencies import dep_storage_item

@runner.stage(stage="<my_stage_name>")
def my_agent(storage_item: Annotated[StorageItem, Depends(dep_storage_item)]) -> str:
    print(storage_item.name)
    print(storage_item.client_metadata)
    ...

Parameters:

  • user_client (Annotated[EncordUserClient, Depends(dep_client)]) –

    The user client. Automatically injected.

  • label_row (LabelRowV2) –

    The label row. Automatically injected.

Returns:

  • StorageItem –

    The storage item.

Source code in encord_agents/tasks/dependencies.py
def dep_storage_item(
    user_client: Annotated[EncordUserClient, Depends(dep_client)], label_row: LabelRowV2
) -> StorageItem:
    r"""
    Resolve the storage item that backs the label row of the current agent task.

    The [`StorageItem`](https://docs.encord.com/sdk-documentation/sdk-references/StorageItem){ target="\_blank", rel="noopener noreferrer" }
    is useful for multiple things like

    * Updating client metadata
    * Reading file properties like storage location, fps, duration, DICOM tags, etc.

    **Example**

    ```python
    from encord.storage import StorageItem
    from encord_agents.tasks.dependencies import dep_storage_item

    @runner.stage(stage="<my_stage_name>")
    def my_agent(storage_item: Annotated[StorageItem, Depends(dep_storage_item)]) -> str:
        print(storage_item.name)
        print(storage_item.client_metadata)
        ...
    ```

    Args:
        user_client: The user client. Automatically injected.
        label_row: The label row. Automatically injected.

    Returns:
        The storage item looked up via `label_row.backing_item_uuid`.

    Raises:
        ValueError: If the label row has no backing item UUID.
    """
    backing_item_uuid = label_row.backing_item_uuid
    if backing_item_uuid is not None:
        return user_client.get_storage_item(backing_item_uuid)

    # Without a backing item there is nothing to look up.
    raise ValueError("Label row does not have a backing item UUID")

dep_twin_label_row

dep_twin_label_row(twin_project_hash: str, init_labels: bool = True, include_task: bool = False) -> Callable[[LabelRowV2], Twin | None]

Dependency to link assets between two Projects. When your Runner is running on <project_hash_a>, you can use this to get a Twin of labels and the underlying task in the "twin project" with <project_hash_b>.

This is useful in situations like:

  • When you want to transfer labels from a source project to a sink project.
  • If you want to compare labels to labels from other projects upon label submission.
  • If you want to extend an existing project with labels from another project on the same underlying data.

Example:

from encord.workflow.common import WorkflowTask
from encord.objects.ontology_labels_impl import LabelRowV2
from encord_agents.tasks.dependencies import Twin, dep_twin_label_row
...
runner = Runner(project_hash="<project_hash_a>")

@runner.stage("<my_stage_name_in_project_a>")
def my_agent(
    project_a_label_row: LabelRowV2,
    twin: Annotated[
        Twin, Depends(dep_twin_label_row(twin_project_hash="<project_hash_b>"))
    ],
) -> str | None:
    label_row_from_project_b: LabelRowV2 = twin.label_row
    task_from_project_b: WorkflowTask | None = twin.task  # only populated when include_task=True

Parameters:

  • twin_project_hash (str) –

    The project hash of the twin project (attached to the same datasets) from which you want to load the additional data.

  • init_labels (bool, default: True ) –

    If true, the label row will be initialized before calling the agent.

  • include_task (bool, default: False ) –

    If true, the task field of the Twin will be populated. If population fails, e.g., for non-workflow projects, the task will also be None.

Returns:

  • Callable[[LabelRowV2], Twin | None] –

    The twin.

Source code in encord_agents/tasks/dependencies.py
def dep_twin_label_row(
    twin_project_hash: str, init_labels: bool = True, include_task: bool = False
) -> Callable[[LabelRowV2], Twin | None]:
    """
    Dependency to link assets between two Projects. When your `Runner` is running on
    `<project_hash_a>`, you can use this to get a `Twin` of labels and the underlying
    task in the "twin project" with `<project_hash_b>`.

    This is useful in situations like:

    * When you want to transfer labels from a source project to a sink project.
    * If you want to compare labels to labels from other projects upon label submission.
    * If you want to extend an existing project with labels from another project on the same underlying data.

    **Example:**

    ```python
    from encord.workflow.common import WorkflowTask
    from encord.objects.ontology_labels_impl import LabelRowV2
    from encord_agents.tasks.dependencies import Twin, dep_twin_label_row
    ...
    runner = Runner(project_hash="<project_hash_a>")

    @runner.stage("<my_stage_name_in_project_a>")
    def my_agent(
        project_a_label_row: LabelRowV2,
        twin: Annotated[
            Twin,
            Depends(dep_twin_label_row(twin_project_hash="<project_hash_b>", include_task=True)),
        ],
    ) -> str | None:
        label_row_from_project_b: LabelRowV2 = twin.label_row
        task_from_project_b: WorkflowTask | None = twin.task
    ```

    Args:
        twin_project_hash: The project hash of the twin project (attached to the same datasets)
            from which you want to load the additional data.
        init_labels: If true, the label row will be initialized before calling the agent.
        include_task: If true, the `task` field of the `Twin` will be populated. If population
            fails, e.g., for non-workflow projects, the failure is logged and the twin is
            still returned.

    Returns:
        A dependency callable that maps a label row of the current project to its `Twin`
        in the twin project, or to `None` when the twin project has no label row with the
        same data hash.

    Raises:
        PrintableError: If the twin project cannot be fetched, e.g., because you do not
            have access to it.
    """
    import logging

    logger = logging.getLogger(__name__)

    client = get_user_client()
    try:
        twin_project = client.get_project(twin_project_hash)
    except (AuthorisationError, AuthenticationError) as err:
        # Chain explicitly so the SDK error stays attached as the cause.
        raise PrintableError(
            f"You do not seem to have access to the project with project hash `[blue]{twin_project_hash}[/blue]`"
        ) from err
    except UnknownException as err:
        raise PrintableError(
            f"An unknown error occurred while trying to get the project with project hash `[blue]{twin_project_hash}[/blue]` in the `dep_twin_label_row` dependency."
        ) from err

    # Index the twin project's label rows once, when the dependency is created,
    # so every per-task lookup below is a plain dict access.
    label_rows: dict[str, LabelRowV2] = {lr.data_hash: lr for lr in twin_project.list_label_rows_v2()}

    def get_twin_label_row(lr_original: LabelRowV2) -> Twin | None:
        """Return the twin of `lr_original`, or `None` if the twin project lacks that data hash."""
        lr_twin = label_rows.get(lr_original.data_hash)
        if lr_twin is None:
            return None

        if init_labels:
            lr_twin.initialise_labels()

        graph_node = lr_twin.workflow_graph_node
        task: WorkflowTask | None = None

        if include_task and graph_node is not None:
            try:
                stage: WorkflowStage = twin_project.workflow.get_stage(uuid=graph_node.uuid)
                # Exhaust the iterator; `task` ends up bound to the last task yielded.
                for task in stage.get_tasks(data_hash=lr_original.data_hash):
                    pass
            except Exception:
                # Best effort: the twin is still returned, but the failure is
                # recorded instead of being dropped silently.
                logger.warning(
                    "Could not resolve the workflow task for data hash `%s` in twin project `%s`",
                    lr_original.data_hash,
                    twin_project_hash,
                    exc_info=True,
                )

        return Twin(label_row=lr_twin, task=task)

    return get_twin_label_row

dep_video_iterator

dep_video_iterator(lr: LabelRowV2) -> Generator[Iterator[Frame], None, None]

Dependency to inject a video frame iterator for doing things over many frames.

Intended use

from encord_agents import FrameData
from encord_agents.tasks.dependencies import dep_video_iterator
...

@runner.stage("<my_stage_name>")
def my_agent(
    lr: LabelRowV2,  # <- Automatically injected
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)]
) -> str:
    for frame in video_frames:
        print(frame.frame, frame.content.shape)

Parameters:

  • lr (LabelRowV2) –

    Automatically injected label row dependency.

Raises:

  • NotImplementedError –

    Will fail for other data types than video.

Yields:

  • Iterator[Frame] –

    An iterator.

Source code in encord_agents/tasks/dependencies.py
def dep_video_iterator(lr: LabelRowV2) -> Generator[Iterator[Frame], None, None]:
    """
    Dependency that yields a frame iterator over the video behind the label row.

    The asset is downloaded with `download_asset` and stays available for as long
    as the generator is suspended at its `yield`.

    **Intended use**

    ```python
    from encord_agents import FrameData
    from encord_agents.tasks.dependencies import dep_video_iterator
    ...

    @runner.stage("<my_stage_name>")
    def my_agent(
        lr: LabelRowV2,  # <- Automatically injected
        video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)]
    ) -> str:
        for frame in video_frames:
            print(frame.frame, frame.content.shape)
    ```

    Args:
        lr: Automatically injected label row dependency.

    Raises:
        NotImplementedError: Will fail for other data types than video.

    Yields:
        An iterator over the frames of the video.

    """
    # Guard clause: only video label rows are supported.
    if lr.data_type != DataType.VIDEO:
        raise NotImplementedError("`dep_video_iterator` only supported for video label rows")

    with download_asset(lr, None) as video_path:
        frame_iterator = iter_video(video_path)
        yield frame_iterator

encord_agents.tasks.models

TaskCompletionResult

Bases: BaseModel

Data model to hold information about the completion result of encord_agents.tasks.QueueRunner agents.

Source code in encord_agents/tasks/models.py
class TaskCompletionResult(BaseModel):
    """
    Result record describing how an `encord_agents.tasks.QueueRunner` agent
    finished (or failed to finish) a single task.
    """

    task_uuid: UUID = Field(description="UUID of the task in the Encord Queueing system")
    stage_uuid: UUID | None = Field(
        default=None,
        description=(
            "UUID of the workflow stage at which the task was executed. "
            "If None, the stage could not be identified from the `task_uuid`."
        ),
    )
    success: bool = Field(description="Agent executed without errors")
    # TODO: When we can read pathway definitions via the `encord` SDK, pathway can be typed None | UUID only.
    # Currently, pathway can also be the name of the pathway.
    pathway: str | UUID | None = Field(
        default=None,
        description=(
            "The pathway that the task was passed along to. "
            "If None, either the agent succeeded but didn't return a pathway "
            "or the agent failed so the task didn't proceed."
        ),
    )
    error: str | None = Field(default=None, description="Stack trace or error message if an error occurred")

encord_agents.tasks.runner

QueueRunner

Bases: RunnerBase

This class is intended to hold agent implementations. It makes it easy to put agent task specifications into a queue and then execute them in a distributed fashion.

Below is a template for how that would work.

Example:

runner = QueueRunner(project_hash="...")

@runner.stage("Agent 1")
def my_agent_implementation() -> str:
    # ... do your thing
    return "<pathway_name>"

# Populate the queue
my_queue = ...
for stage in runner.get_agent_stages():
    for task in stage.get_tasks():
        my_queue.append(task.model_dump_json())

# Execute on the queue
while my_queue:
    task_spec = my_queue.pop()
    result_json = my_agent_implementation(task_spec)
    result = TaskCompletionResult.model_validate_json(result_json)

Source code in encord_agents/tasks/runner.py
class QueueRunner(RunnerBase):
    """
    This class is intended to hold agent implementations.
    It makes it easy to put agent task specifications into
    a queue and then execute them in a distributed fashion.

    Below is a template for how that would work.

    *Example:*
    ```python
    runner = QueueRunner(project_hash="...")

    @runner.stage("Agent 1")
    def my_agent_implementation() -> str:
        # ... do your thing
        return "<pathway_name>"

    # Populate the queue
    my_queue = ...
    for stage in runner.get_agent_stages():
        for task in stage.get_tasks():
            my_queue.append(task.model_dump_json())

    # Execute on the queue
    while my_queue:
        task_spec = my_queue.pop()
        result_json = my_agent_implementation(task_spec)
        result = TaskCompletionResult.model_validate_json(result_json)
    ```
    """

    def __init__(self, project_hash: str | UUID):
        """
        Create a queue runner for one project.

        Args:
            project_hash: Hash of the project; forwarded to the base class initialiser.
        """
        super().__init__(project_hash)
        # The methods below rely on a project being present, so it is asserted
        # here and stored under a non-optional attribute.
        assert self.project is not None
        self._project: Project = self.project

    def __call__(self, *args: Any, **kwds: Any) -> Any:
        """Always raise `NotImplementedError`; the queue runner is not meant to be called directly."""
        raise NotImplementedError(
            "Calling the QueueRunner is not intended. "
            "Prefer using wrapped functions with, e.g., modal or Celery. "
            "For more documentation, please see the `QueueRunner.stage` documentation below."
        )

    def stage(
        self,
        stage: str | UUID,
        *,
        label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
        label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None,
    ) -> Callable[[Callable[..., str | UUID | None]], Callable[[str], str]]:
        """
        Agent wrapper intended for queueing systems and distributed workloads.

        Define your agent as you are used to with dependencies in the method declaration and
        return the pathway from the project workflow that the task should follow upon completion.
        The function will be wrapped in logic that does the following (in pseudo code):

        ```
        @runner.stage("stage_name")
        def my_function(...)
            ...

        # is equivalent to

        def wrapped_function(task_json_spec: str) -> str (result_json):
            task = fetch_task(task_json_spec)
            resources = load_resources(task)
            pathway = your_function(resources)  # <- this is where your code goes
            task.proceed(pathway)
            return TaskCompletionResult.model_dump_json()
        ```

        When you have an `encord.workflow.stages.agent.AgentTask` instance at hand, let's call
        it `task`, then you can call your `wrapped_function` with `task.model_dump_json()`.
        Similarly, you can put `task.model_dump_json()` into a queue and read from that queue, e.g.,
        from another instance/process, to execute `wrapped_function` there.

        As the pseudo code indicates, `wrapped_function` understands how to take that string from
        the queue and resolve all your defined dependencies before calling `your_function`.

        Args:
            stage: Name or UUID of the agent stage; checked with `self.validate_stage`.
            label_row_metadata_include_args: Optional arguments that are unpacked into
                `list_label_rows_v2` when the agent needs a label row.
            label_row_initialise_labels_args: Optional arguments that are unpacked into
                `initialise_labels` when the agent needs a label row.

        Returns:
            A decorator that registers the agent function and returns a wrapper which takes
            a JSON task specification and returns a JSON-serialised `TaskCompletionResult`.
        """
        stage_uuid, printable_name = self.validate_stage(stage)

        def decorator(func: Callable[..., str | UUID | None]) -> Callable[[str], str]:
            runner_agent = self._add_stage_agent(
                stage_uuid, func, printable_name, label_row_metadata_include_args, label_row_initialise_labels_args
            )
            # Fall back to default-constructed argument models when none were registered.
            include_args = runner_agent.label_row_metadata_include_args or LabelRowMetadataIncludeArgs()
            init_args = runner_agent.label_row_initialise_labels_args or LabelRowInitialiseLabelsArgs()

            @wraps(func)
            def wrapper(json_str: str) -> str:
                # Parse the queued JSON task specification.
                conf = AgentTaskConfig.model_validate_json(json_str)
                try:
                    stage = self._project.workflow.get_stage(uuid=runner_agent.identity, type_=AgentStage)
                except ValueError as e:
                    # No stage could be resolved, so the result carries no stage_uuid.
                    return TaskCompletionResult(
                        task_uuid=conf.task_uuid,
                        success=False,
                        error=str(e),
                    ).model_dump_json()

                # First task the stage reports for this data hash, or None if there is none.
                task = next((s for s in stage.get_tasks(data_hash=conf.data_hash)), None)
                if task is None:
                    # TODO logging?
                    return TaskCompletionResult(
                        task_uuid=conf.task_uuid,
                        stage_uuid=stage.uuid,
                        success=False,
                        error="Failed to obtain task from Encord",
                    ).model_dump_json()

                label_row: LabelRowV2 | None = None
                try:
                    # The label row is only fetched and initialised when the agent requires it.
                    if runner_agent.dependant.needs_label_row:
                        label_row = self._project.list_label_rows_v2(
                            data_hashes=[task.data_hash], **include_args.model_dump()
                        )[0]
                        label_row.initialise_labels(**init_args.model_dump())

                    next_stage: TaskAgentReturn = None
                    # NOTE(review): the stack is handed to `solve_dependencies`; presumably it
                    # collects dependency context managers that are closed when this block
                    # exits — confirm against `solve_dependencies`.
                    with ExitStack() as stack:
                        context = Context(project=self._project, task=task, label_row=label_row)
                        dependencies = solve_dependencies(
                            context=context, dependant=runner_agent.dependant, stack=stack
                        )
                        next_stage = runner_agent.callable(**dependencies.values)

                    # The task is only moved on after the `with` block above has exited.
                    if next_stage is None:
                        # TODO: Should we log that task didn't continue?
                        pass
                    elif isinstance(next_stage, UUID):
                        task.proceed(pathway_uuid=str(next_stage))
                    else:
                        # A ValueError from either the UUID parse or the `proceed` call
                        # falls back to proceeding by pathway name.
                        try:
                            next_stage = UUID(next_stage)
                            task.proceed(pathway_uuid=str(next_stage))
                        except ValueError:
                            task.proceed(pathway_name=str(next_stage))
                    return TaskCompletionResult(
                        task_uuid=task.uuid, stage_uuid=stage.uuid, success=True, pathway=next_stage
                    ).model_dump_json()
                except Exception:
                    # Any failure while loading resources, running the agent, or proceeding
                    # the task is reported in the result rather than raised.
                    # TODO logging?
                    return TaskCompletionResult(
                        task_uuid=task.uuid, stage_uuid=stage.uuid, success=False, error=traceback.format_exc()
                    ).model_dump_json()

            return wrapper

        return decorator

    def get_agent_stages(self) -> Iterable[AgentStage]:
        """
        Get the agent stages for which there exist an agent implementation.

        This function is intended to make it easy to iterate through all current
        agent tasks and put the task specs into external queueing systems like
        Celery or Modal.

        For a concrete example, please see the doc string for the class itself.

        Note that if you didn't specify an implementation (by decorating your
        function with `@runner.stage`) for a given agent stage, the stage will
        not show up by calling this function.

        Returns:
            An iterable over `encord.workflow.stages.agent.AgentStage` objects
            where the runner contains an agent implementation.

        Note:
            The project association is asserted in `__init__`; this method
            itself performs no such check.
        """
        for runner_agent in self.agents:
            # An identity that parses as a UUID is looked up by uuid, anything else by name.
            is_uuid = False
            try:
                UUID(str(runner_agent.identity))
                is_uuid = True
            except ValueError:
                pass

            if is_uuid:
                stage = self._project.workflow.get_stage(uuid=runner_agent.identity, type_=AgentStage)
            else:
                stage = self._project.workflow.get_stage(name=str(runner_agent.identity), type_=AgentStage)
            yield stage
get_agent_stages
get_agent_stages() -> Iterable[AgentStage]

Get the agent stages for which there exist an agent implementation.

This function is intended to make it easy to iterate through all current agent tasks and put the task specs into external queueing systems like Celery or Modal.

For a concrete example, please see the doc string for the class itself.

Note that if you didn't specify an implementation (by decorating your function with @runner.stage) for a given agent stage, the stage will not show up by calling this function.

Returns:

  • Iterable[AgentStage] –

    An iterable over encord.workflow.stages.agent.AgentStage objects where the runner contains an agent implementation.

Source code in encord_agents/tasks/runner.py
def get_agent_stages(self) -> Iterable[AgentStage]:
    """
    Yield the workflow agent stage of every agent registered on this runner.

    Use this to walk all current agent tasks and push their task specs into an
    external queueing system such as Celery or Modal.

    Agent stages without an implementation (i.e., no function decorated with
    `@runner.stage`) are not registered on the runner and are therefore not
    yielded.

    Returns:
        An iterable over `encord.workflow.stages.agent.AgentStage` objects
        where the runner contains an agent implementation.

    Note:
        Exceptions raised by the workflow's `get_stage` lookup are not caught here.
    """

    def parses_as_uuid(identity: object) -> bool:
        # Probe only; the parsed value itself is not needed.
        try:
            UUID(str(identity))
        except ValueError:
            return False
        return True

    for agent in self.agents:
        if parses_as_uuid(agent.identity):
            # UUID-like identities are looked up by uuid ...
            yield self._project.workflow.get_stage(uuid=agent.identity, type_=AgentStage)
        else:
            # ... everything else is treated as a stage name.
            yield self._project.workflow.get_stage(name=str(agent.identity), type_=AgentStage)
stage
stage(stage: str | UUID, *, label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None, label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None) -> Callable[[Callable[..., str | UUID | None]], Callable[[str], str]]

Agent wrapper intended for queueing systems and distributed workloads.

Define your agent as you are used to with dependencies in the method declaration and return the pathway from the project workflow that the task should follow upon completion. The function will be wrapped in logic that does the following (in pseudo code):

@runner.stage("stage_name")
def my_function(...)
    ...

# is equivalent to

def wrapped_function(task_json_spec: str) -> str (result_json):
    task = fetch_task(task_json_spec)
    resources = load_resources(task)
    pathway = your_function(resources)  # <- this is where your code goes
    task.proceed(pathway)
    return TaskCompletionResult.model_dump_json()

When you have an encord.workflow.stages.agent.AgentTask instance at hand, let's call it task, then you can call your wrapped_function with task.model_dump_json(). Similarly, you can put task.model_dump_json() into a queue and read from that queue, e.g., from another instance/process, to execute wrapped_function there.

As the pseudo code indicates, wrapped_function understands how to take that string from the queue and resolve all your defined dependencies before calling your_function.

Source code in encord_agents/tasks/runner.py
def stage(
    self,
    stage: str | UUID,
    *,
    label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
    label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None,
) -> Callable[[Callable[..., str | UUID | None]], Callable[[str], str]]:
    """
    Agent wrapper intended for queueing systems and distributed workloads.

    Define your agent as you are used to with dependencies in the method declaration and
    return the pathway from the project workflow that the task should follow upon completion.
    The function will be wrapped in logic that does the following (in pseudo code):

    ```
    @runner.stage("stage_name")
    def my_function(...)
        ...

    # is equivalent to

    def wrapped_function(task_json_spec: str) -> str (result_json):
        task = fetch_task(task_json_spec)
        resources = load_resources(task)
        pathway = your_function(resources)  # <- this is where your code goes
        task.proceed(pathway)
        return TaskCompletionResult.model_dump_json()
    ```

    When you have an `encord.workflow.stages.agent.AgentTask` instance at hand, let's call
    it `task`, then you can call your `wrapped_function` with `task.model_dump_json()`.
    Similarly, you can put `task.model_dump_json()` into a queue and read from that queue, e.g.,
    from another instance/process, to execute `wrapped_function` there.

    As the pseudo code indicates, `wrapped_function` understands how to take that string from
    the queue and resolve all your defined dependencies before calling `your_function`.

    Args:
        stage: Name or UUID of the agent stage; checked with `self.validate_stage`.
        label_row_metadata_include_args: Optional arguments that are unpacked into
            `list_label_rows_v2` when the agent needs a label row.
        label_row_initialise_labels_args: Optional arguments that are unpacked into
            `initialise_labels` when the agent needs a label row.

    Returns:
        A decorator that registers the agent function and returns a wrapper which takes
        a JSON task specification and returns a JSON-serialised `TaskCompletionResult`.
    """
    stage_uuid, printable_name = self.validate_stage(stage)

    def decorator(func: Callable[..., str | UUID | None]) -> Callable[[str], str]:
        runner_agent = self._add_stage_agent(
            stage_uuid, func, printable_name, label_row_metadata_include_args, label_row_initialise_labels_args
        )
        # Fall back to default-constructed argument models when none were registered.
        include_args = runner_agent.label_row_metadata_include_args or LabelRowMetadataIncludeArgs()
        init_args = runner_agent.label_row_initialise_labels_args or LabelRowInitialiseLabelsArgs()

        @wraps(func)
        def wrapper(json_str: str) -> str:
            # Parse the queued JSON task specification.
            conf = AgentTaskConfig.model_validate_json(json_str)
            try:
                stage = self._project.workflow.get_stage(uuid=runner_agent.identity, type_=AgentStage)
            except ValueError as e:
                # No stage could be resolved, so the result carries no stage_uuid.
                return TaskCompletionResult(
                    task_uuid=conf.task_uuid,
                    success=False,
                    error=str(e),
                ).model_dump_json()

            # First task the stage reports for this data hash, or None if there is none.
            task = next((s for s in stage.get_tasks(data_hash=conf.data_hash)), None)
            if task is None:
                # TODO logging?
                return TaskCompletionResult(
                    task_uuid=conf.task_uuid,
                    stage_uuid=stage.uuid,
                    success=False,
                    error="Failed to obtain task from Encord",
                ).model_dump_json()

            label_row: LabelRowV2 | None = None
            try:
                # The label row is only fetched and initialised when the agent requires it.
                if runner_agent.dependant.needs_label_row:
                    label_row = self._project.list_label_rows_v2(
                        data_hashes=[task.data_hash], **include_args.model_dump()
                    )[0]
                    label_row.initialise_labels(**init_args.model_dump())

                next_stage: TaskAgentReturn = None
                # NOTE(review): the stack is handed to `solve_dependencies`; presumably it
                # collects dependency context managers that are closed when this block
                # exits — confirm against `solve_dependencies`.
                with ExitStack() as stack:
                    context = Context(project=self._project, task=task, label_row=label_row)
                    dependencies = solve_dependencies(
                        context=context, dependant=runner_agent.dependant, stack=stack
                    )
                    next_stage = runner_agent.callable(**dependencies.values)

                # The task is only moved on after the `with` block above has exited.
                if next_stage is None:
                    # TODO: Should we log that task didn't continue?
                    pass
                elif isinstance(next_stage, UUID):
                    task.proceed(pathway_uuid=str(next_stage))
                else:
                    # A ValueError from either the UUID parse or the `proceed` call
                    # falls back to proceeding by pathway name.
                    try:
                        next_stage = UUID(next_stage)
                        task.proceed(pathway_uuid=str(next_stage))
                    except ValueError:
                        task.proceed(pathway_name=str(next_stage))
                return TaskCompletionResult(
                    task_uuid=task.uuid, stage_uuid=stage.uuid, success=True, pathway=next_stage
                ).model_dump_json()
            except Exception:
                # Any failure while loading resources, running the agent, or proceeding
                # the task is reported in the result rather than raised.
                # TODO logging?
                return TaskCompletionResult(
                    task_uuid=task.uuid, stage_uuid=stage.uuid, success=False, error=traceback.format_exc()
                ).model_dump_json()

        return wrapper

    return decorator

Runner

Bases: RunnerBase

Runs agents against Workflow projects.

When called, it will iteratively run agent stages till they are empty. By default, runner will exit after finishing the tasks identified at the point of trigger. To automatically re-run, you can use the refresh_every keyword.

Example:

example_agent.py
from uuid import UUID
from encord_agents.tasks import Runner
runner = Runner()

@runner.stage("<workflow_node_name>")
# or
@runner.stage("<workflow_node_uuid>")
def my_agent(task: AgentTask) -> str | UUID | None:
    ...
    return "pathway name"  # or pathway uuid


runner(project_hash="<project_hash>")  # (see __call__ for more arguments)
# or
if __name__ == "__main__":
    # for CLI usage: `python example_agent.py --project-hash "<project_hash>"`
    runner.run()
Source code in encord_agents/tasks/runner.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
class Runner(RunnerBase):
    """
    Runs agents against Workflow projects.

    When called, it will iteratively run agent stages till they are empty.
    By default, runner will exit after finishing the tasks identified at the point of trigger.
    To automatically re-run, you can use the `refresh_every` keyword.

    **Example:**

    ```python title="example_agent.py"
    from uuid import UUID
    from encord_agents.tasks import Runner
    runner = Runner()

    @runner.stage("<workflow_node_name>")
    # or
    @runner.stage("<workflow_node_uuid>")
    def my_agent(task: AgentTask) -> str | UUID | None:
        ...
        return "pathway name"  # or pathway uuid


    runner(project_hash="<project_hash>")  # (see __call__ for more arguments)
    # or
    if __name__ == "__main__":
        # for CLI usage: `python example_agent.py --project-hash "<project_hash>"`
        runner.run()
    ```

    """

    def __init__(self, project_hash: str | None = None):
        """
        Create a runner, optionally bound to one project.

        Supplying `project_hash` up front allows stricter stage validation.
        Without it, errors will first be raised while the runner executes.

        Args:
            project_hash: Hash of the project that the runner applies to.

                Leave it unspecified to reuse the same runner on multiple projects.
        """
        super().__init__(project_hash)
        # Only `run()` (the CLI entry point) switches this on; `__call__` reads it
        # to decide how errors are reported.
        self.was_called_from_cli = False
        # The stage agents that `__call__` iterates over.
        self.agents: list[RunnerAgent] = []

    def stage(
        self,
        stage: str | UUID,
        *,
        label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
        label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None,
    ) -> Callable[[DecoratedCallable], DecoratedCallable]:
        r"""
        Decorator that binds a function to an agent stage.

        Decorating a function with a stage adds it to the list of stages
        that the runner handles.
        The runner then calls the function for every task which is in that
        stage.


        **Example:**

        ```python
        runner = Runner()

        @runner.stage("<stage_name_or_uuid>")
        def my_func() -> str | None:
            ...
            return "<pathway_name or pathway_uuid>"
        ```

        The decorated function can be any function whose parameters
        are type annotated with the following types:

        * [Project][docs-project]{ target="\_blank", rel="noopener noreferrer" }: the `encord.project.Project`
            that the runner is operating on.
        * [LabelRowV2][docs-label-row]{ target="\_blank", rel="noopener noreferrer" }: the `encord.objects.LabelRowV2`
            that the task is associated with.
        * [AgentTask][docs-agent-task]{ target="\_blank", rel="noopener noreferrer" }: the `encord.workflow.stages.agent.AgentTask`
            that the task is associated with.
        * Any other type: which is annotated with a [dependency](/dependencies.md)

        All those parameters are injected automatically when the agent is called.

        **Example:**

        ```python
        from typing import Iterator
        from typing_extensions import Annotated

        from encord.project import Project
        from encord_agents.tasks import Depends
        from encord_agents.tasks.dependencies import dep_video_iterator
        from encord.workflow.stages.agent import AgentTask

        runner = Runner()

        def random_value() -> float:
            import random
            return random.random()

        @runner.stage("<stage_name_or_uuid>")
        def my_func(
            project: Project,
            lr: LabelRowV2,
            task: AgentTask,
            video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
            custom: Annotated[float, Depends(random_value)]
        ) -> str | None:
            ...
            return "<pathway_name or pathway_uuid>"
        ```

        [docs-project]:    https://docs.encord.com/sdk-documentation/sdk-references/project
        [docs-label-row]:  https://docs.encord.com/sdk-documentation/sdk-references/LabelRowV2
        [docs-agent-task]: https://docs.encord.com/sdk-documentation/sdk-references/AgentTask

        Args:
            stage: The name or uuid of the stage that the function should be
                associated with.
            label_row_metadata_include_args: Arguments to be passed to
                `project.list_label_rows_v2(...)`
            label_row_initialise_labels_args: Arguments to be passed to
                `label_row.initialise_labels(...)`

        Returns:
            The decorated function.
        """
        # Resolve the stage once, when the decorator is created, so an invalid
        # stage is reported where it is declared rather than on first use.
        resolved_uuid, display_name = self.validate_stage(stage)

        def register(agent_fn: DecoratedCallable) -> DecoratedCallable:
            # Registration is the only side effect; the function itself is handed
            # back unchanged so it remains directly callable.
            self._add_stage_agent(
                resolved_uuid,
                agent_fn,
                display_name,
                label_row_metadata_include_args,
                label_row_initialise_labels_args,
            )
            return agent_fn

        return register

    @staticmethod
    def _execute_tasks(
        project: Project,
        tasks: Iterable[tuple[AgentTask, LabelRowV2 | None]],
        runner_agent: RunnerAgent,
        # num_threads: int,
        num_retries: int,
        pbar_update: Callable[[float | None], bool | None] | None = None,
    ) -> None:
        """
        Run one agent over a collection of tasks, retrying on failure.

        For each `(task, label_row)` pair the agent's dependencies are solved
        once and the agent callable is then invoked up to `num_retries + 1`
        times. The value it returns decides where the task goes:

        * `None`: the task is not moved.
        * a `UUID`, or a string that parses as one: proceed by pathway uuid.
        * any other string: proceed by pathway name.

        Every `task.proceed(...)` call is given the same `Bundle`.

        Args:
            project: The project the tasks belong to.
            tasks: Pairs of task and (optional) label row to process.
            runner_agent: The agent whose callable and dependant are used.
            num_retries: Number of extra attempts after a failed first attempt.
            pbar_update: Optional progress callback, called with `1.0` after
                each successful attempt.

        Raises:
            KeyboardInterrupt: Re-raised immediately. Any other exception from
                an attempt is printed with its traceback and the attempt is
                retried; once attempts are used up the loop moves on to the
                next task.
        """
        with Bundle() as bundle:
            for task, label_row in tasks:
                with ExitStack() as stack:
                    context = Context(project=project, task=task, label_row=label_row)
                    dependencies = solve_dependencies(context=context, dependant=runner_agent.dependant, stack=stack)
                    for attempt in range(num_retries + 1):
                        try:
                            pathway = runner_agent.callable(**dependencies.values)
                            if isinstance(pathway, UUID):
                                task.proceed(pathway_uuid=str(pathway), bundle=bundle)
                            elif pathway is not None:
                                # A string may be either a uuid or a pathway name:
                                # try the uuid interpretation first.
                                try:
                                    pathway_uuid = UUID(pathway)
                                    task.proceed(pathway_uuid=str(pathway_uuid), bundle=bundle)
                                except ValueError:
                                    task.proceed(pathway_name=pathway, bundle=bundle)

                            if pbar_update is not None:
                                pbar_update(1.0)
                        except KeyboardInterrupt:
                            raise
                        except Exception:
                            print(f"[attempt {attempt+1}/{num_retries+1}] Agent failed with error: ")
                            traceback.print_exc()
                        else:
                            # The attempt succeeded; no further retries needed.
                            break

    def __call__(
        self,
        refresh_every: Annotated[
            Optional[int],
            Option(
                help="Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty."
            ),
        ] = None,
        num_retries: Annotated[
            int, Option(help="If an agent fails on a task, how many times should the runner retry it?")
        ] = 3,
        task_batch_size: Annotated[
            int, Option(help="Number of tasks for which labels are loaded into memory at once.")
        ] = 300,
        project_hash: Annotated[
            Optional[str], Option(help="The project hash if not defined at runner instantiation.")
        ] = None,
    ) -> None:
        """
        Run your task agent `runner(...)`.

        ???+ info "Self-updating/Polling runner"
            The runner can continuously poll new tasks in the project and execute the defined stage agents.
            To do so, please set the `refresh_every` parameter.
            When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.

        Args:
            refresh_every: Fetch task statuses from the Encord Project every `refresh_every` seconds.
                If `None`, the runner will exit once task queue is empty.
            num_retries: If an agent fails on a task, how many times should the runner retry it?
            task_batch_size: Number of tasks for which labels are loaded into memory at once.
            project_hash: The project hash if not defined at runner instantiation.
        Returns:
            None
        """
        # Verify project
        if project_hash is not None:
            project_hash = self.verify_project_hash(project_hash)
            project = self.client.get_project(project_hash)
        elif self.project is not None:
            project = self.project
        else:
            # Should not happen. Validated above but mypy doesn't understand.
            raise ValueError("Have no project to execute the runner on. Please specify it.")

        if project is None:
            import sys

            raise PrintableError(
                f"""Please specify project hash in one of the following ways:  
* At instantiation: [blue]`runner = Runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called directly: [blue]`runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called from CLI: [blue]`python {sys.argv[0]} --project-hash [green]<project_hash>[/green]`[/blue]
"""
            )

        self.validate_project(project)

        # Verify stages
        valid_stages = [s for s in project.workflow.stages if s.stage_type == WorkflowStageType.AGENT]
        agent_stages: dict[str | UUID, WorkflowStage] = {
            **{s.title: s for s in valid_stages},
            **{s.uuid: s for s in valid_stages},
        }
        try:
            for runner_agent in self.agents:
                fn_name = getattr(runner_agent.callable, "__name__", "agent function")
                separator = f"{os.linesep}\t"
                agent_stage_names = separator + self.get_stage_names(valid_stages, join_str=separator) + os.linesep
                if runner_agent.identity not in agent_stages:
                    suggestion: str
                    if len(valid_stages) == 1:
                        suggestion = f'Did you mean to wrap [blue]`{fn_name}`[/blue] with{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].title}")[/magenta]{os.linesep}or{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].uuid}")[/magenta]'
                    else:
                        suggestion = f"""
Please use either name annoitations: 
[magenta]@runner.stage(stage="<exact_stage_name>")[/magenta] 

or uuid annotations:
[magenta]@runner.stage(stage="<exact_stage_uuid>")[/magenta] 

For example, if we use the first agent stage listed above, we can use:
[magenta]@runner.stage(stage="{valid_stages[0].title}")
def {fn_name}(...):
    ...
[/magenta]
# or
[magenta]@runner.stage(stage="{valid_stages[0].uuid}")
def {fn_name}(...):
    ...[/magenta]"""
                    raise PrintableError(
                        rf"""Your function [blue]`{fn_name}`[/blue] was annotated to match agent stage [blue]`{runner_agent.printable_name}`[/blue] but that stage is not present as an agent stage in your project workflow. The workflow has following agent stages:

[{agent_stage_names}]

{suggestion}
                        """
                    )

                stage = agent_stages[runner_agent.identity]
                if stage.stage_type != WorkflowStageType.AGENT:
                    raise PrintableError(
                        f"""You cannot use the stage of type `{stage.stage_type}` as an agent stage. It has to be one of the agent stages: 
[{agent_stage_names}]."""
                    )

            # Run
            delta = timedelta(seconds=refresh_every) if refresh_every else None
            next_execution = None

            while True:
                if isinstance(next_execution, datetime):
                    if next_execution > datetime.now():
                        duration = next_execution - datetime.now()
                        print(f"Sleeping {duration.total_seconds()} secs until next execution time.")
                        time.sleep(duration.total_seconds())
                elif next_execution is not None:
                    break

                next_execution = datetime.now() + delta if delta else False
                for runner_agent in self.agents:
                    include_args = runner_agent.label_row_metadata_include_args or LabelRowMetadataIncludeArgs()
                    init_args = runner_agent.label_row_initialise_labels_args or LabelRowInitialiseLabelsArgs()
                    stage = agent_stages[runner_agent.identity]

                    batch: list[AgentTask] = []
                    batch_lrs: list[LabelRowV2 | None] = []

                    tasks = list(stage.get_tasks())
                    pbar = tqdm(desc="Executing tasks", total=len(tasks))
                    for task in tasks:
                        if not isinstance(task, AgentTask):
                            continue
                        batch.append(task)
                        if len(batch) == task_batch_size:
                            batch_lrs = [None] * len(batch)
                            if runner_agent.dependant.needs_label_row:
                                label_rows = {
                                    UUID(lr.data_hash): lr
                                    for lr in project.list_label_rows_v2(
                                        data_hashes=[t.data_hash for t in batch], **include_args.model_dump()
                                    )
                                }
                                batch_lrs = [label_rows.get(t.data_hash) for t in batch]
                                with project.create_bundle() as lr_bundle:
                                    for lr in batch_lrs:
                                        if lr:
                                            lr.initialise_labels(bundle=lr_bundle, **init_args.model_dump())

                            self._execute_tasks(
                                project,
                                zip(batch, batch_lrs),
                                runner_agent,
                                num_retries,
                                pbar_update=pbar.update,
                            )

                            batch = []
                            batch_lrs = []

                    if len(batch) > 0:
                        batch_lrs = [None] * len(batch)
                        if runner_agent.dependant.needs_label_row:
                            label_rows = {
                                UUID(lr.data_hash): lr
                                for lr in project.list_label_rows_v2(
                                    data_hashes=[t.data_hash for t in batch],
                                    **include_args.model_dump(),
                                )
                            }
                            batch_lrs = [label_rows[t.data_hash] for t in batch]
                            with project.create_bundle() as lr_bundle:
                                for lr in batch_lrs:
                                    if lr:
                                        lr.initialise_labels(bundle=lr_bundle, **init_args.model_dump())
                        self._execute_tasks(
                            project, zip(batch, batch_lrs), runner_agent, num_retries, pbar_update=pbar.update
                        )
        except (PrintableError, AssertionError) as err:
            if self.was_called_from_cli:
                panel = Panel(err.args[0], width=None)
                rich.print(panel)
                raise Abort()
            else:
                if isinstance(err, PrintableError):
                    from rich.text import Text

                    plain_text = Text.from_markup(err.args[0]).plain
                    err.args = (plain_text,)
                raise

    def run(self) -> None:
        """
        Execute the runner as a command line application.

        Call this from your "main file". It is the entry point for running
        the agent(s) from your shell with command line arguments.

        **Example:**

        ```python title="example.py"
        runner = Runner(project_hash="<your_project_hash>")

        @runner.stage(stage="...")
        def your_func() -> str:
            ...

        if __name__ == "__main__":
            runner.run()
        ```

        You can then execute the runner with:

        ```shell
        python example.py --help
        ```

        to see the options it has (they are those of `Runner.__call__`).

        """
        from typer import Typer

        # Recorded before the app starts so `__call__` reports errors CLI-style.
        self.was_called_from_cli = True
        cli = Typer(add_completion=False, rich_markup_mode="rich")
        as_command = cli.command(
            help=f"Execute the runner.{os.linesep * 2}Full documentation here: https://agents-docs.encord.com/task_agents/runner",
            short_help="Execute the runner as a CLI.",
        )
        # `__call__` is the single command, so its parameters become the CLI options.
        as_command(self.__call__)
        cli()
__call__
__call__(refresh_every: Annotated[Optional[int], Option(help='Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty.')] = None, num_retries: Annotated[int, Option(help='If an agent fails on a task, how many times should the runner retry it?')] = 3, task_batch_size: Annotated[int, Option(help='Number of tasks for which labels are loaded into memory at once.')] = 300, project_hash: Annotated[Optional[str], Option(help='The project hash if not defined at runner instantiation.')] = None) -> None

Run your task agent runner(...).

Self-updating/Polling runner

The runner can continuously poll new tasks in the project and execute the defined stage agents. To do so, please set the refresh_every parameter. When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.

Parameters:

  • refresh_every (Annotated[Optional[int], Option(help='Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty.')], default: None ) –

    Fetch task statuses from the Encord Project every refresh_every seconds. If None, the runner will exit once task queue is empty.

  • num_retries (Annotated[int, Option(help='If an agent fails on a task, how many times should the runner retry it?')], default: 3 ) –

    If an agent fails on a task, how many times should the runner retry it?

  • task_batch_size (Annotated[int, Option(help='Number of tasks for which labels are loaded into memory at once.')], default: 300 ) –

    Number of tasks for which labels are loaded into memory at once.

  • project_hash (Annotated[Optional[str], Option(help='The project hash if not defined at runner instantiation.')], default: None ) –

    The project hash if not defined at runner instantiation.

Returns: None

Source code in encord_agents/tasks/runner.py
    def __call__(
        self,
        refresh_every: Annotated[
            Optional[int],
            Option(
                help="Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty."
            ),
        ] = None,
        num_retries: Annotated[
            int, Option(help="If an agent fails on a task, how many times should the runner retry it?")
        ] = 3,
        task_batch_size: Annotated[
            int, Option(help="Number of tasks for which labels are loaded into memory at once.")
        ] = 300,
        project_hash: Annotated[
            Optional[str], Option(help="The project hash if not defined at runner instantiation.")
        ] = None,
    ) -> None:
        """
        Run your task agent `runner(...)`.

        ???+ info "Self-updating/Polling runner"
            The runner can continuously poll new tasks in the project and execute the defined stage agents.
            To do so, please set the `refresh_every` parameter.
            When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.

        Args:
            refresh_every: Fetch task statuses from the Encord Project every `refresh_every` seconds.
                If `None`, the runner will exit once task queue is empty.
            num_retries: If an agent fails on a task, how many times should the runner retry it?
            task_batch_size: Number of tasks for which labels are loaded into memory at once.
            project_hash: The project hash if not defined at runner instantiation.
        Returns:
            None
        """
        # Verify project
        if project_hash is not None:
            project_hash = self.verify_project_hash(project_hash)
            project = self.client.get_project(project_hash)
        elif self.project is not None:
            project = self.project
        else:
            # Should not happen. Validated above but mypy doesn't understand.
            raise ValueError("Have no project to execute the runner on. Please specify it.")

        if project is None:
            import sys

            raise PrintableError(
                f"""Please specify project hash in one of the following ways:  
* At instantiation: [blue]`runner = Runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called directly: [blue]`runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called from CLI: [blue]`python {sys.argv[0]} --project-hash [green]<project_hash>[/green]`[/blue]
"""
            )

        self.validate_project(project)

        # Verify stages
        valid_stages = [s for s in project.workflow.stages if s.stage_type == WorkflowStageType.AGENT]
        agent_stages: dict[str | UUID, WorkflowStage] = {
            **{s.title: s for s in valid_stages},
            **{s.uuid: s for s in valid_stages},
        }
        try:
            for runner_agent in self.agents:
                fn_name = getattr(runner_agent.callable, "__name__", "agent function")
                separator = f"{os.linesep}\t"
                agent_stage_names = separator + self.get_stage_names(valid_stages, join_str=separator) + os.linesep
                if runner_agent.identity not in agent_stages:
                    suggestion: str
                    if len(valid_stages) == 1:
                        suggestion = f'Did you mean to wrap [blue]`{fn_name}`[/blue] with{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].title}")[/magenta]{os.linesep}or{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].uuid}")[/magenta]'
                    else:
                        suggestion = f"""
Please use either name annoitations: 
[magenta]@runner.stage(stage="<exact_stage_name>")[/magenta] 

or uuid annotations:
[magenta]@runner.stage(stage="<exact_stage_uuid>")[/magenta] 

For example, if we use the first agent stage listed above, we can use:
[magenta]@runner.stage(stage="{valid_stages[0].title}")
def {fn_name}(...):
    ...
[/magenta]
# or
[magenta]@runner.stage(stage="{valid_stages[0].uuid}")
def {fn_name}(...):
    ...[/magenta]"""
                    raise PrintableError(
                        rf"""Your function [blue]`{fn_name}`[/blue] was annotated to match agent stage [blue]`{runner_agent.printable_name}`[/blue] but that stage is not present as an agent stage in your project workflow. The workflow has following agent stages:

[{agent_stage_names}]

{suggestion}
                        """
                    )

                stage = agent_stages[runner_agent.identity]
                if stage.stage_type != WorkflowStageType.AGENT:
                    raise PrintableError(
                        f"""You cannot use the stage of type `{stage.stage_type}` as an agent stage. It has to be one of the agent stages: 
[{agent_stage_names}]."""
                    )

            # Run
            delta = timedelta(seconds=refresh_every) if refresh_every else None
            next_execution = None

            while True:
                if isinstance(next_execution, datetime):
                    if next_execution > datetime.now():
                        duration = next_execution - datetime.now()
                        print(f"Sleeping {duration.total_seconds()} secs until next execution time.")
                        time.sleep(duration.total_seconds())
                elif next_execution is not None:
                    break

                next_execution = datetime.now() + delta if delta else False
                for runner_agent in self.agents:
                    include_args = runner_agent.label_row_metadata_include_args or LabelRowMetadataIncludeArgs()
                    init_args = runner_agent.label_row_initialise_labels_args or LabelRowInitialiseLabelsArgs()
                    stage = agent_stages[runner_agent.identity]

                    batch: list[AgentTask] = []
                    batch_lrs: list[LabelRowV2 | None] = []

                    tasks = list(stage.get_tasks())
                    pbar = tqdm(desc="Executing tasks", total=len(tasks))
                    for task in tasks:
                        if not isinstance(task, AgentTask):
                            continue
                        batch.append(task)
                        if len(batch) == task_batch_size:
                            batch_lrs = [None] * len(batch)
                            if runner_agent.dependant.needs_label_row:
                                label_rows = {
                                    UUID(lr.data_hash): lr
                                    for lr in project.list_label_rows_v2(
                                        data_hashes=[t.data_hash for t in batch], **include_args.model_dump()
                                    )
                                }
                                batch_lrs = [label_rows.get(t.data_hash) for t in batch]
                                with project.create_bundle() as lr_bundle:
                                    for lr in batch_lrs:
                                        if lr:
                                            lr.initialise_labels(bundle=lr_bundle, **init_args.model_dump())

                            self._execute_tasks(
                                project,
                                zip(batch, batch_lrs),
                                runner_agent,
                                num_retries,
                                pbar_update=pbar.update,
                            )

                            batch = []
                            batch_lrs = []

                    if len(batch) > 0:
                        batch_lrs = [None] * len(batch)
                        if runner_agent.dependant.needs_label_row:
                            label_rows = {
                                UUID(lr.data_hash): lr
                                for lr in project.list_label_rows_v2(
                                    data_hashes=[t.data_hash for t in batch],
                                    **include_args.model_dump(),
                                )
                            }
                            batch_lrs = [label_rows[t.data_hash] for t in batch]
                            with project.create_bundle() as lr_bundle:
                                for lr in batch_lrs:
                                    if lr:
                                        lr.initialise_labels(bundle=lr_bundle, **init_args.model_dump())
                        self._execute_tasks(
                            project, zip(batch, batch_lrs), runner_agent, num_retries, pbar_update=pbar.update
                        )
        except (PrintableError, AssertionError) as err:
            if self.was_called_from_cli:
                panel = Panel(err.args[0], width=None)
                rich.print(panel)
                raise Abort()
            else:
                if isinstance(err, PrintableError):
                    from rich.text import Text

                    plain_text = Text.from_markup(err.args[0]).plain
                    err.args = (plain_text,)
                raise
__init__
__init__(project_hash: str | None = None)

Initialize the runner with an optional project hash.

The project_hash will allow stricter stage validation. If left unspecified, errors will first be raised during execution of the runner.

Parameters:

  • project_hash (str | None, default: None ) –

    The project hash that the runner applies to.

    Can be left unspecified to be able to reuse same runner on multiple projects.

Source code in encord_agents/tasks/runner.py
def __init__(self, project_hash: str | None = None):
    """
    Create a runner, optionally bound to one project.

    Supplying `project_hash` up front allows stricter stage validation.
    Without it, errors will first be raised while the runner executes.

    Args:
        project_hash: Hash of the project that the runner applies to.

            Leave it unspecified to reuse the same runner on multiple projects.
    """
    super().__init__(project_hash)
    # Starts out False: a freshly created runner has not been launched from a CLI.
    self.was_called_from_cli = False
    # No stage agents are attached to a freshly created runner.
    self.agents: list[RunnerAgent] = []
run
run() -> None

Execute the runner.

This function is intended to be called from the "main file". It is an entry point to be able to run the agent(s) via your shell with command line arguments.

Example:

example.py
runner = Runner(project_hash="<your_project_hash>")

@runner.stage(stage="...")
def your_func() -> str:
    ...

if __name__ == "__main__":
    runner.run()

You can then execute the runner with:

python example.py --help

to see the options it has (they are those of Runner.__call__).

Source code in encord_agents/tasks/runner.py
def run(self) -> None:
    """
    Execute the runner as a command line application.

    Call this from your "main file". It is the entry point for running
    the agent(s) from your shell with command line arguments.

    **Example:**

    ```python title="example.py"
    runner = Runner(project_hash="<your_project_hash>")

    @runner.stage(stage="...")
    def your_func() -> str:
        ...

    if __name__ == "__main__":
        runner.run()
    ```

    You can then execute the runner with:

    ```shell
    python example.py --help
    ```

    to see the options it has (they are those of `Runner.__call__`).

    """
    from typer import Typer

    # Recorded before the app starts so the runner knows it was launched from a CLI.
    self.was_called_from_cli = True
    cli = Typer(add_completion=False, rich_markup_mode="rich")
    as_command = cli.command(
        help=f"Execute the runner.{os.linesep * 2}Full documentation here: https://agents-docs.encord.com/task_agents/runner",
        short_help="Execute the runner as a CLI.",
    )
    # `__call__` is the single command, so its parameters become the CLI options.
    as_command(self.__call__)
    cli()
stage
stage(stage: str | UUID, *, label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None, label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None) -> Callable[[DecoratedCallable], DecoratedCallable]

Decorator to associate a function with an agent stage.

A function decorated with a stage is added to the list of stages that will be handled by the runner. The runner will call the function for every task which is in that stage.

Example:

runner = Runner()

@runner.stage("<stage_name_or_uuid>")
def my_func() -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"

The function declaration can be any function that takes parameters that are type annotated with the following types:

  • Project: the encord.project.Project that the runner is operating on.
  • LabelRowV2: the encord.objects.LabelRowV2 that the task is associated with.
  • AgentTask: the encord.workflow.stages.agent.AgentTask that the task is associated with.
  • Any other type: which is annotated with a dependency

All those parameters will be automatically injected when the agent is called.

Example:

from typing import Iterator
from typing_extensions import Annotated

from encord.project import Project
from encord_agents.tasks import Depends
from encord_agents.tasks.dependencies import dep_video_iterator
from encord.workflow.stages.agent import AgentTask

runner = Runner()

def random_value() -> float:
    import random
    return random.random()

@runner.stage("<stage_name_or_uuid>")
def my_func(
    project: Project,
    lr: LabelRowV2,
    task: AgentTask,
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
    custom: Annotated[float, Depends(random_value)]
) -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"

Parameters:

  • stage (str | UUID) –

    The name or uuid of the stage that the function should be associated with.

  • label_row_metadata_include_args (LabelRowMetadataIncludeArgs | None, default: None ) –

    Arguments to be passed to project.list_label_rows_v2(...)

  • label_row_initialise_labels_args (LabelRowInitialiseLabelsArgs | None, default: None ) –

    Arguments to be passed to label_row.initialise_labels(...)

Returns:

  • Callable[[DecoratedCallable], DecoratedCallable] –

    The decorated function.

Source code in encord_agents/tasks/runner.py
def stage(
    self,
    stage: str | UUID,
    *,
    label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
    label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None = None,
) -> Callable[[DecoratedCallable], DecoratedCallable]:
    r"""
    Decorator to associate a function with an agent stage.

    A function decorated with a stage is added to the list of stages
    that will be handled by the runner.
    The runner will call the function for every task which is in that
    stage.


    **Example:**

    ```python
    runner = Runner()

    @runner.stage("<stage_name_or_uuid>")
    def my_func() -> str | None:
        ...
        return "<pathway_name or pathway_uuid>"
    ```

    The function declaration can be any function that takes parameters
    that are type annotated with the following types:

    * [Project][docs-project]{ target="\_blank", rel="noopener noreferrer" }: the `encord.project.Project`
        that the runner is operating on.
    * [LabelRowV2][docs-label-row]{ target="\_blank", rel="noopener noreferrer" }: the `encord.objects.LabelRowV2`
        that the task is associated with.
    * [AgentTask][docs-agent-task]{ target="\_blank", rel="noopener noreferrer" }: the `encord.workflow.stages.agent.AgentTask`
        that the task is associated with.
    * Any other type: which is annotated with a [dependency](/dependencies.md)

    All those parameters will be automatically injected when the agent is called.

    **Example:**

    ```python
    from typing import Iterator
    from typing_extensions import Annotated

    from encord.project import Project
    from encord_agents.tasks import Depends
    from encord_agents.tasks.dependencies import dep_video_iterator
    from encord.workflow.stages.agent import AgentTask

    runner = Runner()

    def random_value() -> float:
        import random
        return random.random()

    @runner.stage("<stage_name_or_uuid>")
    def my_func(
        project: Project,
        lr: LabelRowV2,
        task: AgentTask,
        video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
        custom: Annotated[float, Depends(random_value)]
    ) -> str | None:
        ...
        return "<pathway_name or pathway_uuid>"
    ```

    [docs-project]:    https://docs.encord.com/sdk-documentation/sdk-references/project
    [docs-label-row]:  https://docs.encord.com/sdk-documentation/sdk-references/LabelRowV2
    [docs-agent-task]: https://docs.encord.com/sdk-documentation/sdk-references/AgentTask

    Args:
        stage: The name or uuid of the stage that the function should be
            associated with.
        label_row_metadata_include_args: Arguments to be passed to
            `project.list_label_rows_v2(...)`
        label_row_initialise_labels_args: Arguments to be passed to
            `label_row.initialise_labels(...)`

    Returns:
        A decorator that registers the function for the stage and returns
        the function itself, unchanged.
    """
    # Validation runs when `stage(...)` is called, i.e. at decoration time,
    # before the decorated function is registered.
    stage_uuid, printable_name = self.validate_stage(stage)

    def decorator(func: DecoratedCallable) -> DecoratedCallable:
        self._add_stage_agent(
            stage_uuid, func, printable_name, label_row_metadata_include_args, label_row_initialise_labels_args
        )
        # The original callable is returned as-is so it can still be called directly.
        return func

    return decorator

RunnerBase

Source code in encord_agents/tasks/runner.py
class RunnerBase:
    """
    Shared base for task-agent runners.

    Holds the (optional) project, the agent stages found in that project's
    workflow, and the list of callables registered against stages.
    """

    @staticmethod
    def verify_project_hash(ph: str | UUID) -> str:
        """
        Normalise a project hash to its canonical UUID string form.

        Args:
            ph: The project hash, as a string or a `UUID`.

        Returns:
            The project hash formatted as `str(UUID(...))`.

        Raises:
            Abort: If `ph` cannot be parsed as a UUID. A message is printed first.
        """
        try:
            ph = str(UUID(str(ph)))
        except ValueError:
            print("Could not read project_hash as a UUID")
            raise Abort()
        return ph

    @staticmethod
    def get_stage_names(valid_stages: list[AgentStage], join_str: str = ", ") -> str:
        """
        Format the given stages as one string for use in messages.

        Args:
            valid_stages: The stages to list.
            join_str: Separator placed between the formatted stages.

        Returns:
            The stages' titles and uuids, each wrapped in `[magenta]` markup
            tags and joined by `join_str`.
        """
        return join_str.join(
            f'[magenta]AgentStage(title="{k.title}", uuid="{k.uuid}")[/magenta]' for k in valid_stages
        )

    @staticmethod
    def validate_project(project: Project | None) -> None:
        """
        Check that the project can be used with task agents.

        Args:
            project: The project to check. `None` is accepted and skips all checks.

        Raises:
            AssertionError: If the project is not a workflow project, or if its
                workflow contains no agent stages.
        """
        if project is None:
            return
        PROJECT_MUSTS = "Task agents only work for workflow projects that have agent nodes in the workflow."
        # Raised explicitly rather than via `assert`, so the checks are not
        # stripped when Python runs with `-O`. The exception type is unchanged.
        if not (project.project_type == ProjectType.WORKFLOW):
            raise AssertionError(f"Provided project is not a workflow project. {PROJECT_MUSTS}")
        if not any(s.stage_type == WorkflowStageType.AGENT for s in project.workflow.stages):
            raise AssertionError(
                f"Provided project does not have any agent stages in it's workflow. {PROJECT_MUSTS}"
            )

    def __init__(self, project_hash: str | UUID | None = None):
        """
        Initialize the runner with an optional project hash.

        The `project_hash` will allow stricter stage validation.
        If left unspecified, errors will not be raised until the runner is executed.

        Args:
            project_hash: The project hash that the runner applies to.

                Can be left unspecified to be able to reuse the same runner on multiple projects.
        """
        self.project_hash = self.verify_project_hash(project_hash) if project_hash else None
        self.client = get_user_client()

        self.project: Project | None = self.client.get_project(self.project_hash) if self.project_hash else None
        self.validate_project(self.project)

        # `None` means "no project known yet", which disables stage matching
        # in `validate_stage`.
        self.valid_stages: list[AgentStage] | None = None
        if self.project is not None:
            self.valid_stages = [s for s in self.project.workflow.stages if s.stage_type == WorkflowStageType.AGENT]
        self.agents: list[RunnerAgent] = []

    def validate_stage(self, stage: str | UUID) -> tuple[UUID | str, str]:
        """
        Resolve a stage reference and check that it is not already taken.

        Args:
            stage: The stage title, or the stage uuid as a `UUID` or a string.

        Returns:
            A tuple of the stage identity and a printable name. The identity is
            the matched stage's uuid when the project's stages are known;
            otherwise it is the parsed `UUID`, or the original string if it is
            not a UUID. The printable name is `str(stage)` as passed in.

        Raises:
            PrintableError: If the project's stages are known and none matches,
                or if the resolved stage already has a registered agent.
        """
        printable_name = str(stage)
        try:
            stage = UUID(str(stage))
        except ValueError:
            # Not a uuid, so it is treated as a stage title below.
            pass

        if self.valid_stages is not None:
            selected_stage: WorkflowStage | None = None
            for v_stage in self.valid_stages:
                # Titles are compared for string references, uuids otherwise.
                attr = v_stage.title if isinstance(stage, str) else v_stage.uuid
                if attr == stage:
                    # No `break`: if several stages match, the last one wins.
                    selected_stage = v_stage

            if selected_stage is None:
                agent_stage_names = self.get_stage_names(self.valid_stages)
                raise PrintableError(
                    rf"Stage name [blue]`{stage}`[/blue] could not be matched against a project stage. Valid stages are \[{agent_stage_names}]."
                )
            stage = selected_stage.uuid

        if stage in [a.identity for a in self.agents]:
            raise PrintableError(
                f"Stage name [blue]`{printable_name}`[/blue] has already been assigned a function. You can only assign one callable to each agent stage."
            )
        return stage, printable_name

    def _add_stage_agent(
        self,
        identity: str | UUID,
        func: Callable[..., TaskAgentReturn],
        printable_name: str | None,
        label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None,
        label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs | None,
    ) -> RunnerAgent:
        """
        Wrap a callable in a `RunnerAgent` and append it to `self.agents`.

        Args:
            identity: The stage identity the callable is registered for.
            func: The agent implementation.
            printable_name: Name of the stage for use in messages.
            label_row_metadata_include_args: Stored on the `RunnerAgent` unchanged.
            label_row_initialise_labels_args: Stored on the `RunnerAgent` unchanged.

        Returns:
            The newly created `RunnerAgent`.
        """
        runner_agent = RunnerAgent(
            identity=identity,
            callable=func,
            printable_name=printable_name,
            label_row_metadata_include_args=label_row_metadata_include_args,
            label_row_initialise_labels_args=label_row_initialise_labels_args,
        )
        self.agents.append(runner_agent)
        return runner_agent
__init__
__init__(project_hash: str | UUID | None = None)

Initialize the runner with an optional project hash.

The project_hash will allow stricter stage validation. If left unspecified, errors will not be raised until the runner is executed.

Parameters:

  • project_hash (str | UUID | None, default: None ) –

    The project hash that the runner applies to.

    Can be left unspecified to be able to reuse the same runner on multiple projects.

Source code in encord_agents/tasks/runner.py
def __init__(self, project_hash: str | UUID | None = None):
    """
    Set up the runner, optionally bound to a single project.

    Providing a `project_hash` enables stricter stage validation up front.
    Without it, problems only surface once the runner is executed.

    Args:
        project_hash: Hash of the project the runner applies to.

            Omit it to be able to reuse the same runner on multiple projects.
    """
    # Normalise the hash only when one was actually given.
    if project_hash:
        self.project_hash = self.verify_project_hash(project_hash)
    else:
        self.project_hash = None
    self.client = get_user_client()

    # The project is fetched only when a hash is available.
    if self.project_hash:
        self.project: Project | None = self.client.get_project(self.project_hash)
    else:
        self.project = None
    self.validate_project(self.project)

    # `None` signals that the project's stages are unknown at this point.
    if self.project is None:
        self.valid_stages: list[AgentStage] | None = None
    else:
        self.valid_stages = [
            workflow_stage
            for workflow_stage in self.project.workflow.stages
            if workflow_stage.stage_type == WorkflowStageType.AGENT
        ]
    self.agents: list[RunnerAgent] = []
validate_stage
validate_stage(stage: str | UUID) -> tuple[UUID | str, str]

Returns stage uuid and printable name.

Source code in encord_agents/tasks/runner.py
def validate_stage(self, stage: str | UUID) -> tuple[UUID | str, str]:
    """
    Resolve a stage reference to its identity and a printable name.

    The reference is parsed as a uuid when possible. If the runner knows the
    project's agent stages, the reference must match one of them (by title for
    strings, by uuid otherwise) and the matched stage's uuid becomes the
    identity. A stage that already has an agent registered is rejected.

    Returns:
        The stage identity and `str(stage)` of the reference as passed in.

    Raises:
        PrintableError: If no known stage matches, or the stage is already taken.
    """
    printable_name = str(stage)
    try:
        stage = UUID(printable_name)
    except ValueError:
        # Not a uuid: keep the reference as given and treat it as a title.
        pass

    known_stages = self.valid_stages
    if known_stages is not None:
        match_on_title = isinstance(stage, str)
        selected_stage: WorkflowStage | None = None
        for candidate in known_stages:
            attr = candidate.title if match_on_title else candidate.uuid
            if attr == stage:
                # Keep scanning: the last matching stage is the one selected.
                selected_stage = candidate

        if selected_stage is None:
            agent_stage_names = self.get_stage_names(known_stages)
            raise PrintableError(
                rf"Stage name [blue]`{stage}`[/blue] could not be matched against a project stage. Valid stages are \[{agent_stage_names}]."
            )
        stage = selected_stage.uuid

    already_assigned = any(agent.identity == stage for agent in self.agents)
    if already_assigned:
        raise PrintableError(
            f"Stage name [blue]`{printable_name}`[/blue] has already been assigned a function. You can only assign one callable to each agent stage."
        )
    return stage, printable_name