diff --git a/extractors/captions.py b/extractors/captions.py
index d05ec4e..0dd1a4e 100644
--- a/extractors/captions.py
+++ b/extractors/captions.py
@@ -16,12 +16,12 @@ def extract_captions_from_dict(captions, *, lang=None, label=None):
 		return captions
 
 	url = next(caption["second__remoteUrl"] for caption in captions["captions"] if caption["languageCode"] == lang or caption["label"] == label)
-	with requests.get(url) as r:
-		r.raise_for_status()
-		# remove extraneous " align:start position:0%" on timestamps lines on auto-generated captions
-		if (lang and "auto-generated" in lang) or (label and "auto-generated" in label):
-			return re.sub(r"^([0-9:.]+ --> [0-9:.]+).*$", r"\1", r.content.decode("utf8"), flags=re.MULTILINE)
-		return r
+	r = requests.get(url)
+	r.raise_for_status()
+	# remove extraneous " align:start position:0%" on timestamps lines on auto-generated captions
+	if (lang and "auto-generated" in lang) or (label and "auto-generated" in label):
+		return re.sub(r"^([0-9:.]+ --> [0-9:.]+).*$", r"\1", r.content.decode("utf8"), flags=re.MULTILINE)
+	return r
 
 def extract_captions_from_video(id):
 	return {
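
Note: the pattern above repeats throughout this patch — `with requests.get(...) as r:` becomes a plain assignment and the body dedents one level. The diff does not state the motivation; one plausible reading is compatibility, since the `with` form only works where requests implements the context-manager protocol on `Response` objects, while plain assignment works on any version at the cost of deterministic connection cleanup. A minimal sketch of keeping deterministic cleanup anyway, assuming that trade-off matters here (`fetch_decoded` is a hypothetical helper, not part of this patch):

	def fetch_decoded(url):
		r = requests.get(url)
		try:
			r.raise_for_status()
			return r.content.decode("utf8")
		finally:
			r.close()  # release the pooled connection without relying on Response.__exit__
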
yt_initial_data["header"] else {} + channel_metadata = yt_initial_data["metadata"]["channelMetadataRenderer"] - author_banners = [] - if "banner" in header: - author_banners = header["banner"]["thumbnails"] - for t in author_banners: - t["url"] = normalise_url_protocol(t["url"]) + if header: + author = header["title"] + author_id = header["channelId"] + author_url = header["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"] + else: + author = channel_metadata["title"] + author_id = channel_metadata["externalId"] + author_url = channel_metadata["channelUrl"] - author_thumbnails = [] - avatar = header.get("avatar") or channel_metadata.get("avatar") - if avatar: - author_thumbnails = generate_full_author_thumbnails(avatar["thumbnails"]) + subscriber_count = combine_runs(header["subscriberCountText"]) if "subscriberCountText" in header else "Unknown subscribers" + description = channel_metadata["description"] + allowed_regions = channel_metadata["availableCountryCodes"] - latest_videos = [] - tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"] - try: - videos_tab = next(tab["tabRenderer"] for tab in tabs if tab["tabRenderer"]["title"] == "Videos") - tab_parts = videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0] - except StopIteration: - tab_parts = {} + author_banners = [] + if "banner" in header: + author_banners = header["banner"]["thumbnails"] + for t in author_banners: + t["url"] = normalise_url_protocol(t["url"]) - # check that the channel actually has videos - this may be replaced - # with messageRenderer.text.simpleText == "This channel has no videos." - if "gridRenderer" in tab_parts: - videos = ( - v["gridVideoRenderer"] for v in tab_parts["gridRenderer"]["items"] if "gridVideoRenderer" in v - ) - for v in videos: - live = False - is_upcoming = False - length_text = "UNKNOWN" - length_seconds = -1 - for o in v["thumbnailOverlays"]: - if "thumbnailOverlayTimeStatusRenderer" in o: - length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"]) - length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"] - if length_text_style == "DEFAULT": - length_seconds = length_text_to_seconds(length_text) - elif length_text_style == "LIVE": - live = True - elif length_text_style == "UPCOMING": - is_upcoming = True - published = 0 - published_text = "Live now" - premiere_timestamp = None - if "publishedTimeText" in v: - published_text = v["publishedTimeText"]["simpleText"] - published = past_text_to_time(published_text) - if "upcomingEventData" in v: - premiere_timestamp = v["upcomingEventData"]["startTime"] - published_text = time_to_past_text(int(premiere_timestamp)) + author_thumbnails = [] + avatar = header.get("avatar") or channel_metadata.get("avatar") + if avatar: + author_thumbnails = generate_full_author_thumbnails(avatar["thumbnails"]) - view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None - view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None + latest_videos = [] + tabs = yt_initial_data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"] + try: + videos_tab = next(tab["tabRenderer"] for tab in tabs if tab["tabRenderer"]["title"] == "Videos") + tab_parts = videos_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0] + except StopIteration: + tab_parts = {} - latest_videos.append({ - "type": "video", - "title": combine_runs(v["title"]), - "videoId": 
v["videoId"], - "author": author, - "authorId": author_id, - "authorUrl": author_url, - "videoThumbnails": generate_video_thumbnails(v["videoId"]), - "description": "", - "descriptionHtml": "", - "viewCount": view_count_text_to_number(view_count_text), - "second__viewCountText": view_count_text, - "second__viewCountTextShort": view_count_text_short, - "published": published, - "publishedText": published_text, - "lengthSeconds": length_seconds, - "second__lengthText": length_text, - "liveNow": live, - "paid": None, - "premium": None, - "isUpcoming": is_upcoming, - "premiereTimestamp": premiere_timestamp - }) + # check that the channel actually has videos - this may be replaced + # with messageRenderer.text.simpleText == "This channel has no videos." + if "gridRenderer" in tab_parts: + videos = ( + v["gridVideoRenderer"] for v in tab_parts["gridRenderer"]["items"] if "gridVideoRenderer" in v + ) + for v in videos: + live = False + is_upcoming = False + length_text = "UNKNOWN" + length_seconds = -1 + for o in v["thumbnailOverlays"]: + if "thumbnailOverlayTimeStatusRenderer" in o: + length_text = combine_runs(o["thumbnailOverlayTimeStatusRenderer"]["text"]) + length_text_style = o["thumbnailOverlayTimeStatusRenderer"]["style"] + if length_text_style == "DEFAULT": + length_seconds = length_text_to_seconds(length_text) + elif length_text_style == "LIVE": + live = True + elif length_text_style == "UPCOMING": + is_upcoming = True + published = 0 + published_text = "Live now" + premiere_timestamp = None + if "publishedTimeText" in v: + published_text = v["publishedTimeText"]["simpleText"] + published = past_text_to_time(published_text) + if "upcomingEventData" in v: + premiere_timestamp = v["upcomingEventData"]["startTime"] + published_text = time_to_past_text(int(premiere_timestamp)) - channel = { - "author": author, - "authorId": author_id, - "authorUrl": author_url, - "authorBanners": author_banners, - "authorThumbnails": author_thumbnails, - "subCount": uncompress_counter(subscriber_count.split(" ")[0]), - "second__subCountText": subscriber_count, - "totalViews": None, - "joined": None, - "paid": None, - "autoGenerated": None, - "isFamilyFriendly": None, - "description": description, - "descriptionHtml": add_html_links(escape_html_textcontent(description)), - "allowedRegions": allowed_regions, - "latestVideos": latest_videos, - "relatedChannels": [] - } + view_count_text = combine_runs(v["viewCountText"]) if "viewCountText" in v else None + view_count_text_short = combine_runs(v["shortViewCountText"]) if "shortViewCountText" in v else None - with channel_cache_lock: - channel_cache[ucid] = channel + latest_videos.append({ + "type": "video", + "title": combine_runs(v["title"]), + "videoId": v["videoId"], + "author": author, + "authorId": author_id, + "authorUrl": author_url, + "videoThumbnails": generate_video_thumbnails(v["videoId"]), + "description": "", + "descriptionHtml": "", + "viewCount": view_count_text_to_number(view_count_text), + "second__viewCountText": view_count_text, + "second__viewCountTextShort": view_count_text_short, + "published": published, + "publishedText": published_text, + "lengthSeconds": length_seconds, + "second__lengthText": length_text, + "liveNow": live, + "paid": None, + "premium": None, + "isUpcoming": is_upcoming, + "premiereTimestamp": premiere_timestamp + }) - return channel + channel = { + "author": author, + "authorId": author_id, + "authorUrl": author_url, + "authorBanners": author_banners, + "authorThumbnails": author_thumbnails, + "subCount": 
uncompress_counter(subscriber_count.split(" ")[0]), + "second__subCountText": subscriber_count, + "totalViews": None, + "joined": None, + "paid": None, + "autoGenerated": None, + "isFamilyFriendly": None, + "description": description, + "descriptionHtml": add_html_links(escape_html_textcontent(description)), + "allowedRegions": allowed_regions, + "latestVideos": latest_videos, + "relatedChannels": [] + } + + with channel_cache_lock: + channel_cache[ucid] = channel + + return channel def extract_channel_videos(ucid): channel = extract_channel(ucid) @@ -167,59 +167,59 @@ def extract_channel_latest(ucid): if ucid in channel_latest_cache: return channel_latest_cache[ucid] - with requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid)) as r: - if r.status_code == 404: - cherrypy.response.status = 404 - return { - "error": "This channel does not exist.", - "identifier": "NOT_FOUND" - } + r = requests.get("https://www.youtube.com/feeds/videos.xml?channel_id={}".format(ucid)) + if r.status_code == 404: + cherrypy.response.status = 404 + return { + "error": "This channel does not exist.", + "identifier": "NOT_FOUND" + } - feed = ET.fromstring(r.content) - author_container = feed.find("{http://www.w3.org/2005/Atom}author") - author = author_container.find("{http://www.w3.org/2005/Atom}name").text - author_url = author_container.find("{http://www.w3.org/2005/Atom}uri").text - channel_id = feed.find("{http://www.youtube.com/xml/schemas/2015}channelId").text - results = [] - missing_published = False - for entry in feed.findall("{http://www.w3.org/2005/Atom}entry"): - id = entry.find("{http://www.youtube.com/xml/schemas/2015}videoId").text - media_group = entry.find("{http://search.yahoo.com/mrss/}group") - description = media_group.find("{http://search.yahoo.com/mrss/}description").text or "" - media_community = media_group.find("{http://search.yahoo.com/mrss/}community") - published_entry = entry.find("{http://www.w3.org/2005/Atom}published") - if published_entry is not None: # sometimes youtube does not provide published dates, no idea why. 
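
Note: illustrative only — how a caller might consume `extract_channel`, based on the keys built above (the UCID is just an example; error returns carry `error`/`identifier` keys instead of channel data):

	channel = extract_channel("UCXuqSBlHAE6Xw-yeJA0Tunw")  # example UCID, hypothetical call
	if "error" not in channel:
		print(channel["author"], channel["second__subCountText"])
		for video in channel["latestVideos"][:5]:
			print(video["videoId"], video["publishedText"], video["title"])
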
diff --git a/extractors/manifest.py b/extractors/manifest.py
index 2a61ef0..9a0923f 100644
--- a/extractors/manifest.py
+++ b/extractors/manifest.py
@@ -11,9 +11,9 @@ def extract_manifest(id):
 		return video
 
 	if video["second__providedDashUrl"]:
-		with requests.get(video["second__providedDashUrl"]) as r:
-			r.raise_for_status()
-			return r
+		r = requests.get(video["second__providedDashUrl"])
+		r.raise_for_status()
+		return r
 
 	adaptation_sets_dict = {}
 	for f in video["adaptiveFormats"]:
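
Note: unchanged by this patch, `extract_manifest` still returns the raw `requests` response object when an upstream DASH URL is available, rather than its body. A sketch of what a caller would presumably do with it (assumption: the DASH XML is wanted as text):

	manifest = extract_manifest("dQw4w9WgXcQ")  # example video id
	if isinstance(manifest, requests.Response):
		dash_xml = manifest.content.decode("utf8")
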
diff --git a/extractors/search.py b/extractors/search.py
index e100b6d..3523a18 100644
--- a/extractors/search.py
+++ b/extractors/search.py
@@ -17,51 +17,51 @@ ytdl = yt_dlp.YoutubeDL(ytdl_opts)
 
 def extract_search(q):
 	try:
-		with requests.get("https://www.youtube.com/results", params={"q": q, "hl": "en"}, cookies=eu_consent_cookie()) as r:
-			r.raise_for_status()
-			content = r.content.decode("utf8")
-			yt_initial_data = extract_yt_initial_data(content)
+		r = requests.get("https://www.youtube.com/results", params={"q": q, "hl": "en"}, cookies=eu_consent_cookie())
+		r.raise_for_status()
+		content = r.content.decode("utf8")
+		yt_initial_data = extract_yt_initial_data(content)
 
-			sections = yt_initial_data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"]
-			# youtube searches contain a lot of random stuff, just grab it all for now, then filter to `videoRenderer` later
-			itemSections = [s for s in sections if "itemSectionRenderer" in s]
+		sections = yt_initial_data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"]
+		# youtube searches contain a lot of random stuff, just grab it all for now, then filter to `videoRenderer` later
+		itemSections = [s for s in sections if "itemSectionRenderer" in s]
 
-			items = []
-			for section in itemSections:
-				items += section["itemSectionRenderer"]["contents"]
+		items = []
+		for section in itemSections:
+			items += section["itemSectionRenderer"]["contents"]
 
-			results = []
-			for item in items:
-				if "videoRenderer" in item:
-					video = item["videoRenderer"]
-					published = 0
-					published_text = "Live now"
-					if "publishedTimeText" in video:
-						published_text = video["publishedTimeText"]["simpleText"]
-						published = past_text_to_time(published_text)
-					results.append({
-						"type": "video",
-						"title": combine_runs(video["title"]),
-						"videoId": video["videoId"],
-						"author": combine_runs(video["longBylineText"]),
-						"authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
-						"authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
-						"videoThumbnails": generate_video_thumbnails(video["videoId"]),
-						"description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
-						"descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
-						"viewCount": get_view_count_or_recommended(video),
-						"second__viewCountText": get_view_count_text_or_recommended(video),
-						"published": published,
-						"publishedText": published_text,
-						"lengthSeconds": get_length_or_live_now(video),
-						"second__lengthText": get_length_text_or_live_now(video),
-						"liveNow": is_live(video),
-						"paid": None,
-						"premium": None,
-						"isUpcoming": None
-					})
-			search_cache[q] = results # only cache full extraction
-			return results
+		results = []
+		for item in items:
+			if "videoRenderer" in item:
+				video = item["videoRenderer"]
+				published = 0
+				published_text = "Live now"
+				if "publishedTimeText" in video:
+					published_text = video["publishedTimeText"]["simpleText"]
+					published = past_text_to_time(published_text)
+				results.append({
+					"type": "video",
+					"title": combine_runs(video["title"]),
+					"videoId": video["videoId"],
+					"author": combine_runs(video["longBylineText"]),
+					"authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"],
+					"authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"],
+					"videoThumbnails": generate_video_thumbnails(video["videoId"]),
+					"description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
+					"descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "",
+					"viewCount": get_view_count_or_recommended(video),
+					"second__viewCountText": get_view_count_text_or_recommended(video),
+					"published": published,
+					"publishedText": published_text,
+					"lengthSeconds": get_length_or_live_now(video),
+					"second__lengthText": get_length_text_or_live_now(video),
+					"liveNow": is_live(video),
+					"paid": None,
+					"premium": None,
+					"isUpcoming": None
+				})
+		search_cache[q] = results # only cache full extraction
+		return results
 	except Exception:
 		print("messed up extracting search, using youtube-dl instead")
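
Note: the broad `except Exception:` fallback to youtube-dl is unchanged by this patch, and only full scrape-based extractions are cached, so a fallback result is recomputed on the next query. Illustrative usage with an example query (every item this path appends has `type == "video"`):

	results = extract_search("crystal structure")  # example query
	for item in results[:3]:
		print(item["videoId"], item["second__viewCountText"], item["title"])
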
"paid": None, - "premium": None, - "isUpcoming": None - }) - search_cache[q] = results # only cache full extraction - return results + results = [] + for item in items: + if "videoRenderer" in item: + video = item["videoRenderer"] + published = 0 + published_text = "Live now" + if "publishedTimeText" in video: + published_text = video["publishedTimeText"]["simpleText"] + published = past_text_to_time(published_text) + results.append({ + "type": "video", + "title": combine_runs(video["title"]), + "videoId": video["videoId"], + "author": combine_runs(video["longBylineText"]), + "authorId": video["longBylineText"]["runs"][0]["navigationEndpoint"]["browseEndpoint"]["browseId"], + "authorUrl": video["longBylineText"]["runs"][0]["navigationEndpoint"]["commandMetadata"]["webCommandMetadata"]["url"], + "videoThumbnails": generate_video_thumbnails(video["videoId"]), + "description": combine_runs(video["descriptionSnippet"]) if "descriptionSnippet" in video else "", + "descriptionHtml": combine_runs_html(video["descriptionSnippet"]) if "descriptionSnippet" in video else "", + "viewCount": get_view_count_or_recommended(video), + "second__viewCountText": get_view_count_text_or_recommended(video), + "published": published, + "publishedText": published_text, + "lengthSeconds": get_length_or_live_now(video), + "second__lengthText": get_length_text_or_live_now(video), + "liveNow": is_live(video), + "paid": None, + "premium": None, + "isUpcoming": None + }) + search_cache[q] = results # only cache full extraction + return results except Exception: print("messed up extracting search, using youtube-dl instead") diff --git a/extractors/suggestions.py b/extractors/suggestions.py index fb291e0..f60ad87 100644 --- a/extractors/suggestions.py +++ b/extractors/suggestions.py @@ -20,12 +20,12 @@ def extract_search_suggestions(q): "xhr": "t", # "xssi": "t" } - with requests.get("https://clients1.google.com/complete/search", params=params) as r: - r.raise_for_status() - response = r.json() - result = { - "query": q, - "suggestions": [s[0] for s in response[1]] - } - suggestions_cache[q] = result - return result + r = requests.get("https://clients1.google.com/complete/search", params=params) + r.raise_for_status() + response = r.json() + result = { + "query": q, + "suggestions": [s[0] for s in response[1]] + } + suggestions_cache[q] = result + return result diff --git a/index.py b/index.py index 91cdcfd..21c9444 100644 --- a/index.py +++ b/index.py @@ -123,17 +123,17 @@ class NewLeaf(object): @cherrypy.expose def vi(self, id, file): - with requests.get("https://i.ytimg.com/vi/{}/{}".format(id, file), stream=True) as r: - r.raise_for_status() - cherrypy.response.headers["content-type"] = r.headers["content-type"] - return next(r.iter_content(chunk_size=None)) + r = requests.get("https://i.ytimg.com/vi/{}/{}".format(id, file), stream=True) + r.raise_for_status() + cherrypy.response.headers["content-type"] = r.headers["content-type"] + return next(r.iter_content(chunk_size=None)) @cherrypy.expose def ggpht(self, *path): - with requests.get("https://yt3.ggpht.com/{}".format("/".join(path)), stream=True) as r: - r.raise_for_status() - cherrypy.response.headers["content-type"] = r.headers["content-type"] - return next(r.iter_content(chunk_size=None)) + r = requests.get("https://yt3.ggpht.com/{}".format("/".join(path)), stream=True) + r.raise_for_status() + cherrypy.response.headers["content-type"] = r.headers["content-type"] + return next(r.iter_content(chunk_size=None)) bind_port = getattr(configuration, "bind_port", 3000) 
bind_host = getattr(configuration, "bind_host", "0.0.0.0")
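
Note: both proxy endpoints return only `next(r.iter_content(chunk_size=None))`, i.e. the first chunk the socket yields. With `stream=True` that is usually the whole body for small thumbnails, but it is not guaranteed by requests. A hedged sketch of forwarding every chunk instead, using CherryPy's documented `response.stream` option — not part of this patch, and untested against this codebase:

	@cherrypy.expose
	def vi(self, id, file):
		r = requests.get("https://i.ytimg.com/vi/{}/{}".format(id, file), stream=True)
		r.raise_for_status()
		cherrypy.response.headers["content-type"] = r.headers["content-type"]
		return r.iter_content(chunk_size=64 * 1024)  # generator: stream all chunks
	vi._cp_config = {"response.stream": True}  # tell CherryPy not to buffer the generator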