{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Audio Transcription\n",
"\n",
"This notebook walks you through using an AI-powered Agent to transcribe and diarize audio files. The Agent automatically converts speech to text while distinguishing between different speakers\n",
"\n",
"### Example Workflow\n",
"\n",
"The following workflow illustrates how audio files can be pre-labeled. The code in this notebook is for the `Diarization` agent.\n",
"\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Installation\n",
"\n",
"Ensure that you install:\n",
"- The `encord-agents` library.\n",
"- `pyannote.audio`: A deep learning-based toolkit for speaker diarization, used to identify and differentiate speakers in an audio file. \n",
"- `transformers & accelerate`: Hugging Face libraries used for running and optimizing transformer models, which improve transcription accuracy and performance."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!python -m pip install -q encord-agents \"pyannotate.audio\"\n",
"!python -m pip install --upgrade -q transformers accelerate"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Hugging Face Authentication\n",
"\n",
"Retrieve the Hugging Face API token for authentication when accessing models and datasets. A HF token (Hugging Face token) is an authentication key used to access Hugging Face's models, datasets, and APIs.\n",
"\n",
"> 💡 In colab, you can set the key once in the secrets in the left sidebar and load it in new notebooks. IF YOU ARE NOT RUNNING THE CODE IN THE COLLAB NOTEBOOK, you must set the environment variable directly.\n",
"> ```python\n",
"> hf_token = \"my-hf-token\"\n",
"> ```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from google.colab import userdata\n",
"\n",
"hf_token = userdata.get(\"HF_TOKEN\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Please log in to HuggingFace and accept the terms of these two models: \n",
"\n",
"- Speaker diarization\n",
"- Audio segmentation\n",
"\n",
"Otherwise, the models will resist to run below."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Defines data structures and imports necessary libraries for audio transcription and speaker diarization.\n",
"\n",
"- `Segment`: Represents a time segment in an audio file.\n",
"- `Diary`: Stores diarization results, including the speaker and transcribed text.\n",
"- `Diarization`: A structured model to hold multiple diarized segments and retrieve unique speakers.\n",
"- Imports necessary libraries for deep learning, audio processing, and transcription."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"from typing import List, Optional, Union\n",
"\n",
"import numpy as np\n",
"import requests\n",
"import torch\n",
"from encord.objects.frames import Range\n",
"from pyannote.audio import Pipeline\n",
"from pyannote.core.annotation import Annotation\n",
"from pydantic import BaseModel, RootModel\n",
"from sympy.physics.units import length\n",
"from torchaudio import functional as F\n",
"from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline\n",
"from transformers.pipelines.audio_utils import ffmpeg_read\n",
"\n",
"\n",
"class Segment(BaseModel):\n",
" start: float\n",
" end: float\n",
"\n",
" @property\n",
" def encord_range(self) -> Range:\n",
" return Range(int(0.5 + self.start * 1000), int(0.5 + self.end * 1000))\n",
"\n",
"\n",
"class Diary(BaseModel):\n",
" segment: Segment\n",
" speaker: str\n",
" text: str = \"\"\n",
"\n",
"\n",
"class Diarization(RootModel):\n",
" root: List[Diary]\n",
"\n",
" @property\n",
" def speakers(self) -> List[str]:\n",
" return sorted(list(set([diary.speaker for diary in self.root])))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Defines the `Diarizer` class for speaker diarization and transcription.\n",
"\n",
"- **Initialization (`__init__`)**: Loads pretrained models for diarization and transcription, setting up processing pipelines.\n",
"- **Preprocessing (`preprocess`)**: Converts audio files into a format suitable for diarization and transcription.\n",
"- **Segment Merging (`prepare_segments`)**: Combines consecutive segments from the same speaker into a single segment.\n",
"- **Transcription (`transcribe_segments`)**: Uses Whisper to transcribe diarized segments in batches.\n",
"- **Full Pipeline (`diarize_and_transcribe`)**: Runs diarization and transcription on an audio file, returning structured speaker-labeled transcripts."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class Diarizer:\n",
" def __init__(\n",
" self,\n",
" diarizer_model: str = \"pyannote/speaker-diarization-3.1\",\n",
" transcription_model: str = \"openai/whisper-medium\",\n",
" ):\n",
" self.device = torch.device(\"cuda:0\" if torch.cuda.is_available() else \"cpu\")\n",
"\n",
" # Diarization\n",
" self.diarization_pipeline = Pipeline.from_pretrained(diarizer_model, use_auth_token=hf_token).to(self.device)\n",
"\n",
" # Transcription\n",
" processor = AutoProcessor.from_pretrained(transcription_model)\n",
" self.sampling_rate = processor.feature_extractor.sampling_rate\n",
"\n",
" self.whisper_pipeline = pipeline(\n",
" \"automatic-speech-recognition\",\n",
" model=transcription_model,\n",
" chunk_length_s=30,\n",
" )\n",
"\n",
" def preprocess(self, inputs):\n",
" with open(inputs, \"rb\") as f:\n",
" inputs = f.read()\n",
"\n",
" if isinstance(inputs, bytes):\n",
" inputs = ffmpeg_read(inputs, self.sampling_rate).copy()\n",
"\n",
" if len(inputs.shape) != 1:\n",
" print(\"We expect a single channel audio input for ASRDiarizePipeline so we downmix\")\n",
" inputs = np.mean(inputs, axis=0, keepdims=True)\n",
"\n",
" torch_batch_input = torch.from_numpy(inputs).to(torch.float32)[None]\n",
" return inputs, {\"waveform\": torch_batch_input, \"sample_rate\": self.sampling_rate}\n",
"\n",
" @staticmethod\n",
" def prepare_segments(diarization: Annotation) -> Diarization:\n",
" \"\"\"\n",
" Diarizer output may contain consecutive segments from the same speaker (e.g. {(0 -> 1, speaker_1), (1 -> 1.5, speaker_1), ...})\n",
" we combine these segments to give overall timestamps for each speaker's turn (e.g. {(0 -> 1.5, speaker_1), ...})\n",
" \"\"\"\n",
"\n",
" segments = []\n",
" for segment, track, label in diarization.itertracks(yield_label=True):\n",
" segments.append({\"segment\": {\"start\": segment.start, \"end\": segment.end}, \"label\": label})\n",
"\n",
" new_segments = []\n",
" prev_segment = cur_segment = segments[0]\n",
"\n",
" for i in range(1, len(segments)):\n",
" cur_segment = segments[i]\n",
" if cur_segment[\"label\"] != prev_segment[\"label\"] and i < len(segments):\n",
" new_segments.append(\n",
" {\n",
" \"segment\": {\"start\": prev_segment[\"segment\"][\"start\"], \"end\": cur_segment[\"segment\"][\"start\"]},\n",
" \"speaker\": prev_segment[\"label\"],\n",
" }\n",
" )\n",
" prev_segment = segments[i]\n",
"\n",
" new_segments.append(\n",
" {\n",
" \"segment\": {\"start\": prev_segment[\"segment\"][\"start\"], \"end\": cur_segment[\"segment\"][\"end\"]},\n",
" \"speaker\": prev_segment[\"label\"],\n",
" }\n",
" )\n",
" return Diarization.model_validate(new_segments)\n",
"\n",
" def transcribe_segments(self, diaries: Diarization, inputs: np.ndarray, batch_size: int = 10):\n",
" batch = []\n",
" start_index = 0\n",
" for index, diary in enumerate(diaries.root):\n",
" audio_segment_start = int(self.sampling_rate * diary.segment.start)\n",
" audio_segment_end = int(self.sampling_rate * diary.segment.end)\n",
" segment_audio = inputs[audio_segment_start:audio_segment_end]\n",
" if len(batch) < batch_size:\n",
" batch.append(segment_audio)\n",
" continue\n",
"\n",
" predicted = self.whisper_pipeline(batch)\n",
" for pred, segment in zip(predicted, diaries.root[start_index : start_index + len(batch)]):\n",
" segment.text = pred.get(\"text\", \"\")\n",
"\n",
" batch = []\n",
" start_index = index + 1\n",
"\n",
" if batch:\n",
" predicted = self.whisper_pipeline(batch)\n",
" for pred, segment in zip(predicted, diaries.root[start_index : start_index + len(batch)]):\n",
" segment.text = pred.get(\"text\", \"\")\n",
"\n",
" return diaries\n",
"\n",
" def diarize_and_transcribe(self, audio_file: Path | str):\n",
" \"\"\"\n",
" Algo:\n",
" 1. Diarize => Sections of the audiofile with a \"speaker stamp\"\n",
" 2. For each section: Transcribe\n",
" \"\"\"\n",
" # apply the pipeline to an audio file\n",
" audio_file = audio_file if isinstance(audio_file, str) else audio_file.as_posix()\n",
" transcription_input, diarization_input = self.preprocess(audio_file)\n",
" diarization = self.diarization_pipeline(diarization_input)\n",
" diary = self.prepare_segments(diarization)\n",
" diary = self.transcribe_segments(diary, transcription_input)\n",
" return diary\n",
"\n",
"\n",
"diarizer = Diarizer()\n",
"# diary = diarizer.diarize_and_transcribe(\"test_227.wav\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Encord Authentication\n",
"\n",
"Encord uses ssh-keys for authentication. The following is a code cell for setting the `ENCORD_SSH_KEY` environment variable. It contains the raw content of your private ssh key file.\n",
"\n",
"If you have not setup an ssh key, see our [documentation](https://agents-docs.encord.com/authentication/).\n",
"\n",
"> 💡 In colab, you can set the key once in the secrets in the left sidebar and load it in new notebooks. IF YOU ARE NOT RUNNING THE CODE IN THE COLLAB NOTEBOOK, you must set the environment variable directly.\n",
"> ```python\n",
"> os.environ[\"ENCORD_SSH_KEY\"] = \"\"\"paste-private-key-here\"\"\"\n",
"> ```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from google.colab import userdata\n",
"\n",
"os.environ[\"ENCORD_SSH_KEY\"] = userdata.get(\"ENCORD_SSH_KEY\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Imports and Initialization\n",
"\n",
"1. The Runner is initialized with a project hash, which allows interaction with an Encord project. Ensure that you replace with the ID of your Encord Project.\n",
"\n",
"2. The code filters objects from the project's ontology structure based on two criteria:\n",
" - The object must be of type Shape.AUDIO (audio object).\n",
" - The object's title must contain the word \"speaker\" (case-insensitive).\n",
" - The object should have a nested _text_ classification named `\"utterance\"` in which the transcript will be stored.\n",
"\n",
"3. The objects that meet the filtering criteria are stored."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"from typing import Annotated\n",
"\n",
"from encord.objects.attributes import TextAttribute\n",
"from encord.objects.common import Shape\n",
"from encord.objects.coordinates import AudioCoordinates\n",
"from encord.objects.frames import Range\n",
"from encord.objects.ontology_labels_impl import LabelRowV2\n",
"\n",
"from encord_agents.tasks import Depends, Runner\n",
"from encord_agents.tasks.dependencies import dep_asset\n",
"\n",
"runner = Runner(project_hash=\"\")\n",
"speakers = [\n",
" o for o in runner.project.ontology_structure.objects if o.shape == Shape.AUDIO and \"speaker\" in o.title.lower()\n",
"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define functions\n",
"\n",
"The following function creates the audio transcription annotations on the selected objects. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def annotate_transcription(diaries: Diarization, label_row: LabelRowV2) -> bool:\n",
" speaker_lookup = dict(zip(diaries.speakers, speakers))\n",
" added_any = False\n",
"\n",
" for diary in diaries.root:\n",
" speaker_clf = speaker_lookup.get(diary.speaker)\n",
" utterance_attr = speaker_clf.get_child_by_title(\"utterance\", type_=TextAttribute)\n",
" if speaker_clf is None:\n",
" continue\n",
"\n",
" ins = speaker_clf.create_instance()\n",
" ins.set_answer(diary.text, attribute=utterance_attr)\n",
" ins.set_for_frames(coordinates=AudioCoordinates(range=[diary.segment.encord_range]))\n",
" label_row.add_object_instance(ins)\n",
" added_any = True\n",
" return added_any"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the diarization function for speaker identification and transcription, and call the `annotate_transcription` function defined above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@runner.stage(\"Diarization\")\n",
"def do_diarization(label_row: LabelRowV2, asset: Annotated[Path, Depends(dep_asset)]):\n",
" diaries = diarizer.diarize_and_transcribe(asset)\n",
" if annotate_transcription(diaries, label_row):\n",
" label_row.save()\n",
" return \"high confidence\"\n",
" else:\n",
" return \"low confidence\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run the agent\n",
"\n",
"Initialize the runner and set the task_batch_size to 1.\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"runner(task_batch_size=1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"> 💡*Hint:* If you execute this as a Python script, you can run it as a command line interface by putting the above code in an `agents.py` file and replacing\n",
"> ```python\n",
"> runner()\n",
"> ```\n",
"> with\n",
"> ```python\n",
"> if __name__ == \"__main__\":\n",
"> runner.run()\n",
"> ```\n",
"> Which allows you to set, for example the Project hash using the command line:\n",
"> ```bash\n",
"> python agent.py --project-hash \"...\"\n",
"> ```\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps\n",
"\n",
"You can do [sentiment analysis](../audio_transcription_agent_multi_speaker/) on your transcriptions."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## What to do in case of installation errors\n",
"\n",
"If the following error occures during installation:\n",
"\n",
"\n",
"\n",
"Try running the code cell below before installing again.\n",
"It typically happens at the later installs."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import locale\n",
"\n",
"\n",
"def getpreferredencoding(do_setlocale=True):\n",
" return \"UTF-8\"\n",
"\n",
"\n",
"locale.getpreferredencoding = getpreferredencoding"
]
}
],
"metadata": {
"accelerator": "GPU",
"colab": {
"gpuType": "T4",
"provenance": [],
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}