Skip to content

Task agents reference

Task agents

encord_agents.tasks.dependencies

Twin dataclass

Dataclass to hold "label twin" information.

Source code in encord_agents/tasks/dependencies.py
@dataclass(frozen=True)
class Twin:
    """
    Dataclass to hold "label twin" information.
    """

    label_row: LabelRowV2
    task: WorkflowTask | None

dep_client

dep_client() -> EncordUserClient

Dependency to provide an authenticated user client.

Example:

from encord.user_client import EncordUserClient
from encord_agents.tasks.depencencies import dep_client
...
@runner.stage("<my_stage_name>")
def my_agent(
    client: Annotated[EncordUserClient, Depends(dep_client)]
) -> str:
    # Client will authenticated and ready to use.
    client.get_dataset("")
Source code in encord_agents/tasks/dependencies.py
def dep_client() -> EncordUserClient:
    """
    Dependency to provide an authenticated user client.

    **Example:**

    ```python
    from encord.user_client import EncordUserClient
    from encord_agents.tasks.depencencies import dep_client
    ...
    @runner.stage("<my_stage_name>")
    def my_agent(
        client: Annotated[EncordUserClient, Depends(dep_client)]
    ) -> str:
        # Client will authenticated and ready to use.
        client.get_dataset("")
    ```

    """
    return get_user_client()

dep_data_lookup

dep_data_lookup(lookup: Annotated[DataLookup, Depends(DataLookup.sharable)]) -> DataLookup

Get a lookup to easily retrieve data rows and storage items associated with the given task.

Info

If you're just looking to get the associated storage item to a task, consider using dep_storage_item instead.

The lookup can, e.g., be useful for

  • Updating client metadata
  • Downloading data from signed urls
  • Matching data to other projects

Example:

from encord.orm.dataset import DataRow
from encord.stotage import StorageItem
from encord.workflow.stages.agent import AgentTask

@runner.stage(stage="Agent 1")
def my_agent(
    task: AgentTask,
    lookup: Annotated[DataLookup, Depends(dep_data_lookup)]
) -> str:
    # Data row from the underlying dataset
    data_row: DataRow = lookup.get_data_row(task.data_hash)

    # Storage item from Encord Index
    storage_item: StorageItem = lookup.get_storage_item(task.data_hash)

    # Current metadata
    client_metadata = storage_item.client_metadata

    # Update metadata
    storage_item.update(
        client_metadata={
            "new": "entry",
            **(client_metadata or {})
        }
    )  # metadata. Make sure not to update in place!
    ...

Parameters:

  • lookup (Annotated[DataLookup, Depends(sharable)]) –

    The object that you can use to lookup data rows and storage items. Automatically injected.

Returns:

  • DataLookup

    The (shared) lookup object.

Source code in encord_agents/tasks/dependencies.py
def dep_data_lookup(lookup: Annotated[DataLookup, Depends(DataLookup.sharable)]) -> DataLookup:
    """
    Get a lookup to easily retrieve data rows and storage items associated with the given task.

    !!! info
        If you're just looking to get the associated storage item to a task, consider using `dep_storage_item` instead.


    The lookup can, e.g., be useful for

    * Updating client metadata
    * Downloading data from signed urls
    * Matching data to other projects

    **Example:**

    ```python
    from encord.orm.dataset import DataRow
    from encord.stotage import StorageItem
    from encord.workflow.stages.agent import AgentTask

    @runner.stage(stage="Agent 1")
    def my_agent(
        task: AgentTask,
        lookup: Annotated[DataLookup, Depends(dep_data_lookup)]
    ) -> str:
        # Data row from the underlying dataset
        data_row: DataRow = lookup.get_data_row(task.data_hash)

        # Storage item from Encord Index
        storage_item: StorageItem = lookup.get_storage_item(task.data_hash)

        # Current metadata
        client_metadata = storage_item.client_metadata

        # Update metadata
        storage_item.update(
            client_metadata={
                "new": "entry",
                **(client_metadata or {})
            }
        )  # metadata. Make sure not to update in place!
        ...
    ```


    Args:
        lookup: The object that you can use to lookup data rows and storage items. Automatically injected.

    Returns:
        The (shared) lookup object.

    """
    return lookup

dep_single_frame

dep_single_frame(lr: LabelRowV2) -> NDArray[np.uint8]

Dependency to inject the first frame of the underlying asset.

The downloaded asset will be named lr.data_hash.{suffix}. When the function has finished, the downloaded file will be removed from the file system.

Example:

from encord_agents import FrameData
from encord_agents.tasks.depencencies import dep_single_frame
...

@runner.stage("<my_stage_name>")
def my_agent(
    lr: LabelRowV2,  # <- Automatically injected
    frame: Annotated[NDArray[np.uint8], Depends(dep_single_frame)]
) -> str:
    assert frame.ndim == 3, "Will work"

Parameters:

  • lr (LabelRowV2) –

    The label row. Automatically injected (see example above).

Returns:

  • NDArray[uint8]

    Numpy array of shape [h, w, 3] RGB colors.

Source code in encord_agents/tasks/dependencies.py
def dep_single_frame(lr: LabelRowV2) -> NDArray[np.uint8]:
    """
    Dependency to inject the first frame of the underlying asset.

    The downloaded asset will be named `lr.data_hash.{suffix}`.
    When the function has finished, the downloaded file will be removed from the file system.

    **Example:**

    ```python
    from encord_agents import FrameData
    from encord_agents.tasks.depencencies import dep_single_frame
    ...

    @runner.stage("<my_stage_name>")
    def my_agent(
        lr: LabelRowV2,  # <- Automatically injected
        frame: Annotated[NDArray[np.uint8], Depends(dep_single_frame)]
    ) -> str:
        assert frame.ndim == 3, "Will work"
    ```

    Args:
        lr: The label row. Automatically injected (see example above).

    Returns:
        Numpy array of shape [h, w, 3] RGB colors.

    """
    with download_asset(lr, frame=0) as asset:
        img = cv2.cvtColor(cv2.imread(asset.as_posix()), cv2.COLOR_BGR2RGB)

    return np.asarray(img, dtype=np.uint8)

dep_storage_item

dep_storage_item(lookup: Annotated[DataLookup, Depends(dep_data_lookup)], task: AgentTask) -> StorageItem

Get the storage item associated with the underlying agent task.

The StorageItem is useful for multiple things like

  • Updating client metadata
  • Reading file properties like storage location, fps, duration, DICOM tags, etc.

Example

from encord.storage import StorageItem
from encord_agents.tasks.dependencies import dep_storage_item

@runner.stage(stage="<my_stage_name>")
def my_agent(storage_item: Annotated[StorageItem, Depends(dep_storage_item)]) -> str:
    print(storage_item.name)
    print(storage_item.client_metadata)
    ...
Source code in encord_agents/tasks/dependencies.py
def dep_storage_item(lookup: Annotated[DataLookup, Depends(dep_data_lookup)], task: AgentTask) -> StorageItem:
    r"""
    Get the storage item associated with the underlying agent task.

    The [`StorageItem`](https://docs.encord.com/sdk-documentation/sdk-references/StorageItem){ target="\_blank", rel="noopener noreferrer" }
    is useful for multiple things like

    * Updating client metadata
    * Reading file properties like storage location, fps, duration, DICOM tags, etc.

    **Example**

    ```python
    from encord.storage import StorageItem
    from encord_agents.tasks.dependencies import dep_storage_item

    @runner.stage(stage="<my_stage_name>")
    def my_agent(storage_item: Annotated[StorageItem, Depends(dep_storage_item)]) -> str:
        print(storage_item.name)
        print(storage_item.client_metadata)
        ...
    ```

    """
    return lookup.get_storage_item(task.data_hash)

dep_twin_label_row

dep_twin_label_row(twin_project_hash: str, init_labels: bool = True, include_task: bool = False) -> Callable[[LabelRowV2], Twin | None]

Dependency to link assets between two Projects. When your Runner in running on <project_hash_a>, you can use this to get a Twin of labels and the underlying task in the "twin project" with <project_hash_b>.

This is useful in situations like:

  • When you want to transfer labels from a source project" to a sink project.
  • If you want to compare labels to labels from other projects upon label submission.
  • If you want to extend an existing project with labels from another project on the same underlying data.

Example:

from encord.workflow.common import WorkflowTask
from encord.objects.ontology_labels_impl import LabelRowV2
from encord_agents.tasks.dependencies import Twin, dep_twin_label_row
...
runner = Runner(project_hash="<project_hash_a>")

@runner.stage("<my_stage_name_in_project_a>")
def my_agent(
    project_a_label_row: LabelRowV2,
    twin: Annotated[
        Twin, Depends(dep_twin_label_row(twin_project_hash="<project_hash_b>"))
    ],
) -> str | None:
    label_row_from_project_b: LabelRowV2 = twin.label_row
    task_from_project_b: WorkflowTask = instance.get_answer(attribute=checklist_attribute)

Parameters:

  • twin_project_hash (str) –

    The project has of the twin project (attached to the same datasets) from which you want to load the additional data.

  • init_labels (bool, default: True ) –

    If true, the label row will be initialized before calling the agent.

  • include_task (bool, default: False ) –

    If true, the task field of the Twin will be populated. If population failes, e.g., for non-workflow projects, the task will also be None.

Returns:

  • Callable[[LabelRowV2], Twin | None]

    The twin.

Source code in encord_agents/tasks/dependencies.py
def dep_twin_label_row(
    twin_project_hash: str, init_labels: bool = True, include_task: bool = False
) -> Callable[[LabelRowV2], Twin | None]:
    """
    Dependency to link assets between two Projects. When your `Runner` in running on
    `<project_hash_a>`, you can use this to get a `Twin` of labels and the underlying
    task in the "twin project" with `<project_hash_b>`.

    This is useful in situations like:

    * When you want to transfer labels from a source project" to a sink project.
    * If you want to compare labels to labels from other projects upon label submission.
    * If you want to extend an existing project with labels from another project on the same underlying data.

    **Example:**

    ```python
    from encord.workflow.common import WorkflowTask
    from encord.objects.ontology_labels_impl import LabelRowV2
    from encord_agents.tasks.dependencies import Twin, dep_twin_label_row
    ...
    runner = Runner(project_hash="<project_hash_a>")

    @runner.stage("<my_stage_name_in_project_a>")
    def my_agent(
        project_a_label_row: LabelRowV2,
        twin: Annotated[
            Twin, Depends(dep_twin_label_row(twin_project_hash="<project_hash_b>"))
        ],
    ) -> str | None:
        label_row_from_project_b: LabelRowV2 = twin.label_row
        task_from_project_b: WorkflowTask = instance.get_answer(attribute=checklist_attribute)
    ```

    Args:
        twin_project_hash: The project has of the twin project (attached to the same datasets)
            from which you want to load the additional data.
        init_labels: If true, the label row will be initialized before calling the agent.
        include_task: If true, the `task` field of the `Twin` will be populated. If population
            failes, e.g., for non-workflow projects, the task will also be None.

    Returns:
        The twin.

    Raises:
        `encord.AuthorizationError` if you do not have access to the project.
    """
    client = get_user_client()
    try:
        twin_project = client.get_project(twin_project_hash)
    except (AuthorisationError, AuthenticationError):
        raise PrintableError(
            f"You do not seem to have access to the project with project hash `[blue]{twin_project_hash}[/blue]`"
        )

    label_rows: dict[str, LabelRowV2] = {lr.data_hash: lr for lr in twin_project.list_label_rows_v2()}

    def get_twin_label_row(lr_original: LabelRowV2) -> Twin | None:
        lr_twin = label_rows.get(lr_original.data_hash)
        if lr_twin is None:
            return None

        if init_labels:
            lr_twin.initialise_labels()

        graph_node = lr_twin.workflow_graph_node
        task: WorkflowTask | None = None

        if include_task and graph_node is not None:
            try:
                stage: WorkflowStage = twin_project.workflow.get_stage(uuid=graph_node.uuid)
                for task in stage.get_tasks(data_hash=lr_original.data_hash):
                    pass
            except Exception:
                # TODO: print proper warning.
                pass

        return Twin(label_row=lr_twin, task=task)

    return get_twin_label_row

dep_video_iterator

dep_video_iterator(lr: LabelRowV2) -> Generator[Iterator[Frame], None, None]

Dependency to inject a video frame iterator for doing things over many frames.

Intended use

from encord_agents import FrameData
from encord_agents.tasks.depencencies import dep_video_iterator
...

@runner.stage("<my_stage_name>")
def my_agent(
    lr: LabelRowV2,  # <- Automatically injected
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)]
) -> str:
    for frame in video_frames:
        print(frame.frame, frame.content.shape)

Parameters:

  • lr (LabelRowV2) –

    Automatically injected label row dependency.

Raises:

  • NotImplementedError

    Will fail for other data types than video.

Yields:

  • Iterator[Frame]

    An iterator.

Source code in encord_agents/tasks/dependencies.py
def dep_video_iterator(lr: LabelRowV2) -> Generator[Iterator[Frame], None, None]:
    """
    Dependency to inject a video frame iterator for doing things over many frames.

    **Intended use**

    ```python
    from encord_agents import FrameData
    from encord_agents.tasks.depencencies import dep_video_iterator
    ...

    @runner.stage("<my_stage_name>")
    def my_agent(
        lr: LabelRowV2,  # <- Automatically injected
        video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)]
    ) -> str:
        for frame in video_frames:
            print(frame.frame, frame.content.shape)
    ```

    Args:
        lr: Automatically injected label row dependency.

    Raises:
        NotImplementedError: Will fail for other data types than video.

    Yields:
        An iterator.

    """
    if not lr.data_type == DataType.VIDEO:
        raise NotImplementedError("`dep_video_iterator` only supported for video label rows")

    with download_asset(lr, None) as asset:
        yield iter_video(asset)

encord_agents.tasks.runner

Runner

Runs agents against Workflow projects.

When called, it will iteratively run agent stages till they are empty. By default, runner will exit after finishing the tasks identified at the point of trigger. To automatically re-run, you can use the refresh_every keyword.

Example:

example_agent.py
from uuid import UUID
from encord_agents.tasks import Runner
runner = Runner()

@runner.stage("<workflow_node_name>")
# or
@runner.stage("<workflow_node_uuid>")
def my_agent(task: AgentTask) -> str | UUID | None:
    ...
    return "pathway name"  # or pathway uuid


runner(project_hash="<project_hash>")  # (see __call__ for more arguments)
# or
if __name__ == "__main__":
    # for CLI usage: `python example_agent.py --project-hash "<project_hash>"`
    runner.run()
Source code in encord_agents/tasks/runner.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
class Runner:
    """
    Runs agents against Workflow projects.

    When called, it will iteratively run agent stages till they are empty.
    By default, runner will exit after finishing the tasks identified at the point of trigger.
    To automatically re-run, you can use the `refresh_every` keyword.

    **Example:**

    ```python title="example_agent.py"
    from uuid import UUID
    from encord_agents.tasks import Runner
    runner = Runner()

    @runner.stage("<workflow_node_name>")
    # or
    @runner.stage("<workflow_node_uuid>")
    def my_agent(task: AgentTask) -> str | UUID | None:
        ...
        return "pathway name"  # or pathway uuid


    runner(project_hash="<project_hash>")  # (see __call__ for more arguments)
    # or
    if __name__ == "__main__":
        # for CLI usage: `python example_agent.py --project-hash "<project_hash>"`
        runner.run()
    ```

    """

    @staticmethod
    def verify_project_hash(ph: str) -> str:
        try:
            ph = str(UUID(ph))
        except ValueError:
            print("Could not read project_hash as a UUID")
            raise Abort()
        return ph

    def __init__(self, project_hash: str | None = None):
        self.project_hash = self.verify_project_hash(project_hash) if project_hash else None
        self.client = get_user_client()

        self.project: Project | None = self.client.get_project(self.project_hash) if self.project_hash else None
        self.validate_project(self.project)

        self.valid_stages: list[AgentStage] | None = None
        if self.project is not None:
            self.valid_stages = [s for s in self.project.workflow.stages if s.stage_type == WorkflowStageType.AGENT]

        self.agents: list[RunnerAgent] = []
        self.was_called_from_cli = False

    @staticmethod
    def validate_project(project: Project | None):
        if project is None:
            return
        PROJECT_MUSTS = "Task agents only work for workflow projects that have agent nodes in the workflow."
        assert (
            project.project_type == ProjectType.WORKFLOW
        ), f"Provided project is not a workflow project. {PROJECT_MUSTS}"
        assert (
            len([s for s in project.workflow.stages if s.stage_type == WorkflowStageType.AGENT]) > 0
        ), f"Provided project does not have any agent stages in it's workflow. {PROJECT_MUSTS}"

    def _add_stage_agent(self, identity: str | UUID, func: Callable[..., TaskAgentReturn], printable_name: str | None):
        self.agents.append(RunnerAgent(identity=identity, callable=func, printable_name=printable_name))

    def stage(self, stage: str | UUID) -> Callable[[DecoratedCallable], DecoratedCallable]:
        r"""
        Decorator to associate a function with an agent stage.

        A function decorated with a stage is added to the list of stages
        that will be handled by the runner.
        The runner will call the function for every task which is in that
        stage.


        **Example:**

        ```python
        runner = Runner()

        @runner.stage("<stage_name_or_uuid>")
        def my_func() -> str | None:
            ...
            return "<pathway_name or pathway_uuid>"
        ```

        The function declaration can be any function that takes parameters
        that are type annotated with the following types:

        * [Project][docs-project]{ target="\_blank", rel="noopener noreferrer" }: the `encord.project.Project`
            that the runner is operating on.
        * [LabelRowV2][docs-label-row]{ target="\_blank", rel="noopener noreferrer" }: the `encord.objects.LabelRowV2`
            that the task is associated with.
        * [AgentTask][docs-project]{ target="\_blank", rel="noopener noreferrer" }: the `encord.workflow.stages.agent.AgentTask`
            that the task is associated with.
        * Any other type: which is annotated with a [dependency](/dependencies.md)

        All those parameters will be automatically injected when the agent is called.

        **Example:**

        ```python
        from typing import Iterator
        from typing_extensions import Annotated

        from encord.project import Project
        from encord_agents.tasks import Depends
        from encord_agents.tasks.dependencies import dep_video_iterator
        from encord.workflow.stages.agent import AgentTask

        runner = Runner()

        def random_value() -> float:
            import random
            return random.random()

        @runner.stage("<stage_name_or_uuid>")
        def my_func(
            project: Project,
            lr: LabelRowV2,
            task: AgentTask,
            video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
            custom: Annotated[float, Depends(random_value)]
        ) -> str | None:
            ...
            return "<pathway_name or pathway_uuid>"
        ```

        [docs-project]:    https://docs.encord.com/sdk-documentation/sdk-references/project
        [docs-label-row]:  https://docs.encord.com/sdk-documentation/sdk-references/LabelRowV2
        [docs-agent-task]: https://docs.encord.com/sdk-documentation/sdk-references/AgentTask

        Args:
            stage: The name or uuid of the stage that the function should be
                associated with.

        Returns:
            The decorated function.
        """
        printable_name = str(stage)
        try:
            stage = UUID(str(stage))
        except ValueError:
            pass

        if self.valid_stages is not None:
            selected_stage: WorkflowStage | None = None
            for v_stage in self.valid_stages:
                attr = v_stage.title if isinstance(stage, str) else v_stage.uuid
                if attr == stage:
                    selected_stage = v_stage

            if selected_stage is None:
                agent_stage_names = self.get_stage_names(self.valid_stages)
                raise PrintableError(
                    rf"Stage name [blue]`{stage}`[/blue] could not be matched against a project stage. Valid stages are \[{agent_stage_names}]."
                )
            stage = selected_stage.uuid

        if stage in [a.identity for a in self.agents]:
            raise PrintableError(
                f"Stage name [blue]`{printable_name}`[/blue] has already been assigned a function. You can only assign one callable to each agent stage."
            )

        def decorator(func: DecoratedCallable) -> DecoratedCallable:
            self._add_stage_agent(stage, func, printable_name)
            return func

        return decorator

    def _execute_tasks(
        self,
        tasks: Iterable[tuple[AgentTask, LabelRowV2 | None]],
        runner_agent: RunnerAgent,
        # num_threads: int,
        num_retries: int,
        pbar: tqdm | None = None,
    ) -> None:
        project = cast(Project, self.project)
        with Bundle() as bundle:
            for task, label_row in tasks:
                with ExitStack() as stack:
                    context = Context(project=project, task=task, label_row=label_row)
                    dependencies = solve_dependencies(context=context, dependant=runner_agent.dependant, stack=stack)
                    for attempt in range(num_retries + 1):
                        try:
                            next_stage = runner_agent.callable(**dependencies.values)
                            if next_stage is None:
                                pass
                            elif isinstance(next_stage, UUID):
                                task.proceed(pathway_uuid=str(next_stage), bundle=bundle)
                            else:
                                try:
                                    _next_stage = UUID(next_stage)
                                    task.proceed(pathway_uuid=str(_next_stage), bundle=bundle)
                                except ValueError:
                                    task.proceed(pathway_name=next_stage, bundle=bundle)

                            if pbar is not None:
                                pbar.update(1)
                            break

                        except KeyboardInterrupt:
                            raise
                        except Exception:
                            print(f"[attempt {attempt+1}/{num_retries+1}] Agent failed with error: ")
                            traceback.print_exc()

    @staticmethod
    def get_stage_names(valid_stages: list[AgentStage], join_str: str = ", "):
        return join_str.join(
            [f'[magenta]AgentStage(title="{k.title}", uuid="{k.uuid}")[/magenta]' for k in valid_stages]
        )

    def __call__(
        self,
        # num_threads: int = 1,
        refresh_every: int | None = None,
        num_retries: int = 3,
        task_batch_size: int = 300,
        project_hash: Optional[str] = None,
    ):
        """
        Run your task agent.

        The runner can continuously keep looking for new tasks in the project and execute the agent.

        Args:
            refresh_every: Fetch task statuses from the Encord projecet every `refresh_every` seconds.
                If `None`, they the runner will exit once task queue is empty.
            num_retries: If an agent fails on a task, how many times should the runner retry it?
            task_batch_size: Number of tasks for which labels are loaded into memory at once.
            project_hash: the project hash if not defined at runner instantiation.
        Returns:
            None
        """
        # Verify project
        if project_hash is not None:
            project_hash = self.verify_project_hash(project_hash)
            project = self.client.get_project(project_hash)
        else:
            project = self.project

        if project is None:
            import sys

            raise PrintableError(
                f"""Please specify project hash in one of the following ways:  
* At instantiation: [blue]`runner = Runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called directly: [blue]`runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called from CLI: [blue]`python {sys.argv[0]} --project-hash [green]<project_hash>[/green]`[/blue]
"""
            )

        self.validate_project(project)

        # Verify stages
        valid_stages = [s for s in project.workflow.stages if s.stage_type == WorkflowStageType.AGENT]
        agent_stages: dict[str | UUID, WorkflowStage] = {
            **{s.title: s for s in valid_stages},
            **{s.uuid: s for s in valid_stages},
        }
        try:
            for runner_agent in self.agents:
                fn_name = getattr(runner_agent.callable, "__name__", "agent function")
                separator = f"{os.linesep}\t"
                agent_stage_names = separator + self.get_stage_names(valid_stages, join_str=separator) + os.linesep
                if runner_agent.identity not in agent_stages:
                    suggestion: str
                    if len(valid_stages) == 1:
                        suggestion = f'Did you mean to wrap [blue]`{fn_name}`[/blue] with{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].title}")[/magenta]{os.linesep}or{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].uuid}")[/magenta]'
                    else:
                        suggestion = f"""
Please use either name annoitations: 
[magenta]@runner.stage(stage="<exact_stage_name>")[/magenta] 

or uuid annotations:
[magenta]@runner.stage(stage="<exact_stage_uuid>")[/magenta] 

For example, if we use the first agent stage listed above, we can use:
[magenta]@runner.stage(stage="{valid_stages[0].title}")
def {fn_name}(...):
    ...
[/magenta]
# or
[magenta]@runner.stage(stage="{valid_stages[0].uuid}")
def {fn_name}(...):
    ...[/magenta]"""
                    raise PrintableError(
                        rf"""Your function [blue]`{fn_name}`[/blue] was annotated to match agent stage [blue]`{runner_agent.printable_name}`[/blue] but that stage is not present as an agent stage in your project workflow. The workflow has following agent stages:

[{agent_stage_names}]

{suggestion}
                        """
                    )

                stage = agent_stages[runner_agent.identity]
                if stage.stage_type != WorkflowStageType.AGENT:
                    raise PrintableError(
                        f"""You cannot use the stage of type `{stage.stage_type}` as an agent stage. It has to be one of the agent stages: 
[{agent_stage_names}]."""
                    )

            # Run
            delta = timedelta(seconds=refresh_every) if refresh_every else None
            next_execution = None

            while True:
                if isinstance(next_execution, datetime):
                    if next_execution > datetime.now():
                        duration = next_execution - datetime.now()
                        print(f"Sleeping {duration.total_seconds()} secs until next execution time.")
                        time.sleep(duration.total_seconds())
                elif next_execution is not None:
                    break

                next_execution = datetime.now() + delta if delta else False
                for runner_agent in self.agents:
                    stage = agent_stages[runner_agent.identity]

                    batch: list[AgentTask] = []
                    batch_lrs: list[LabelRowV2 | None] = []

                    tasks = list(stage.get_tasks())
                    pbar = tqdm(desc="Executing tasks", total=len(tasks))
                    for task in tasks:
                        if not isinstance(task, AgentTask):
                            continue
                        batch.append(task)
                        if len(batch) == task_batch_size:
                            batch_lrs = [None] * len(batch)
                            if runner_agent.dependant.needs_label_row:
                                label_rows = {
                                    UUID(lr.data_hash): lr
                                    for lr in project.list_label_rows_v2(data_hashes=[t.data_hash for t in batch])
                                }
                                batch_lrs = [label_rows.get(t.data_hash) for t in batch]
                                with project.create_bundle() as lr_bundle:
                                    for lr in batch_lrs:
                                        if lr:
                                            lr.initialise_labels(bundle=lr_bundle)

                            self._execute_tasks(
                                zip(batch, batch_lrs),
                                runner_agent,
                                num_retries,
                                pbar=pbar,
                            )

                            batch = []
                            batch_lrs = []

                    if len(batch) > 0:
                        batch_lrs = [None] * len(batch)
                        if runner_agent.dependant.needs_label_row:
                            label_rows = {
                                UUID(lr.data_hash): lr
                                for lr in project.list_label_rows_v2(data_hashes=[t.data_hash for t in batch])
                            }
                            batch_lrs = [label_rows[t.data_hash] for t in batch]
                            with project.create_bundle() as lr_bundle:
                                for lr in batch_lrs:
                                    if lr:
                                        lr.initialise_labels(bundle=lr_bundle)
                        self._execute_tasks(zip(batch, batch_lrs), runner_agent, num_retries, pbar=pbar)
        except (PrintableError, AssertionError) as err:
            if self.was_called_from_cli:
                panel = Panel(err.args[0], width=None)
                rich.print(panel)
                raise Abort()
            else:
                if isinstance(err, PrintableError):
                    from rich.text import Text

                    plain_text = Text.from_markup(err.args[0]).plain
                    err.args = (plain_text,)
                raise

    def run(self):
        """
        Execute the runner.

        This function is intended to be called from the "main file".
        It is an entry point to be able to run the agent(s) via your shell
        with command line arguments.

        **Example:**

        ```python title="example.py"
        runner = Runner(project_hash="<your_project_hash>")

        @runner.stage(stage="...")
        def your_func() -> str:
            ...

        if __name__ == "__main__":
            runner.run()
        ```

        You can then run execute the runner with:

        ```shell
        python example.py --help
        ```

        to see the options is has (it's those from `Runner.__call__`).

        """
        from typer import Typer

        self.was_called_from_cli = True
        app = Typer(add_completion=False, rich_markup_mode="rich")
        app.command()(self.__call__)
        app()
__call__
__call__(refresh_every: int | None = None, num_retries: int = 3, task_batch_size: int = 300, project_hash: Optional[str] = None)

Run your task agent.

The runner can continuously keep looking for new tasks in the project and execute the agent.

Parameters:

  • refresh_every (int | None, default: None ) –

    Fetch task statuses from the Encord projecet every refresh_every seconds. If None, they the runner will exit once task queue is empty.

  • num_retries (int, default: 3 ) –

    If an agent fails on a task, how many times should the runner retry it?

  • task_batch_size (int, default: 300 ) –

    Number of tasks for which labels are loaded into memory at once.

  • project_hash (Optional[str], default: None ) –

    the project hash if not defined at runner instantiation.

Returns: None

Source code in encord_agents/tasks/runner.py
    def __call__(
        self,
        # num_threads: int = 1,
        refresh_every: int | None = None,
        num_retries: int = 3,
        task_batch_size: int = 300,
        project_hash: Optional[str] = None,
    ):
        """
        Run your task agent.

        The runner can continuously keep looking for new tasks in the project and execute the agent.

        Args:
            refresh_every: Fetch task statuses from the Encord projecet every `refresh_every` seconds.
                If `None`, they the runner will exit once task queue is empty.
            num_retries: If an agent fails on a task, how many times should the runner retry it?
            task_batch_size: Number of tasks for which labels are loaded into memory at once.
            project_hash: the project hash if not defined at runner instantiation.
        Returns:
            None
        """
        # Verify project
        if project_hash is not None:
            project_hash = self.verify_project_hash(project_hash)
            project = self.client.get_project(project_hash)
        else:
            project = self.project

        if project is None:
            import sys

            raise PrintableError(
                f"""Please specify project hash in one of the following ways:  
* At instantiation: [blue]`runner = Runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called directly: [blue]`runner(project_hash="[green]<project_hash>[/green]")`[/blue]
* When called from CLI: [blue]`python {sys.argv[0]} --project-hash [green]<project_hash>[/green]`[/blue]
"""
            )

        self.validate_project(project)

        # Verify stages
        valid_stages = [s for s in project.workflow.stages if s.stage_type == WorkflowStageType.AGENT]
        agent_stages: dict[str | UUID, WorkflowStage] = {
            **{s.title: s for s in valid_stages},
            **{s.uuid: s for s in valid_stages},
        }
        try:
            for runner_agent in self.agents:
                fn_name = getattr(runner_agent.callable, "__name__", "agent function")
                separator = f"{os.linesep}\t"
                agent_stage_names = separator + self.get_stage_names(valid_stages, join_str=separator) + os.linesep
                if runner_agent.identity not in agent_stages:
                    suggestion: str
                    if len(valid_stages) == 1:
                        suggestion = f'Did you mean to wrap [blue]`{fn_name}`[/blue] with{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].title}")[/magenta]{os.linesep}or{os.linesep}[magenta]@runner.stage(stage="{valid_stages[0].uuid}")[/magenta]'
                    else:
                        suggestion = f"""
Please use either name annoitations: 
[magenta]@runner.stage(stage="<exact_stage_name>")[/magenta] 

or uuid annotations:
[magenta]@runner.stage(stage="<exact_stage_uuid>")[/magenta] 

For example, if we use the first agent stage listed above, we can use:
[magenta]@runner.stage(stage="{valid_stages[0].title}")
def {fn_name}(...):
    ...
[/magenta]
# or
[magenta]@runner.stage(stage="{valid_stages[0].uuid}")
def {fn_name}(...):
    ...[/magenta]"""
                    raise PrintableError(
                        rf"""Your function [blue]`{fn_name}`[/blue] was annotated to match agent stage [blue]`{runner_agent.printable_name}`[/blue] but that stage is not present as an agent stage in your project workflow. The workflow has following agent stages:

[{agent_stage_names}]

{suggestion}
                        """
                    )

                stage = agent_stages[runner_agent.identity]
                if stage.stage_type != WorkflowStageType.AGENT:
                    raise PrintableError(
                        f"""You cannot use the stage of type `{stage.stage_type}` as an agent stage. It has to be one of the agent stages: 
[{agent_stage_names}]."""
                    )

            # Run
            delta = timedelta(seconds=refresh_every) if refresh_every else None
            next_execution = None

            while True:
                if isinstance(next_execution, datetime):
                    if next_execution > datetime.now():
                        duration = next_execution - datetime.now()
                        print(f"Sleeping {duration.total_seconds()} secs until next execution time.")
                        time.sleep(duration.total_seconds())
                elif next_execution is not None:
                    break

                next_execution = datetime.now() + delta if delta else False
                for runner_agent in self.agents:
                    stage = agent_stages[runner_agent.identity]

                    batch: list[AgentTask] = []
                    batch_lrs: list[LabelRowV2 | None] = []

                    tasks = list(stage.get_tasks())
                    pbar = tqdm(desc="Executing tasks", total=len(tasks))
                    for task in tasks:
                        if not isinstance(task, AgentTask):
                            continue
                        batch.append(task)
                        if len(batch) == task_batch_size:
                            batch_lrs = [None] * len(batch)
                            if runner_agent.dependant.needs_label_row:
                                label_rows = {
                                    UUID(lr.data_hash): lr
                                    for lr in project.list_label_rows_v2(data_hashes=[t.data_hash for t in batch])
                                }
                                batch_lrs = [label_rows.get(t.data_hash) for t in batch]
                                with project.create_bundle() as lr_bundle:
                                    for lr in batch_lrs:
                                        if lr:
                                            lr.initialise_labels(bundle=lr_bundle)

                            self._execute_tasks(
                                zip(batch, batch_lrs),
                                runner_agent,
                                num_retries,
                                pbar=pbar,
                            )

                            batch = []
                            batch_lrs = []

                    if len(batch) > 0:
                        batch_lrs = [None] * len(batch)
                        if runner_agent.dependant.needs_label_row:
                            label_rows = {
                                UUID(lr.data_hash): lr
                                for lr in project.list_label_rows_v2(data_hashes=[t.data_hash for t in batch])
                            }
                            batch_lrs = [label_rows[t.data_hash] for t in batch]
                            with project.create_bundle() as lr_bundle:
                                for lr in batch_lrs:
                                    if lr:
                                        lr.initialise_labels(bundle=lr_bundle)
                        self._execute_tasks(zip(batch, batch_lrs), runner_agent, num_retries, pbar=pbar)
        except (PrintableError, AssertionError) as err:
            if self.was_called_from_cli:
                panel = Panel(err.args[0], width=None)
                rich.print(panel)
                raise Abort()
            else:
                if isinstance(err, PrintableError):
                    from rich.text import Text

                    plain_text = Text.from_markup(err.args[0]).plain
                    err.args = (plain_text,)
                raise
run
run()

Execute the runner.

This function is intended to be called from the "main file". It is an entry point to be able to run the agent(s) via your shell with command line arguments.

Example:

example.py
runner = Runner(project_hash="<your_project_hash>")

@runner.stage(stage="...")
def your_func() -> str:
    ...

if __name__ == "__main__":
    runner.run()

You can then run execute the runner with:

python example.py --help

to see the options is has (it's those from Runner.__call__).

Source code in encord_agents/tasks/runner.py
def run(self):
    """
    Execute the runner.

    This function is intended to be called from the "main file".
    It is an entry point to be able to run the agent(s) via your shell
    with command line arguments.

    **Example:**

    ```python title="example.py"
    runner = Runner(project_hash="<your_project_hash>")

    @runner.stage(stage="...")
    def your_func() -> str:
        ...

    if __name__ == "__main__":
        runner.run()
    ```

    You can then run execute the runner with:

    ```shell
    python example.py --help
    ```

    to see the options is has (it's those from `Runner.__call__`).

    """
    from typer import Typer

    self.was_called_from_cli = True
    app = Typer(add_completion=False, rich_markup_mode="rich")
    app.command()(self.__call__)
    app()
stage
stage(stage: str | UUID) -> Callable[[DecoratedCallable], DecoratedCallable]

Decorator to associate a function with an agent stage.

A function decorated with a stage is added to the list of stages that will be handled by the runner. The runner will call the function for every task which is in that stage.

Example:

runner = Runner()

@runner.stage("<stage_name_or_uuid>")
def my_func() -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"

The function declaration can be any function that takes parameters that are type annotated with the following types:

  • Project: the encord.project.Project that the runner is operating on.
  • LabelRowV2: the encord.objects.LabelRowV2 that the task is associated with.
  • AgentTask: the encord.workflow.stages.agent.AgentTask that the task is associated with.
  • Any other type: which is annotated with a dependency

All those parameters will be automatically injected when the agent is called.

Example:

from typing import Iterator
from typing_extensions import Annotated

from encord.project import Project
from encord_agents.tasks import Depends
from encord_agents.tasks.dependencies import dep_video_iterator
from encord.workflow.stages.agent import AgentTask

runner = Runner()

def random_value() -> float:
    import random
    return random.random()

@runner.stage("<stage_name_or_uuid>")
def my_func(
    project: Project,
    lr: LabelRowV2,
    task: AgentTask,
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
    custom: Annotated[float, Depends(random_value)]
) -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"

Parameters:

  • stage (str | UUID) –

    The name or uuid of the stage that the function should be associated with.

Returns:

  • Callable[[DecoratedCallable], DecoratedCallable]

    The decorated function.

Source code in encord_agents/tasks/runner.py
def stage(self, stage: str | UUID) -> Callable[[DecoratedCallable], DecoratedCallable]:
    r"""
    Decorator to associate a function with an agent stage.

    A function decorated with a stage is added to the list of stages
    that will be handled by the runner.
    The runner will call the function for every task which is in that
    stage.


    **Example:**

    ```python
    runner = Runner()

    @runner.stage("<stage_name_or_uuid>")
    def my_func() -> str | None:
        ...
        return "<pathway_name or pathway_uuid>"
    ```

    The function declaration can be any function that takes parameters
    that are type annotated with the following types:

    * [Project][docs-project]{ target="\_blank", rel="noopener noreferrer" }: the `encord.project.Project`
        that the runner is operating on.
    * [LabelRowV2][docs-label-row]{ target="\_blank", rel="noopener noreferrer" }: the `encord.objects.LabelRowV2`
        that the task is associated with.
    * [AgentTask][docs-project]{ target="\_blank", rel="noopener noreferrer" }: the `encord.workflow.stages.agent.AgentTask`
        that the task is associated with.
    * Any other type: which is annotated with a [dependency](/dependencies.md)

    All those parameters will be automatically injected when the agent is called.

    **Example:**

    ```python
    from typing import Iterator
    from typing_extensions import Annotated

    from encord.project import Project
    from encord_agents.tasks import Depends
    from encord_agents.tasks.dependencies import dep_video_iterator
    from encord.workflow.stages.agent import AgentTask

    runner = Runner()

    def random_value() -> float:
        import random
        return random.random()

    @runner.stage("<stage_name_or_uuid>")
    def my_func(
        project: Project,
        lr: LabelRowV2,
        task: AgentTask,
        video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
        custom: Annotated[float, Depends(random_value)]
    ) -> str | None:
        ...
        return "<pathway_name or pathway_uuid>"
    ```

    [docs-project]:    https://docs.encord.com/sdk-documentation/sdk-references/project
    [docs-label-row]:  https://docs.encord.com/sdk-documentation/sdk-references/LabelRowV2
    [docs-agent-task]: https://docs.encord.com/sdk-documentation/sdk-references/AgentTask

    Args:
        stage: The name or uuid of the stage that the function should be
            associated with.

    Returns:
        The decorated function.
    """
    printable_name = str(stage)
    try:
        stage = UUID(str(stage))
    except ValueError:
        pass

    if self.valid_stages is not None:
        selected_stage: WorkflowStage | None = None
        for v_stage in self.valid_stages:
            attr = v_stage.title if isinstance(stage, str) else v_stage.uuid
            if attr == stage:
                selected_stage = v_stage

        if selected_stage is None:
            agent_stage_names = self.get_stage_names(self.valid_stages)
            raise PrintableError(
                rf"Stage name [blue]`{stage}`[/blue] could not be matched against a project stage. Valid stages are \[{agent_stage_names}]."
            )
        stage = selected_stage.uuid

    if stage in [a.identity for a in self.agents]:
        raise PrintableError(
            f"Stage name [blue]`{printable_name}`[/blue] has already been assigned a function. You can only assign one callable to each agent stage."
        )

    def decorator(func: DecoratedCallable) -> DecoratedCallable:
        self._add_stage_agent(stage, func, printable_name)
        return func

    return decorator