asyncio Python

Содержание
Введение
Простейший пример
HTTP запросы
Генератор
Sync → Async
Похожие статьи

Введение

Установка

python -m venv venv
source venv/Scripts/activate
touch requirements.txt
vi requirements.txt

asyncio==3.4.3 requests==2.31.0

python -m pip install --upgrade pip
python -m pip install -r requirements.txt

Пример

import time def brew_coffee(): print("Start brew_coffee()") time.sleep(3) print("End brew_coffee()") return "Coffee ready" def toast_bagel(): print("Start toast_bagel()") time.sleep(2) print("End toast_bagel()") return "Bagel ready" def main(): start_time = time.time() result_coffee = brew_coffee() result_bagel = toast_bagel() end_time = time.time() elapsed_time = end_time - start_time print(f"Result of brew_coffee is: {result_coffee}") print(f"Result of toast_bagel is: {result_bagel}") print(f"Elapsed time: {elapsed_time}") if __name__ == "__main__": main()

Start brew_coffee() End brew_coffee() Start toast_bagel() End toast_bagel() Result of brew_coffee is: Coffee ready Result of toast_bagel is: Bagel ready Elapsed time: 5.0012452602386475 Process finished with exit code 0

import asyncio import time async def brew_coffee(): print("Start brew_coffee()") time.sleep(3) print("End brew_coffee()") return "Coffee ready" async def toast_bagel(): print("Start toast_bagel()") time.sleep(2) print("End toast_bagel()") return "Bagel ready" def main(): start_time = time.time() result_coffee = brew_coffee() result_bagel = toast_bagel() end_time = time.time() elapsed_time = end_time - start_time print(f"Result of brew_coffee is: {result_coffee}") print(f"Result of toast_bagel is: {result_bagel}") print(f"Elapsed time: {elapsed_time}") if __name__ == "__main__": main()

Result of brew_coffee is: <coroutine object brew_coffee at 0x00000202D448D240> Result of toast_bagel is: <coroutine object toast_bagel at 0x00000202D448E5C0> Elapsed time: 0.0 C:\Users\Andrei\py\async_ex.py:36: RuntimeWarning: coroutine 'brew_coffee' was never awaited main() RuntimeWarning: Enable tracemalloc to get the object allocation traceback C:\Users\Andrei\py\async_ex.py:36: RuntimeWarning: coroutine 'toast_bagel' was never awaited main() RuntimeWarning: Enable tracemalloc to get the object allocation traceback

gather()

Используем asyncio.gather() чтобы запустить задания асинхронно.

Официальная документация

import asyncio import time async def brew_coffee(): print("Start brew_coffee()") # time.sleep is not awaitable function await asyncio.sleep(3) print("End brew_coffee()") return "Coffee ready" async def toast_bagel(): print("Start toast_bagel()") await asyncio.sleep(2) print("End toast_bagel()") return "Bagel ready" async def main(): start_time = time.time() batch = asyncio.gather(brew_coffee(), toast_bagel()) result_coffee, result_bagel = await batch end_time = time.time() elapsed_time = end_time - start_time print(f"Result of brew_coffee is: {result_coffee}") print(f"Result of toast_bagel is: {result_bagel}") print(f"Elapsed time: {elapsed_time}") if __name__ == "__main__": asyncio.run(main())

Start brew_coffee() Start toast_bagel() End toast_bagel() End brew_coffee() Result of brew_coffee is: Coffee ready Result of toast_bagel is: Bagel ready Elapsed time: 3.0193281173706055

gather запускает таски конкурентно таким образом, что исключение в одном из тасков отменяет все остальные.

Таски можно пе

Альтернативный способ

async def main(): start_time = time.time() # batch = asyncio.gather(brew_coffee(), toast_bagel()) # result_coffee, result_bagel = await batch coffee_task = asyncio.create_task(brew_coffee()) toast_task = asyncio.create_task(toast_bagel()) result_coffee = await coffee_task result_bagel = await toast_task end_time = time.time() elapsed_time = end_time - start_time print(f"Result of brew_coffee is: {result_coffee}") print(f"Result of toast_bagel is: {result_bagel}") print(f"Elapsed time: {elapsed_time:.2f} seconds")

HTTP запросы

В файле req_http.py напишем две функции. Одна будет для простых = синхронных запросов с помощью requests

Вторая будет создана на основе первой и будет позоволять выполнять асинхронные запросы. О том как сделать на основе синхронной функции её асинхронную версию вы можете прочитать в главе Sync → Async

# req_http.py import asyncio import requests # A few handy JSON types JSON = int | str | float | bool | None | dict[str, "JSON"] | list["JSON"] JSONObject = dict[str, JSON] JSONList = list[JSON] def http_get_sync(url: str) -> JSONObject: response = requests.get(url) return response.json() async def http_get(url: str) -> JSONObject: return await asyncio.to_thread(http_get_sync, url)

В дальнейших примерах будем использовать сперва синхронную функцию http_get_sync(), а затем и асинхронную http_get()

Получим с pokeapi.co имя случайного покемона. Сперва синхронным способом

from random import randint from req_http import http_get_sync # The highest Pokemon id MAX_POKEMON = 898 def get_random_pokemon_name_sync() -> str: pokemon_id = randint(1, MAX_POKEMON) pokemon_url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}" pokemon = http_get_sync(pokemon_url) return str(pokemon["name"]) def main() -> None: pokemon_name = get_random_pokemon_name_sync() print(pokemon_name) if __name__ == "__main__": main()

swanna

Асинхронный вариант тех же действий

import asyncio from random import randint from req_http import http_get # The highest Pokemon id MAX_POKEMON = 898 async def get_random_pokemon_name() -> str: pokemon_id = randint(1, MAX_POKEMON) pokemon_url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}" pokemon = await http_get(pokemon_url) return str(pokemon["name"]) async def main() -> None: pokemon_name = await get_random_pokemon_name() print(pokemon_name) if __name__ == "__main__": asyncio.run(main())

bronzong

В данном примере делается всего один запрос, поэтому разницу увидеть нельзя.

Выполним по 20 запросов. Оставим асинхронный синтаксис, но в первом случае просто запустим цикл for … in а во втором случае используем asyncio.gather()

В предыдущем примере у нас было два разных именованных задания, поэтому мы перечисляли их явно

batch = asyncio.gather(brew_coffee(), toast_bagel())

Сейчас одно задание нужно асинхронно запустить 20 раз поэтому мы воспользуемся генератором списка

import asyncio from random import randint from time import perf_counter from req_http import http_get # The highest Pokemon id MAX_POKEMON = 898 async def get_random_pokemon_name() -> str: pokemon_id = randint(1, MAX_POKEMON) pokemon_url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}" pokemon = await http_get(pokemon_url) return str(pokemon["name"]) async def main() -> None: # 20 calls without real async time_before = perf_counter() for _ in range(20): pokemon_name = await get_random_pokemon_name() print(pokemon_name) print(f"Total time (synchronous): {perf_counter() - time_before:.2f}.") # 20 calls with async time_before = perf_counter() result = await asyncio.gather(*[get_random_pokemon_name() for _ in range(20)]) print(result) print(f"Total time (asynchronous): {perf_counter() - time_before:.2f}.") if __name__ == "__main__": asyncio.run(main())

cleffa timburr gothita wynaut wynaut trumbeak audino togedemaru dedenne necrozma cramorant gulpin registeel lileep houndoom luvdisc tympole dratini braixen floatzel Total time (synchronous): 9.47. ['dhelmise', 'articuno', 'ferroseed', 'rillaboom', 'vanillite', 'darkrai', 'hatterene', 'machamp', 'charjabug', 'dreepy', 'pineco', 'ralts', 'liepard', 'greedent', 'ferrothorn', 'urshifu-single-strike', 'combee', 'taillow', 'sandaconda', 'falinks'] Total time (asynchronous): 2.53.

Выигрыш во времени составил почти семь секунд

Генератор

Изучим создание асинхронных генераторов

Сперва напишем генератор внешне похожий на асинхронный, но по сути синхронный

import asyncio from random import randint from time import perf_counter from typing import AsyncIterable from req_http import http_get # The highest Pokemon id MAX_POKEMON = 898 async def get_random_pokemon_name() -> str: pokemon_id = randint(1, MAX_POKEMON) pokemon_url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}" pokemon = await http_get(pokemon_url) return str(pokemon["name"]) async def next_pokemon(total: int) -> AsyncIterable[str]: for _ in range(total): name = await get_random_pokemon_name() yield name async def main() -> None: time_before = perf_counter() async for name in next_pokemon(20): print(name) print(f"Total time (asynchronous): {perf_counter() - time_before:.2f}.") if __name__ == "__main__": asyncio.run(main())

vikavolt piplup seedot bronzor diancie hoppip grimmsnarl shellder squirtle cosmog gothitelle whiscash butterfree slugma rufflet binacle froslass grimer galvantula kingdra Total time (asynchronous): 12.58.

Ещё один вариант - тоже последовательное выполнение

async def main() -> None: time_before = perf_counter() names = [name async for name in next_pokemon(20)] print(names) print(f"Total time (asynchronous): {perf_counter() - time_before:.2f}.")

['gothita', 'jirachi', 'hoppip', 'machop', 'silcoon', 'cranidos', 'flaaffy', 'inkay', 'snorlax', 'cacturne', 'zekrom', 'hatenna', 'fomantis', 'abomasnow', 'victreebel', 'zygarde-50', 'tympole', 'trumbeak', 'gossifleur', 'aipom'] Total time (asynchronous): 12.54.

Sync → Async

Разберёмся как из синхронной фукнции сделать асинхронную.

import asyncio import requests from time import perf_counter async def counter(until: int = 10) -> None: now = perf_counter() print(f"Started at {now}") for i in range(0, until): last = now await asyncio.sleep(0.01) now = perf_counter() print(f"{i}: Was asleep for {now - last}s") def send_request(url: str) -> int: print("Sending request", url) response = requests.get(url) return response.status_code async def main() -> None: status_code = send_request("https://www.testsetup.ru") print(f"Got HTTP response with status code {status_code}") await counter() if __name__ == "__main__": time_before = perf_counter() asyncio.run(main()) print(f"Total time (synchronous): {perf_counter() - time_before:.2f}.")

Sending request https://www.testsetup.ru Got HTTP response with status code 200 Started at 607498.7973796 0: Was asleep for 0.012633100035600364s 1: Was asleep for 0.015104799997061491s 2: Was asleep for 0.015944099985063076s 3: Was asleep for 0.015807499992661178s 4: Was asleep for 0.015661500045098364s 5: Was asleep for 0.01566479995381087s 6: Was asleep for 0.015294400043785572s 7: Was asleep for 0.014994499972090125s 8: Was asleep for 0.015561699983663857s 9: Was asleep for 0.016510399989783764s Total time (synchronous): 0.61.

Если попытаться использовать asyncio.create_task() без изменений функции - асинхронности не будет

async def main() -> None: task = asyncio.create_task(counter()) status_code = send_request("https://www.testsetup.ru") print(f"Got HTTP response with status code {status_code}") await task

Got HTTP response with status code 200 Started at 607761.8726535 0: Was asleep for 0.020703700021840632s 1: Was asleep for 0.01604280003812164s 2: Was asleep for 0.015632100054062903s 3: Was asleep for 0.01587679993826896s 4: Was asleep for 0.01603860000614077s 5: Was asleep for 0.015200400026515126s 6: Was asleep for 0.015473500010557473s 7: Was asleep for 0.015260300016961992s 8: Was asleep for 0.015920699923299253s 9: Was asleep for 0.015205400064587593s Total time (synchronous): 0.70.

Чтобы добиться асинхронности нужно создать функцию, которая будет предавать синхронную send_request() в asyncio.to_thread() и в main() вызывать эту новую функцию.

import asyncio import requests from time import perf_counter async def counter(until: int = 10) -> None: now = perf_counter() print(f"Started at {now}") for i in range(0, until): last = now await asyncio.sleep(0.01) now = perf_counter() print(f"{i}: Was asleep for {now - last}s") def send_request(url: str) -> int: print("Sending request", url) response = requests.get(url) return response.status_code async def send_async_request(url: str) -> int: return await asyncio.to_thread(send_request, url) async def main() -> None: task = asyncio.create_task(counter()) status_code = await send_async_request("https://www.testsetup.ru") print(f"Got HTTP response with status code {status_code}") await task if __name__ == "__main__": time_before = perf_counter() asyncio.run(main()) print(f"Total time (synchronous): {perf_counter() - time_before:.2f}.")

Sending request https://www.testsetup.ru Started at 608865.5960778 0: Was asleep for 0.0164191999938339s 1: Was asleep for 0.015472600003704429s 2: Was asleep for 0.0159213999286294s 3: Was asleep for 0.0156949000665918s 4: Was asleep for 0.015243700006976724s 5: Was asleep for 0.01570360001642257s 6: Was asleep for 0.015830199932679534s 7: Was asleep for 0.015123500023037195s 8: Was asleep for 0.015925299958325922s 9: Was asleep for 0.015769100049510598s Got HTTP response with status code 200 Total time (asynchronous): 0.44.

Альтернативный вариант

async def main() -> None: status_code, _ = await asyncio.gather( send_async_request("https://www.testsetup.ru"), counter() )

Sending requestStarted at 608776.925042 https://www.testsetup.ru 0: Was asleep for 0.013800199958495796s 1: Was asleep for 0.015492000035010278s 2: Was asleep for 0.014966199989430606s 3: Was asleep for 0.016110299970023334s 4: Was asleep for 0.01660890004131943s 5: Was asleep for 0.015206799958832562s 6: Was asleep for 0.015179799986071885s 7: Was asleep for 0.015726300072856247s 8: Was asleep for 0.015746999997645617s 9: Was asleep for 0.015381600009277463s Total time (asynchronous): 0.43.

В начале главы про HTTP мы видели применение asyncio.to_thread() для создания асинхронной версии функции.

def http_get_sync(url: str) -> JSONObject: response = requests.get(url) return response.json() async def http_get(url: str) -> JSONObject: return await asyncio.to_thread(http_get_sync, url)

Альтернативный вариант - использование библиотеки aiohttp

import aiohttp # A few handy JSON types JSON = int | str | float | bool | None | dict[str, "JSON"] | list["JSON"] JSONObject = dict[str, JSON] JSONList = list[JSON] async def http_get(url: str) -> JSONObject: async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json()

Автор статьи: Андрей Олегович

Похожие статьи
FastAPI
list comprehension: Абстракция списка
Python
Pydantic
Циклы
asyncio
Ошибки
multiprocessing

Поиск по сайту

Подпишитесь на Telegram канал @aofeed чтобы следить за выходом новых статей и обновлением старых

Перейти на канал

@aofeed

Задать вопрос в Телеграм-группе

@aofeedchat

Контакты и сотрудничество:
Рекомендую наш хостинг beget.ru
Пишите на info@urn.su если Вы:
1. Хотите написать статью для нашего сайта или перевести статью на свой родной язык.
2. Хотите разместить на сайте рекламу, подходящую по тематике.
3. Реклама на моём сайте имеет максимальный уровень цензуры. Если Вы увидели рекламный блок недопустимый для просмотра детьми школьного возраста, вызывающий шок или вводящий в заблуждение - пожалуйста свяжитесь с нами по электронной почте
4. Нашли на сайте ошибку, неточности, баг и т.д. ... .......
5. Статьи можно расшарить в соцсетях, нажав на иконку сети: