"""Code to download the transcripts from YouTube."""
import pathlib
import json
from typing import Dict, List, Union, Tuple
import streamlit as st
import pytube
from youtube_transcript_api import YouTubeTranscriptApi
def _get_playlist_info(url: str) -> Dict[str, str]:
    """Collects the title and ID of every video in a YouTube playlist.

    Note that the result is keyed by title, so if two videos in the playlist
    share the same title only the later one is kept.

    Args:
        url (str): The URL of the YouTube playlist.

    Returns:
        Dict[str, str]: A mapping from video title to video ID, in playlist
            order.
    """
    # Build the title -> ID mapping in a single pass over the playlist
    return {
        entry.title: entry.video_id
        for entry in pytube.Playlist(url).videos
    }
def download_transcript(video_title: str,
                        video_id: str,
                        output_path: pathlib.Path,
                        verbose: bool = True) -> None:
    """Downloads the transcript of a YouTube video and saves it as JSON.

    The English transcript ('en' or 'en-US') is requested and written to
    ``output_path`` as a JSON object with the keys ``title``, ``video_id``
    and ``transcript``.

    This function is best-effort: any exception raised while fetching or
    saving the transcript is caught and not propagated. Because the file is
    only opened after the transcript has been fetched, no file is created
    when the fetch itself fails.

    Args:
        video_title (str): The title of the YouTube video.
        video_id (str): The ID of the YouTube video.
        output_path (pathlib.Path): The path to the output file.
        verbose (bool): Whether to show a Streamlit warning when the
            transcript could not be downloaded or saved. If False, failures
            are ignored silently.
    """
    try:
        # Download transcript with youtube_transcript_api
        transcript = YouTubeTranscriptApi.get_transcript(
            video_id, languages=['en', 'en-US'])
        # Save transcript to a JSON file
        with open(output_path, 'w', encoding='utf-8') as file:
            # Put the title and the video ID at the top of the JSON file and
            # then dump the transcript
            json.dump({
                'title': video_title,
                'video_id': video_id,
                'transcript': transcript,
            }, file, ensure_ascii=False, indent=4)
    except Exception as error_msg:
        # Deliberately broad: one unavailable transcript must not abort the
        # caller, so the failure is only reported (when requested).
        if verbose:
            st.warning(f'Could not download transcript for video '
                       f'{video_title}.\nError message:\n{error_msg}')
def download_playlist(url: str,
                      data_path: pathlib.Path,
                      use_st_progress_bar: bool = False) -> None:
    """Downloads the transcripts of a YouTube playlist.

    One JSON file per video is written to ``data_path``, named
    ``Video_<n>.json`` where ``<n>`` is the 1-based position of the video in
    the playlist. Videos whose transcript cannot be downloaded are skipped;
    a warning is shown for them only when ``use_st_progress_bar`` is True.

    Args:
        url (str): The URL of the YouTube playlist.
        data_path (pathlib.Path): The path to the data directory. It is
            created (including parents) if it does not exist yet.
        use_st_progress_bar (bool): Whether to use a Streamlit progress bar.
            This flag is also passed on as the ``verbose`` setting of the
            individual transcript downloads.
    """
    video_id_dict = _get_playlist_info(url)
    total_videos = len(video_id_dict)

    # Without the target directory every single write would fail, and with
    # the progress bar disabled those failures would go unnoticed.
    data_path.mkdir(parents=True, exist_ok=True)

    progress_bar = st.progress(0) if use_st_progress_bar else None

    for i, (video_title, video_id) in enumerate(video_id_dict.items()):
        if progress_bar is not None:
            progress_bar.progress((i + 1) / total_videos,
                                  f'Downloading video {i + 1} of '
                                  f'{total_videos}')
        output_file = data_path / f'Video_{i + 1}.json'
        download_transcript(video_title,
                            video_id,
                            output_file,
                            verbose=use_st_progress_bar)
def _replace_newlines(json_file: dict) -> None:
"""Replaces \n with a space
Args:
json_file (dict): The JSON file.
"""
for segment in json_file['transcript']:
segment['text'] = segment['text'].replace('\n', ' ')
def _get_chunk_indices(segment_lengths: List[int],
max_chunk_size: int,
min_overlap_size: int) -> List[Tuple[int, int]]:
"""Gets the indices of the chunks.
Args:
segment_lengths (List[int]): The lengths of the segments.
max_chunk_size (int): The maximum size of a chunk.
min_overlap_size (int): The minimum size of the overlap between two
chunks.
Returns:
List[Tuple[int, int]]: A list of tuples with the beginning and ending
indices of the chunks.
"""
# Split the transcript into chunks
chunks_indices = []
current_beginning_index = 0
current_ending_index = 0
current_chunk_size = 0
for current_index, segment_length in enumerate(segment_lengths):
if current_chunk_size + segment_length + 1 < max_chunk_size:
current_chunk_size += segment_length + 1
current_ending_index = current_index
continue
chunks_indices.append(
(current_beginning_index, current_ending_index))
# Calculate the overlap
# current_chunk_size += segment_lengths[current_beginning_index] + 1
segment_len = segment_lengths[current_beginning_index] + 1
while current_chunk_size - segment_len > min_overlap_size:
current_chunk_size -= segment_len
current_beginning_index += 1
segment_len = segment_lengths[current_beginning_index] + 1
current_chunk_size += segment_length + 1
current_ending_index = current_index
# current_chunk_size += 1
chunks_indices.append((current_beginning_index, len(segment_lengths) - 1))
return chunks_indices
def create_chunked_data(file_path: pathlib.Path,
                        max_chunk_size: int,
                        min_overlap_size: int
                        ) -> List[Dict[str, Union[str, float, int]]]:
    """Creates chunked data from a transcript JSON file.

    The file is expected to contain the keys ``title``, ``video_id`` and
    ``transcript``, where each transcript segment has ``text``, ``start``
    and ``duration``. Newlines in the segment texts are replaced with
    spaces, the segments are grouped into overlapping chunks and one
    dictionary per chunk is returned.

    Args:
        file_path (pathlib.Path): The path to the JSON file.
        max_chunk_size (int): The maximum size of a chunk.
        min_overlap_size (int): The minimum size of the overlap between two
            chunks.

    Returns:
        List[Dict[str, Union[str, float, int]]]: One dictionary per chunk
            with the keys ``text`` (joined segment texts), ``start`` (start
            of the first segment), ``duration`` (summed segment durations),
            ``url`` (video URL with a ``&t=<seconds>s`` timestamp),
            ``title``, ``thumbnail`` and ``index`` (position of the chunk).
            Empty if the transcript has no segments.
    """
    # The transcript files are written as UTF-8 with ensure_ascii=False, so
    # read them back as UTF-8 instead of relying on the platform default.
    with open(file_path, 'r', encoding='utf-8') as file:
        json_file = json.load(file)

    transcript = json_file['transcript']
    # An empty transcript yields no chunks (and needs no thumbnail lookup)
    if not transcript:
        return []

    # Replace \\n with a space (modifies the segments in place)
    _replace_newlines(json_file)

    segment_lengths = [len(segment['text']) for segment in transcript]

    # Split the transcript into chunks
    chunks_indices = _get_chunk_indices(segment_lengths,
                                        max_chunk_size,
                                        min_overlap_size)

    base_url = 'https://www.youtube.com/watch?v='
    video_url = base_url + json_file['video_id']
    # The thumbnail is the same for every chunk, so look it up only once
    thumbnail_url = pytube.YouTube(video_url).thumbnail_url

    chunks = []
    for i, (start, end) in enumerate(chunks_indices):
        text_list = []
        duration_sum = 0
        for segment in transcript[start:end + 1]:
            text_list.append(segment['text'])
            duration_sum += segment['duration']

        chunk_start = transcript[start]['start']
        # The URL timestamp is the chunk start truncated to whole seconds
        timestamp = str(int(chunk_start))
        chunks.append({
            'text': ' '.join(text_list),
            'start': chunk_start,
            'duration': duration_sum,
            'url': video_url + f'&t={timestamp}s',
            'title': json_file['title'],
            'thumbnail': thumbnail_url,
            'index': i
        })

    return chunks