Celery Queue Runner Example¶
This example demonstrates how to use the Encord Agents QueueRunner with Celery and RabbitMQ for distributed task processing. This pattern can be used for any task that needs to be processed at scale, such as:
- Running machine learning inference on large datasets
- Batch processing of annotations
- Automated quality assurance checks
- Custom data preprocessing pipelines
Authentication¶
Before running the example, you need to set up authentication:
- Ensure you have an Encord account (register at app.encord.com/register)
- Create an SSH key pair following the documentation
- Set one of these environment variables so the SDK can locate your key: `ENCORD_SSH_KEY_FILE` (path to the private key) or `ENCORD_SSH_KEY` (the key contents); see the example below the tip
💡 Consider creating a service account for running agents
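For example, assuming your key is stored at `~/.ssh/encord_agent_key` (the path is only illustrative), either of the following works:

```bash
# Point the SDK at the private key file...
export ENCORD_SSH_KEY_FILE=~/.ssh/encord_agent_key

# ...or pass the key contents directly
export ENCORD_SSH_KEY="$(cat ~/.ssh/encord_agent_key)"
```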
Setup¶
- Install dependencies (commands sketched after this list):
- Start RabbitMQ (using Docker):
- Update the Project hash in `queue_runner_example.py` with your Encord Project hash.
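The first two steps might look like this, assuming the example ships a `requirements.txt` and that the stock RabbitMQ management image is acceptable in your environment:

```bash
# 1. Install the Python dependencies (requirements.txt is assumed; at minimum
#    you need the encord-agents and celery packages)
pip install -r requirements.txt

# 2. Start RabbitMQ with the management UI exposed on port 15672
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management
```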
Running the Example¶
- Start one or more Celery workers (see the command sketch below):
- In a separate terminal, populate the queue with tasks:
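A minimal sketch of the two commands, assuming the Celery app lives in `worker.py` so that the `-A worker` module name matches this example's layout:

```bash
# Terminal 1: start a Celery worker with four worker processes
celery -A worker worker --loglevel=info --concurrency=4

# Terminal 2: fetch tasks from Encord and push them onto the queue
python populate_queue.py
```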
The workers automatically start processing tasks as they are added to the queue.
Monitoring¶
You can monitor the RabbitMQ queue through the management interface at http://localhost:15672 (default credentials: guest/guest). This interface allows you to:
- Track queue lengths and processing rates
- Monitor worker status
- View error messages
- Access performance metrics
Architecture¶
This example implements a distributed task processing pipeline with the following components:
- QueueRunner: Wraps your agent implementation to handle Encord-specific logic
- Celery: Manages the distributed task queue and workers
- RabbitMQ: Acts as the message broker between components
The workflow follows these steps:
- `populate_queue.py` gets tasks from Encord and sends them to the Celery queue
- Celery workers pick up tasks and execute them using your agent implementation
- Results are automatically handled by the QueueRunner wrapper, which updates the Encord Project
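As a rough sketch of how the two scripts could be wired together: the stage name, pathway, and Project hash are placeholders, and the QueueRunner calls (`stage`, `get_agent_stages`, `get_tasks`, `model_dump_json`) follow the encord-agents documentation as I read it, so verify them against your installed version.

```python
# queue_runner_example.py: sketch of the agent side.
from encord.objects import LabelRowV2
from encord_agents.tasks import QueueRunner

from worker import app  # Celery app, assumed to live in worker.py (see Customization)

runner = QueueRunner(project_hash="<your-project-hash>")


@runner.stage("Pre-labelling")  # an agent stage (name or UUID) in your workflow
def my_agent(lr: LabelRowV2) -> str:
    """Your task logic; return the name of the pathway the task should follow."""
    # e.g. run inference and write labels through `lr` before routing the task
    return "annotate"


# QueueRunner wraps the function so it accepts a JSON task specification and
# returns a JSON result; registering that wrapper with Celery makes it queueable.
my_agent_task = app.task(name="my_agent")(my_agent)
```

```python
# populate_queue.py: sketch of the producer side.
from queue_runner_example import my_agent_task, runner

if __name__ == "__main__":
    for stage in runner.get_agent_stages():
        for task in stage.get_tasks():
            # Hand each Encord task to Celery as a serialized task specification
            my_agent_task.delay(task.model_dump_json())
```

Because the wrapper works entirely on JSON strings, the same agent function could be pushed through a different queueing system; Celery with RabbitMQ is simply the transport used in this example.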
Scaling¶
To scale processing horizontally, start more worker processes on the same or different machines:
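For instance, keeping the assumed `worker` module name, each additional worker on the same host just needs a unique node name:

```bash
# Start an extra worker; -n gives it a distinct node name on this host
celery -A worker worker --loglevel=info --concurrency=8 -n worker2@%h
```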
This architecture has the following benefits:
- Automatic load balancing: Celery distributes tasks among available workers
- Fault tolerance: Failed tasks can be automatically retried
- Scalability: Add or remove workers without changing application code
- Monitoring: Built-in tools for tracking task progress and worker status
Quick Start¶
- Install the requirements
- Start RabbitMQ
- Update the project hash
- Start one or more workers
- Run the populate queue script
The workers process tasks as they become available in the queue.
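Put together, and using the same assumed file names as above, the quick start might look like:

```bash
pip install -r requirements.txt                       # 1. install the requirements
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 rabbitmq:3-management   # 2. start RabbitMQ
# 3. edit the Project hash in queue_runner_example.py
celery -A worker worker --loglevel=info               # 4. start a worker (one terminal)
python populate_queue.py                              # 5. populate the queue (another terminal)
```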
Customization¶
To adapt this example for your use case:
- Modify the agent implementation in `queue_runner_example.py` with your task logic
- Adjust worker settings in `worker.py` for your performance needs (a settings sketch follows this list)
- Configure RabbitMQ settings if needed for your environment
- Add any additional error handling or monitoring specific to your use case
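As a starting point for the worker settings, a `worker.py` along these lines would centralize the Celery configuration; the option names are standard Celery settings, while the values and module names are only illustrative:

```python
# worker.py: sketch of the Celery app plus a few settings that tend to matter
# for long-running agent tasks; tune the values for your workload.
from celery import Celery

app = Celery(
    "encord_agent",
    broker="amqp://guest:guest@localhost:5672//",  # matches the RabbitMQ container above
    include=["queue_runner_example"],              # so workers register the agent task
)

app.conf.update(
    task_acks_late=True,            # re-deliver a task if a worker dies mid-run
    worker_prefetch_multiplier=1,   # keep one task in flight per worker process
    task_time_limit=600,            # hard per-task limit, in seconds
)
```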