{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Audio Transcription\n",
"\n",
"This notebook walks you through using an AI-powered Agent to transcribe and diarize audio files. Diarization is the practice of taking an Audio file and splitting out the various speakers and transcribing what they said. It is used to answer the question: \"Who spoke when?\". The Agent automatically converts speech to text while distinguishing between different speakers\n",
"\n",
"### Requirements\n",
"\n",
"This notebook guides you through the Workflow template, Ontology and model selection required.\n",
"\n",
"For this notebook, you need:\n",
"\n",
"- A Dataset containing Audio files in Encord.\n",
"- A Hugging Face User Access Token to access the Diarization models.\n",
"\n",
"\n",
"### Example Workflow\n",
"\n",
"The following workflow illustrates how audio files can be pre-labeled. The code in this notebook is for the `Diarization` agent.\n",
"\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Workflow Description:\n",
"\n",
"Following the diarization, predictions with a high confidence in the accuracy of the result are passed on for sentiment analysis, whilst those with low confidence in the accuracy are sent to an annotator for manual labeling. This minimises human annotator time and maximises the quality of the final transcripts."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Installation\n",
"\n",
"Ensure that you install:\n",
"- The `encord-agents` library.\n",
"- `pyannote.audio`: A deep learning-based toolkit for speaker diarization, used to identify and differentiate speakers in an audio file. \n",
"- `transformers & accelerate`: Hugging Face libraries used for running and optimizing transformer models, which improve transcription accuracy and performance."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!python -m pip install -q encord-agents \"pyannote.audio\"\n",
"!python -m pip install --upgrade -q transformers accelerate"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Hugging Face Authentication\n",
"\n",
"Retrieve the Hugging Face API token for authentication when accessing models and datasets. A Hugging Face (User Access) token is an authentication key used to access Hugging Face's models, datasets, and APIs.\n",
"\n",
"> 💡 In colab, you can set the key once in the secrets in the left sidebar and load it in new notebooks. IF YOU ARE NOT RUNNING THE CODE IN THE COLLAB NOTEBOOK, you must set the environment variable directly.\n",
"> ```python\n",
"> HF_TOKEN = \"my-hf-token\"\n",
"> ```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from google.colab import userdata\n",
"\n",
"HF_TOKEN = userdata.get(\"HF_TOKEN\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Log in to HuggingFace and accept the terms of these two models: \n",
"\n",
"- Speaker diarization\n",
"- Audio segmentation\n",
"\n",
"Optionally, you may need to accept the terms of this one additionally if an error is raised below\n",
"- Speaker diarization\n",
"\n",
"Otherwise, the models used in the following scripts will not run."
]
},
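{
"cell_type": "markdown",
"metadata": {},
"source": [
"The cell below is an optional, minimal sketch (not required by the rest of the notebook) showing how you can check that your token can reach the gated models before running the pipeline. It assumes the `huggingface_hub` package (installed as a dependency of `transformers`) and that `pyannote/segmentation-3.0` is the segmentation model the diarization pipeline depends on."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check (hedged sketch): confirm the token can access the gated pyannote models.\n",
"from huggingface_hub import login, model_info\n",
"\n",
"login(token=HF_TOKEN)  # register the token for subsequent Hub downloads\n",
"\n",
"for repo in [\"pyannote/speaker-diarization-3.1\", \"pyannote/segmentation-3.0\"]:\n",
"    try:\n",
"        model_info(repo, token=HF_TOKEN)\n",
"        print(f\"Access OK: {repo}\")\n",
"    except Exception as err:\n",
"        print(f\"Cannot access {repo} - accept its terms on the Hub first ({err})\")"
]
},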
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Defines data structures and imports necessary libraries for audio transcription and speaker diarization.\n",
"\n",
"- Imports necessary libraries for deep learning, audio processing, and transcription.\n",
"- `Segment`: Represents a time segment in an audio file.\n",
"- `Diary`: Stores diarization results, including the speaker and transcribed text.\n",
"- `Diarization`: A structured model to hold multiple diarized segments and retrieve unique speakers."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"from typing import List, Optional, Union\n",
"\n",
"import numpy as np\n",
"import requests\n",
"import torch\n",
"from encord.objects.frames import Range\n",
"from pyannote.audio import Pipeline\n",
"from pyannote.core.annotation import Annotation\n",
"from pydantic import BaseModel, RootModel\n",
"from sympy.physics.units import length\n",
"from torchaudio import functional as F\n",
"from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline\n",
"from transformers.pipelines.audio_utils import ffmpeg_read\n",
"\n",
"\n",
"class Segment(BaseModel):\n",
" start: float\n",
" end: float\n",
"\n",
" @property\n",
" def encord_range(self) -> Range:\n",
" return Range(int(0.5 + self.start * 1000), int(0.5 + self.end * 1000))\n",
"\n",
"\n",
"class Diary(BaseModel):\n",
" segment: Segment\n",
" speaker: str\n",
" text: str = \"\"\n",
"\n",
"\n",
"class Diarization(RootModel):\n",
" root: List[Diary]\n",
"\n",
" @property\n",
" def speakers(self) -> List[str]:\n",
" return sorted(list(set([diary.speaker for diary in self.root])))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Defines the `Diarizer` class for speaker diarization and transcription.\n",
"\n",
"- **Initialization (`__init__`)**: Loads pretrained models for diarization and transcription, setting up processing pipelines.\n",
"- **Preprocessing (`preprocess`)**: Converts audio files into a format suitable for diarization and transcription.\n",
"- **Segment Merging (`prepare_segments`)**: Combines consecutive segments from the same speaker into a single segment.\n",
"- **Transcription (`transcribe_segments`)**: Uses Whisper to transcribe diarized segments in batches.\n",
"- **Full Pipeline (`diarize_and_transcribe`)**: Runs diarization and transcription on an audio file, returning structured speaker-labeled transcripts."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class Diarizer:\n",
" def __init__(\n",
" self,\n",
" diarizer_model: str = \"pyannote/speaker-diarization-3.1\",\n",
" transcription_model: str = \"openai/whisper-medium\",\n",
" ):\n",
" self.device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n",
"\n",
" # Diarization\n",
" self.diarization_pipeline = Pipeline.from_pretrained(diarizer_model, use_auth_token=HF_TOKEN).to(self.device)\n",
"\n",
" # Transcription\n",
" processor = AutoProcessor.from_pretrained(transcription_model)\n",
" self.sampling_rate = processor.feature_extractor.sampling_rate\n",
"\n",
" self.whisper_pipeline = pipeline(\n",
" \"automatic-speech-recognition\",\n",
" model=transcription_model,\n",
" chunk_length_s=30,\n",
" )\n",
"\n",
" def preprocess(self, inputs):\n",
" with open(inputs, \"rb\") as f:\n",
" inputs = f.read()\n",
"\n",
" if isinstance(inputs, bytes):\n",
" inputs = ffmpeg_read(inputs, self.sampling_rate).copy()\n",
"\n",
" if len(inputs.shape) != 1:\n",
" print(\"We expect a single channel audio input for ASRDiarizePipeline so we downmix\")\n",
" inputs = np.mean(inputs, axis=0, keepdims=True)\n",
"\n",
" torch_batch_input = torch.from_numpy(inputs).to(torch.float32)[None]\n",
" return inputs, {\"waveform\": torch_batch_input, \"sample_rate\": self.sampling_rate}\n",
"\n",
" @staticmethod\n",
" def prepare_segments(diarization: Annotation) -> Diarization:\n",
" \"\"\"\n",
" Diarizer output may contain consecutive segments from the same speaker (e.g. {(0 -> 1, speaker_1), (1 -> 1.5, speaker_1), ...})\n",
" we combine these segments to give overall timestamps for each speaker's turn (e.g. {(0 -> 1.5, speaker_1), ...})\n",
" \"\"\"\n",
"\n",
" segments = []\n",
" for segment, track, label in diarization.itertracks(yield_label=True):\n",
" segments.append({\"segment\": {\"start\": segment.start, \"end\": segment.end}, \"label\": label})\n",
"\n",
" new_segments = []\n",
" prev_segment = cur_segment = segments[0]\n",
"\n",
" for i in range(1, len(segments)):\n",
" cur_segment = segments[i]\n",
" if cur_segment[\"label\"] != prev_segment[\"label\"] and i < len(segments):\n",
" new_segments.append(\n",
" {\n",
" \"segment\": {\"start\": prev_segment[\"segment\"][\"start\"], \"end\": cur_segment[\"segment\"][\"start\"]},\n",
" \"speaker\": prev_segment[\"label\"],\n",
" }\n",
" )\n",
" prev_segment = segments[i]\n",
"\n",
" new_segments.append(\n",
" {\n",
" \"segment\": {\"start\": prev_segment[\"segment\"][\"start\"], \"end\": cur_segment[\"segment\"][\"end\"]},\n",
" \"speaker\": prev_segment[\"label\"],\n",
" }\n",
" )\n",
" return Diarization.model_validate(new_segments)\n",
"\n",
" def transcribe_segments(self, diaries: Diarization, inputs: np.ndarray, batch_size: int = 10):\n",
" batch = []\n",
" start_index = 0\n",
" for index, diary in enumerate(diaries.root):\n",
" audio_segment_start = int(self.sampling_rate * diary.segment.start)\n",
" audio_segment_end = int(self.sampling_rate * diary.segment.end)\n",
" segment_audio = inputs[audio_segment_start:audio_segment_end]\n",
" if len(batch) < batch_size:\n",
" batch.append(segment_audio)\n",
" continue\n",
"\n",
" predicted = self.whisper_pipeline(batch)\n",
" for pred, segment in zip(predicted, diaries.root[start_index : start_index + len(batch)]):\n",
" segment.text = pred.get(\"text\", \"\")\n",
"\n",
" batch = []\n",
" start_index = index + 1\n",
"\n",
" if batch:\n",
" predicted = self.whisper_pipeline(batch)\n",
" for pred, segment in zip(predicted, diaries.root[start_index : start_index + len(batch)]):\n",
" segment.text = pred.get(\"text\", \"\")\n",
"\n",
" return diaries\n",
"\n",
" def diarize_and_transcribe(self, audio_file: Path | str):\n",
" \"\"\"\n",
" Algo:\n",
" 1. Diarize => Sections of the audiofile with a \"speaker stamp\"\n",
" 2. For each section: Transcribe\n",
" \"\"\"\n",
" # apply the pipeline to an audio file\n",
" audio_file = audio_file if isinstance(audio_file, str) else audio_file.as_posix()\n",
" transcription_input, diarization_input = self.preprocess(audio_file)\n",
" diarization = self.diarization_pipeline(diarization_input)\n",
" diary = self.prepare_segments(diarization)\n",
" diary = self.transcribe_segments(diary, transcription_input)\n",
" return diary\n",
"\n",
"\n",
"diarizer = Diarizer()\n",
"# diary = diarizer.diarize_and_transcribe(\"test_227.wav\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Encord Authentication\n",
"\n",
"Encord uses ssh-keys for authentication. The following is a code cell for setting the `ENCORD_SSH_KEY` environment variable. It contains the raw content of your private ssh key file.\n",
"\n",
"If you have not setup an ssh key, see our [documentation](https://agents-docs.encord.com/authentication/).\n",
"\n",
"> 💡 In colab, you can set the key once in the secrets in the left sidebar and load it in new notebooks. IF YOU ARE NOT RUNNING THE CODE IN THE COLLAB NOTEBOOK, you must set the environment variable directly.\n",
"> ```python\n",
"> os.environ[\"ENCORD_SSH_KEY\"] = \"\"\"paste-private-key-here\"\"\"\n",
"> ```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from google.colab import userdata\n",
"\n",
"os.environ[\"ENCORD_SSH_KEY\"] = userdata.get(\"ENCORD_SSH_KEY\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Imports and Initialization and Variable Names\n",
"\n",
"1. The Runner is initialized with a project hash, which allows interaction with an Encord project. Ensure that you replace with the ID of your Encord Project.\n",
"\n",
"2. The code filters objects from the project's ontology structure based on two criteria:\n",
" - The object must be of type Shape.AUDIO (audio object).\n",
" - The object's title must contain the word \"speaker\" (SPEAKER_INDICATOR) (case-insensitive).\n",
" - The object should have a nested _text_ classification named `\"utterance #transcript\"` in which the transcript will be stored.\n",
"\n",
"3. The objects that meet the filtering criteria are stored.\n",
"\n",
"⚠️ **NOTE:** Change these variables for your project!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"UTTERANCE_NAME = \"utterance #transcript\"\n",
"PROJECT_HASH = \"\"\n",
"HIGH_CONFIDENCE_PATHWAY_NAME = \"high confidence\"\n",
"LOW_CONFIDENCE_PATHWAY_NAME = \"low confidence\"\n",
"SPEAKER_INDICATOR = \"speaker\"\n",
"DIARIZATION_WORKFLOW_STAGE_NAME = \"Diarization\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Ontology example\n",
"Note: the #transcript suffix is not rendered here but allows the editor to 'bind' onto this attribute and render the transcript. This is discussed [here](https://docs.encord.com/platform-documentation/Annotate/annotate-ontologies/annotate-ontologies#transcript-attributes)"
]
},
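{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you prefer to build this Ontology programmatically rather than in the UI, the cell below is a rough sketch of how it could look with the Encord SDK. The speaker titles (\"Speaker 1\", \"Speaker 2\") are illustrative assumptions; any titles containing the word \"speaker\" work with the filtering used in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: an OntologyStructure with audio speaker objects, each carrying a text\n",
"# attribute named after UTTERANCE_NAME (\"utterance #transcript\") for the agent to fill in.\n",
"# The speaker titles below are assumptions; adjust them to your project.\n",
"from encord.objects import OntologyStructure, Shape\n",
"from encord.objects.attributes import TextAttribute\n",
"\n",
"structure = OntologyStructure()\n",
"for title in [\"Speaker 1\", \"Speaker 2\"]:\n",
"    speaker = structure.add_object(name=title, shape=Shape.AUDIO)\n",
"    speaker.add_attribute(TextAttribute, name=UTTERANCE_NAME)\n",
"\n",
"# `structure` can then be used when creating a new Ontology for your Project in Encord."
]
},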
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Pre-execution Validation\n",
"\n",
"To ensure that your project is of an appropriate form to run the above agent, we can perform pre-execution checks that the relevant Workflow and Ontology are in place."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"from typing import Annotated\n",
"\n",
"from encord.objects import Object, Shape\n",
"from encord.objects.attributes import TextAttribute\n",
"from encord.objects.coordinates import AudioCoordinates\n",
"from encord.objects.ontology_labels_impl import LabelRowV2\n",
"from encord.workflow.stages.agent import AgentStage\n",
"\n",
"from encord_agents.tasks import Depends, Runner\n",
"from encord_agents.tasks.dependencies import dep_asset\n",
"\n",
"\n",
"def pre_execution_validation(runner: Runner) -> None:\n",
" project = runner.project\n",
" assert runner.project\n",
"\n",
" diarization_stage = project.workflow.get_stage(name=DIARIZATION_WORKFLOW_STAGE_NAME, type_=AgentStage)\n",
" assert diarization_stage.pathways\n",
" assert {HIGH_CONFIDENCE_PATHWAY_NAME, LOW_CONFIDENCE_PATHWAY_NAME}.issubset(\n",
" {pathway.name for pathway in diarization_stage.pathways}\n",
" )\n",
"\n",
" assert any(object.shape == Shape.AUDIO for object in project.ontology_structure.objects)\n",
" audio_objects = [\n",
" object\n",
" for object in project.ontology_structure.objects\n",
" if object.shape == Shape.AUDIO and object.title == SPEAKER_INDICATOR\n",
" ]\n",
" if len(audio_objects) > 1:\n",
" print(f\"There are multiple Audio {SPEAKER_INDICATOR=} objects\")\n",
"\n",
" def is_acceptable_audio(object: Object) -> bool:\n",
" try:\n",
" object.get_child_by_title(UTTERANCE_NAME, type_=TextAttribute)\n",
" return True\n",
" except Exception:\n",
" return False\n",
"\n",
" assert any(is_acceptable_audio(audio_obj) for audio_obj in audio_objects)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"runner = Runner(project_hash=PROJECT_HASH, pre_execution_callback=pre_execution_validation)\n",
"speakers = [\n",
" o\n",
" for o in runner.project.ontology_structure.objects\n",
" if o.shape == Shape.AUDIO and SPEAKER_INDICATOR in o.title.lower()\n",
"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define Functions\n",
"\n",
"The following function creates the audio transcription annotations on the selected objects. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def annotate_transcription(diaries: Diarization, label_row: LabelRowV2) -> bool:\n",
" speaker_lookup = dict(zip(diaries.speakers, speakers))\n",
" added_any = False\n",
"\n",
" for diary in diaries.root:\n",
" speaker_clf = speaker_lookup.get(diary.speaker)\n",
" if speaker_clf is None:\n",
" continue\n",
" utterance_attr = speaker_clf.get_child_by_title(UTTERANCE_NAME, type_=TextAttribute)\n",
"\n",
" ins = speaker_clf.create_instance()\n",
" ins.set_answer(diary.text, attribute=utterance_attr)\n",
" ins.set_for_frames(coordinates=AudioCoordinates(range=[diary.segment.encord_range]))\n",
" label_row.add_object_instance(ins)\n",
" added_any = True\n",
" return added_any"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the diarization function for speaker identification and transcription, and call the `annotate_transcription` function defined above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@runner.stage(DIARIZATION_WORKFLOW_STAGE_NAME)\n",
"def do_diarization(label_row: LabelRowV2, asset: Annotated[Path, Depends(dep_asset)]):\n",
" diaries = diarizer.diarize_and_transcribe(asset)\n",
" if annotate_transcription(diaries, label_row):\n",
" label_row.save()\n",
" return HIGH_CONFIDENCE_PATHWAY_NAME\n",
" else:\n",
" return LOW_CONFIDENCE_PATHWAY_NAME"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run the Agent\n",
"\n",
"Initialize the runner and set the task_batch_size to 1.\n",
"\n",
"We encourage you first to try it out with `max_tasks_per_stage=1` to first check that the Agent is working appropriately for your use-case.\n",
"\n",
"> 💡*Hint:* If you want to execute this as a Python script, you can run it as a command line interface by putting the above code in an `agents.py` file and replacing\n",
"> ```python\n",
"> runner(...)\n",
"> ```\n",
"> with\n",
"> ```python\n",
"> if __name__ == \"__main__\":\n",
"> runner.run()\n",
"> ```\n",
"> Which allows you to set, for example the Project hash using the command line:\n",
"> ```bash\n",
"> python agent.py --project-hash \"...\"\n",
"> ```\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"runner(task_batch_size=1, max_tasks_per_stage=None)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Outcome\n",
"\n",
"You have created an AI-powered agent to transcribe and diarize audio files. The agent has converts speech to text while distinguishing between different speakers, providing a clear, structured transcript that answers the question: *\"Who spoke when?\"* This automated process simplifies conversation analysis and enhances understanding of recorded audio."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next Steps\n",
"\n",
"You can do [sentiment analysis](../audio_transcription_agent_multi_speaker/) on your transcriptions."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## What to do in Case of Installation Errors\n",
"\n",
"If the following error occurs during installation:\n",
"\n",
"\n",
"\n",
"Try running the code cell below before installing again.\n",
"It typically happens at the later installs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import locale\n",
"\n",
"\n",
"def getpreferredencoding(do_setlocale=True):\n",
" return \"UTF-8\"\n",
"\n",
"\n",
"locale.getpreferredencoding = getpreferredencoding"
]
}
],
"metadata": {
"accelerator": "GPU",
"colab": {
"gpuType": "T4",
"provenance": []
},
"kernelspec": {
"display_name": "encord-agents-tO19NJQ2-py3.11",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}