Sampling YouTube Channel IDs with Async Functions

python async web-scraping

A quick way to collect YouTube channel IDs from the YouTube home page asynchronously.

Wilson Yip
2023-08-09

Introduction

When exchanging information between nodes (such as making an HTTP request or querying a database), the client usually has to wait for the server's response. In synchronous programming, commands / statements are executed line by line, so if a statement has to wait for some amount of time, no statement below it can run: everything is blocked until the long-waiting statement completes.

For example, suppose we want to query two tables from two different databases and process them afterwards. In synchronous programming, we must wait for the first query to complete before starting the second. If each query takes 10 seconds to respond, we spend 20 seconds just waiting. Asynchronous programming allows the processor to execute other tasks while a particular command is waiting.
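As a minimal sketch of the payoff (asyncio.sleep stands in for the two slow queries, shortened to 2 seconds each; asyncio.gather, covered later in this article, runs them concurrently):

import time
import asyncio

async def query(name: str, seconds: int) -> str:
    await asyncio.sleep(seconds)   # stand-in for a slow database query
    return f"rows from {name}"

async def main():
    start = time.perf_counter()
    # Both queries wait concurrently, so this takes ~2 seconds, not ~4
    results = await asyncio.gather(query("db1", 2), query("db2", 2))
    print(results, f"in {time.perf_counter() - start:.1f}s")

asyncio.run(main())

# ['rows from db1', 'rows from db2'] in 2.0s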

This article first introduces the asyncio library in Python. Then we will use it to sample channel IDs from the YouTube home page for later use.

Asyncio

Coroutines

The first step in asynchronous programming is to create an async function. In Python, we simply add the async keyword before the def keyword. However, calling an async function does not run its body: it always returns a coroutine object. A coroutine is an awaitable and can therefore be awaited from other coroutines. Note that the await keyword can only be used inside an async function.

The snippet below shows the awaitability of a synchronous function (time.sleep) and an asynchronous one (foo1).

import time
import asyncio

async def foo1():
    time.sleep(1)   # time.sleep is not an async function and cannot be awaited
    print("Hello world!")

type(foo1())

# <stdin>:1: RuntimeWarning: coroutine 'foo1' was never awaited
# RuntimeWarning: Enable tracemalloc to get the object allocation traceback
# <class 'coroutine'>

async def foo2():
    await foo1()    # foo1 is an async function and can be awaited

To execute a coroutine object, asyncio.run is required.

asyncio.run(foo1())

# Hello world!

type(asyncio.run(foo1()))

# Hello world!
# <class 'NoneType'>
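asyncio.run also returns whatever the coroutine returns; foo1 returns nothing, hence the NoneType above. A minimal sketch with a coroutine that returns a value:

async def add(a: int, b: int) -> int:
    await asyncio.sleep(0.1)
    return a + b

asyncio.run(add(1, 2))

# 3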

Tasks

Tasks are used to run coroutines in event loops. In other words, a coroutine must be wrapped in a task before an event loop can execute it. A task can be created via the asyncio.create_task or loop.create_task methods. Tasks created this way are scheduled to run as soon as the running thread becomes available. Also, if we do not await a task, it may be dropped when the last line of the enclosing async function is executed.

import asyncio

async def foo1():
    for i in range(10):
        print(i)
        await asyncio.sleep(0.5)

async def foo2():
    await asyncio.sleep(2)
    print("Hi from foo2")

async def main():
    task1 = asyncio.create_task(foo1())
    task2 = asyncio.create_task(foo2())
    
    # The tasks above are only scheduled at this point; they start
    # running once main yields control back to the event loop (at an await)
    print("The thread is busy before this")
    
    await task2    # task2 is awaited, but task1 is not and is dropped when main returns

if __name__ == "__main__":
    asyncio.run(main())

# The thread is busy before this
# 0
# 1
# 2
# 3
# Hi from foo2
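Here task1 is dropped after printing 3 because main returns as soon as task2 completes. If we also want task1 to run to completion, one option is to await both tasks, for example with asyncio.gather; a minimal sketch of the revised main:

async def main():
    task1 = asyncio.create_task(foo1())
    task2 = asyncio.create_task(foo2())
    print("The thread is busy before this")
    await asyncio.gather(task1, task2)   # main now waits for both tasks to finish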

Event loops

An event loop is an object that manages a list of tasks: it runs each task whenever the thread is free to execute it. The asyncio.run method simply acquires an event loop, wraps the coroutine into a task, runs it to completion, and closes the loop. The example below is the same as above but makes the event loop explicit.

import asyncio

async def foo1():
    for i in range(10):
        print(i)
        await asyncio.sleep(0.5)

async def foo2():
    await asyncio.sleep(2)
    print("Hi from foo2")

async def main():
    task1 = asyncio.create_task(foo1())
    task2 = asyncio.create_task(foo2())

    print("The thread is busy before this")
    
    await task2
    print(type(asyncio.get_event_loop()))

if __name__ == "__main__":
    loop = asyncio.get_event_loop()  # Acquire an event loop
    loop.run_until_complete(main())  # Execute the tasks
    loop.close()                     # Close the loop
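On recent Python versions, calling asyncio.get_event_loop when no loop is running is deprecated. On Python 3.11+ the same explicit control can be written with asyncio.Runner, which manages the loop's lifecycle for you; a minimal sketch:

if __name__ == "__main__":
    with asyncio.Runner() as runner:   # acquires a loop, closes it on exit
        runner.run(main())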

Semaphore

By default, an event loop will run every scheduled task as soon as resources allow. Sometimes, however, we want to limit the number of tasks running concurrently. In that case, asyncio.Semaphore comes in handy.

import asyncio

async def async_func(task_no, sem):
    async with sem:
        print(f'{task_no} :Hello ...')
        await asyncio.sleep(1)
        print(f'{task_no}... world!')

async def main(sem):
    tasks = [
        asyncio.create_task(async_func(x, sem)) for x in ["t1", "t2", "t3"]
    ]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    # asyncio.run manages its own event loop, so no manual loop handling
    # is needed; on Python 3.10+ the semaphore binds to the running loop lazily
    sem = asyncio.Semaphore(2)
    asyncio.run(main(sem))

# t1 :Hello ...
# t2 :Hello ...
# t1... world!
# t2... world!
# t3 :Hello ...
# t3... world!
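The output shows the limit in action: t1 and t2 grab the two semaphore slots immediately, while t3 only starts once one of them releases its slot, so the whole run takes about two seconds instead of one.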

Scraping YouTube asynchronously

In this section, the code snippet below samples channel IDs from the YouTube main page using async functions. I am using httpx instead of requests because the latter does not provide an async HTTP client. I also set the cookie "CONSENT": "YES+yt.463627267.en-GB+FX+553" to consent to YouTube's cookie tracking. However, I reset the cookies to this single value on every request, so that YouTube does not keep serving me the same channels. Lastly, I use a regex to extract all the channel IDs from the HTML.

The whole script took only 7.7 seconds to complete 100 requests, which is much faster than using a synchronous client.

import re
import httpx
import asyncio
from asyncio import Semaphore, create_task, gather
from httpx import AsyncClient
from time import time
from itertools import chain


async def async_request_youtube(sem: Semaphore, client: AsyncClient):
    async with sem:
        client.cookies = {"CONSENT": "YES+yt.463627267.en-GB+FX+553"} 
        client.headers = {
            'accept': '*/*', 
            'accept-encoding': 'gzip, deflate', 
            'connection': 'keep-alive', 
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.5400.117 Safari/537.36'
        }

        response = await client.get("https://www.youtube.com")
        return response

async def sample_youtube_channels(n: int, sem: Semaphore):
    async with httpx.AsyncClient(timeout=10) as client:
        tasks = [
            create_task(async_request_youtube(sem, client)) for i in range(n)
        ]
        result = await gather(*tasks)
    
    return result

def request_youtube(n: int, concurrency: int = 50):
    # asyncio.run creates and closes its own event loop, so no manual
    # loop management is needed; on Python 3.10+ the semaphore binds
    # to the running loop lazily
    sem = asyncio.Semaphore(concurrency)
    return asyncio.run(sample_youtube_channels(n, sem))

if __name__ == "__main__":
    start_ts = time()
    result = request_youtube(100, 50)
    channel_id_re = re.compile(r'"(UC[A-Za-z0-9_-]{22})"')
    channel_ids = [
        channel_id_re.findall(res.text) for res in result
    ]
    channel_ids = list(set(chain(*channel_ids)))   # flatten and deduplicate
    with open("./youtube_channel_ids.txt", "w") as f:
        for cid in channel_ids:
            _ = f.write(f"{cid}\n")
    end_ts = time()
    print(f"Time used: {end_ts - start_ts} seconds")

# Time used: 7.707166910171509 seconds
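For comparison, a minimal synchronous baseline (hypothetical, not timed in the original run): with httpx.Client each request blocks until its response arrives, so the 100 requests would take roughly 100 full round trips end to end.

import httpx
from time import time

def request_youtube_sync(n: int):
    responses = []
    with httpx.Client(timeout=10) as client:
        client.cookies = {"CONSENT": "YES+yt.463627267.en-GB+FX+553"}
        for _ in range(n):
            # each request blocks until its response arrives
            responses.append(client.get("https://www.youtube.com"))
    return responses

start_ts = time()
result = request_youtube_sync(100)
print(f"Time used: {time() - start_ts} seconds")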