Асинхронное программирование для начинающих

МЕНЮ


Искусственный интеллект
Поиск
Регистрация на сайте
Помощь проекту

ТЕМЫ


Новости ИИРазработка ИИВнедрение ИИРабота разума и сознаниеМодель мозгаРобототехника, БПЛАТрансгуманизмОбработка текстаТеория эволюцииДополненная реальностьЖелезоКиберугрозыНаучный мирИТ индустрияРазработка ПОТеория информацииМатематикаЦифровая экономика

Авторизация



RSS


RSS новости


Слышали об асинхронном программировании в Python? Интересно познакомиться с его особенностями и практическими областями применения? Быть может, вам даже пришлось столкнуться с определенными проблемами во время написания многопоточных программ. В любом случае, если вы хотите получше познакомиться с темой, это правильное место.

Содержание статьи

  • Особенности асинхронного программирования в Python
  • Создания синхронного веб-сервера
  • Иной подход к программированию в Python
  • Программирование родительского элемента: не так уж просто!
  • Использование асинхронных особенностей Python на практике
  • Синхронное программирование Python
  • Совместный параллелизм с блокирующими вызовами
  • Кооперативный параллелизм с неблокирующими вызовами Python
  • Синхронные (блокирующие) HTTP вызовы
  • Асинхронные (неблокирующие) HTTP вызовы Python

Основные пункты данной статьи:

  • Что такое синхронное программирование;
  • Что такое асинхронное программирование;
  • Когда требуется написание асинхронных программ;
  • Как использовать асинхронные особенности Python.

Особенности асинхронного программирования в Python

Синхронная программа выполняется поэтапно. Даже при наличии условных операторов, циклов и вызовов функций, код можно рассматривать как процесс, где за раз выполняется один шаг. По завершении одного шага программа переходит к другому.

Вот два примера программ, которые работают синхронно:

  • Программы для пакетной обработки обычно создаются синхронно. Вы получаете некие входные данные, обрабатываете их и создаете определенный вывод. Шаг следует за шагом до тех пор, пока программа не достигнет желаемого вывода. При написании кода важно только следить за этапами и их правильном порядком;
  • Программы для командной строки является небольшими, быстрыми процессами, которые запускаются в терминале. Данные скрипты используются для создания или трансформирования чего-то, генерации отчета или создания списка данных. Все это может быть создано через серию шагов, которые выполняются последовательно до завершения окончания программы.

Асинхронная программа действует иначе. Код по-прежнему будет выполняться шаг за шагом.

Основная разница в том, что системе не обязательно ждать завершения одного этапа перед переходом к следующему.

Это значит, что программа перейдет к выполнению следующего этапа, когда предыдущий еще не завершен и все еще выполняется где-то параллельно. Это также значит, что программе известно, что нужно делать после окончания предыдущего этапа.

Зачем же писать код подобным образом? Далее будет дан подробный ответ на данный вопрос, а также предоставлены инструменты для элегантного решения интересных асинхронных задач.

Создания синхронного веб-сервера

Процесс создания веб-сервера в общем и целом схож с пакетной обработкой. Сервер получает определенные входные данные, обрабатывает их и создает вывод. Написанная таким образом синхронная программа создает рабочий веб-сервер.

Однако такой веб-сервер был бы просто ужасным.

Почему? В данном случае каждая единица работы (ввод, обработка, вывод) не является единственной целью. Настоящая цель заключается в быстром выполнении сотен или даже тысяч единиц работы. Это может продолжаться на протяжении длительного времени, и несколько рабочих единиц могут поступить одновременно.

Можно ли сделать синхронный веб-сервер лучше? Конечно можно попробовать оптимизировать этапы выполнения для наиболее быстрой работы. К сожалению, у этого подхода есть ограничения. Результатом может быть веб-сервер, который отвечает медленно, не справляется с работой или копит невыполненные задачи даже по завершении срока.

На заметку: Есть и другие ограничения, с которыми можно столкнуться при попытке оптимизировать указанный выше подход. В их число входит скорость сети, скорость I/O (ввод-вывода) файла, скорость запроса базы данных (MySQL, SQLite) и скорость других подсоединенных устройств. Общая особенность в том, что везде есть функции ввода-вывода. Все эти элементы работают на порядок медленнее, чем скорость обработки CPU.

В синхронной программе, если шаг выполнения запускает запрос к базе данных, тогда CPU практически не используется, пока не будет возвращен запрос к базе данных. Для пакетно-ориентированных программ большую часть времени это не является приоритетом. Обработка результатов этой операции ввода-вывода является целью. Часто это может занять больше времени, чем сама операция ввода-вывода. Любые усилия по оптимизации будут сосредоточены на обработке, а не на вводе-выводе.

Техники асинхронного программирования позволяют программам использовать преимущества относительно медленных процессов ввода-вывода, освобождая CPU для выполнения другой работы.

Иной подход к программированию в Python

В начале изучения асинхронного программирования вы можете столкнуться с многочисленными дискуссиями относительно важности блокирования и написания неблокирующего кода. У меня, например, было много сложностей при разборе данных концепций, как во время разбора документации, так и при обсуждении темы с другими программистами.

Что такое неблокирующий код? Возникает встречный вопрос — что такое блокирующий код? Помогут ли ответы на данные вопросы при создании лучшего веб-сервера? Если да, как это сделать? Будем выяснять!

Написание асинхронных программ требует несколько иного подхода к программированию. Новый взгляд на устоявшуюся в сознании тему может быть непривычным, но это интересное упражнение. Все оттого, что реальный мир сам по себе по большей части асинхронный, как и то, как мы с ним взаимодействуем.

Представьте следующее: вы родитель, что пытается совмещать сразу несколько задач. Вам нужно заняться подсчетом коммунальных услуг, стиркой и присмотреть за детьми. Вы делаете эти вещи параллельно, особенно не задумываясь о том, как именно. Давайте разберем все по полочкам:

  • Подсчет коммунальных услуг является синхронной задачей. Шаг за шагом, пока все не оплачено. За данный процесс вы отвечаете полностью сами;
  • Тем не менее, вы можете отвлечься от подсчетов и заняться стиркой. Можно высушить постиранное белье и загрузить в стиральную машинку новую партию;
  • Работа со стиральной машинкой и сушкой является синхронной задачей, и основная часть работы приходится на то, что происходит после загрузки одежды. Машинка стирает сама, поэтому вы можете вернуться к подсчету коммунальных услуг. К данному моменту сушка и стирка стали асинхронными задачами. Сушилка и стиральная машинка теперь будут работать независимо от вас и друг от друга до тех пор, пока звуковой сигнал не сообщит о завершении процесса;
  • Присмотр за детьми является другой асинхронной задачей. По большей части они могут играть самостоятельно. Возможно, кто-то захочет перекусить, или кому-то понадобится помощь, тогда вам нужно будет как-то отреагировать. Особенно это важно в случае, если ребенок поранится или заплачет. Дети являются долгоиграющей задачей с высшим приоритетом. Присмотр за ними намного важнее стирки и подсчета коммунальных платежей.

Данные примеры могут помочь представить концепты блокирующего и неблокирующего кода. Рассмотрим их, заменив примеры на термины программирования. В роли центрального процессора CPU будете выступать вы сами. Во время погружения одежды в стиральную машинку вы (CPU) заняты и заблокированы от других задач, к примеру, подсчета коммунальных услуг. Но ничего страшного, ведь самой стиркой вам заниматься не нужно.

С другой стороны, работающая стиральная машинка не блокируют вас от занятия другими задачами. Это асинхронная функция, так как вам не нужно ждать ее завершения. После запуска машинки вы можете заняться чем-то другим. Это называется переключением контекста, или context switch. Контекст того, что вы делаете изменился, но через некоторое время звуковой сигнал сообщит о завершении стирки.

Будучи людьми, в большинстве случаев мы так и действуем. Нам естественно постоянно переключаться от дела к делу, даже не задумываясь об этом. Разработчику важно суметь перевести поведение подобного рода на язык кода, который бы работал аналогичным образом.

Программирование родительского элемента: не так уж просто!

Если вы узнали себя (или своих родителей) в вышеуказанном примере, отлично! Вам будет проще разобраться в асинхронном программировании. Напомним, что вы можете переключать контекст, легко менять, выбирать новые задачи и завершать старые. Теперь попробуем воплотить данную манеру поведения в коде по отношению к виртуальным родителям.

Мысленный эксперимент #1: Синхронный родитель

Каким образом вы бы создали родительскую программу, что выполняла бы все вышеперечисленные задачи в синхронной манере? Так как присмотр за детьми является приоритетной задачей, возможно, ваша программа только этим и будет заниматься. Родитель будет присматривать за детьми, ожидая чего-то, что может потребовать его внимания. Однако ничего другого (вроде подсчета коммунальных услуг или стирки) на протяжении данного сценария сделано не будет.

Теперь вы можете назначать приоритеты задачам так, как вам хочется. Однако только одна задача может произойти в любой момент времени. Это результат синхронного, пошагового подхода. Как и синхронный веб-сервер, описанный выше, это может сработать, однако многим такая жизнь может показаться не очень удобной. Родитель не сможет ничем заняться, пока дети не уснут. Все другие задачи будут выполняться позже, до поздней ночи. От такой жизни многие с ума сойдут уже через несколько дней.

Мысленный эксперимент #2: Родитель опросник

При использовании опросника, или polling, можно изменить вещи подобным образом, чтобы многочисленные задачи были завершены. В данном подходе родитель периодически отрывается от текущей задачи и проверяет, не требуют ли другие задачи внимания.

Давайте сделаем интервал опросника примерно в пятнадцать минут. Теперь каждые пятнадцать минут родитель проверяет, не нужно ли заняться стиральной машиной, высушенной одеждой или детьми. Если нет, то родитель может вернуться к работе с подсчетом коммунальных услуг. Однако, если какое-либо из этих заданий требует внимания, родитель позаботится об этом, прежде чем вернуться к подсчетам. Этот цикл продолжается до следующего тайм-аута из цикла опросника.

Этот подход также работает, ведь внимание уделяется множеству задач. Однако у него есть несколько проблем:

  • Родитель может потратить много времени, проверяя те вещи, на которых не нужно акцентировать внимания: Стиральная машинка еще не закончила работа, другая одежда все еще сушится, а детям потребуется уделить внимание только в том случае, если произойдет что-то непредвиденное;
  • Родитель может пропустить завершение задач, которые требуют определенного внимания. К примеру, если стирка завершилась в начале интервала опросника, на это никто не будет обращать внимания целые пятнадцать минут! Кроме того,  присмотр за детьми должен иметь наивысший приоритет. Столкнувшись с проблемой, ребенок вряд ли станет ждать родителей пятнадцать минут, ему потребуется внимание сразу же.

Можно решить эти проблемы, сократив интервал опросника, но теперь родитель (CPU) будет тратить больше времени на переключение контекста между задачами. Это происходит, когда вы начинаете достигать точки убывающей отдачи. Опять же, немногие смогут нормально так жить.

Мысленный эксперимент #3: Родитель потока

«Вот бы у меня был клон…» Если вы родитель, тогда мысли подобного рода у вас наверняка периодически возникают. Во время программирования виртуальных родителей это действительно можно сделать, используя потоки. Данный механизм позволяет одновременно запускать несколько секций программы. Каждая секция кода, запущенная независимо, называется потоком, и все потоки разделяют одно и то же пространство памяти.

Если вы рассматриваете каждую задачу как часть одной программы, можете разделить их и запустить в виде потоков. Другими словами, можно «клонировать» родителя, создав по одному экземпляру для каждой задачи: присмотр за детьми, работой стиральной машинки, сушилки и подсчет коммунальных услуг. Все эти «клоны» работают независимо.

Это звучит как довольно хорошее решение, но у него есть и некоторые сложности. Одной из них является тот факт, что вам придется указывать каждому родительскому экземпляру, что именно делать в программе. Это может привести к некоторым проблемам, поскольку все экземпляры программы используют одни и те же элементы.

К примеру, скажем, Родитель А следит за сушилкой. Увидев, что вещи высушились, Родитель А уберет их и развесит новые. В то же время Родитель В замечает, что стиральная машинка завершила работу, поэтому он начинает вытаскивать одежду. Однако Родителю В также нужно заняться сушилкой, чтобы развесить постиранное белье. Сейчас это невозможно, так как в данный момент сушилкой занимается Родитель А.

Через некоторое время Родитель А заканчивает собирать одежду. Теперь ему хочется заняться стиральной машинкой и переместить вещи на пустую сушилку. Это также невозможно, ведь у стиральной машинки сейчас Родитель В.

Сейчас эти два родителя находятся в состоянии взаимной блокировки, или deadlock. Они оба имеют контроль над своим собственным ресурсом, но также хотят контролировать другой ресурс. Им придется ждать вечно, пока другой родительский экземпляр не освободит контроль. Как программист, вы должны написать код, чтобы разрешить такую ситуацию.

На заметку: Многопоточные программы позволяют создавать несколько параллельных путей выполнения, которые совместно используют одно и то же пространство памяти. Это может быть как преимуществом, так и недостатком. Любая память, совместно используемая потоками, подчиняется одному или нескольким потокам, пытающимся одновременно использовать одну и ту же общую память. Это может привести к повреждению данных, чтению данных в поврежденном состоянии и просто к беспорядочным данным в целом.

В многопоточном программировании переключение контекста происходит под управлением системы, а не программиста. Система контролирует, когда переключать контексты и когда предоставлять потокам доступ к общим данным, тем самым изменяя контекст использования памяти. Все виды проблем подобного рода управляемы в многопоточном коде, однако их трудно разрешить и отладить без ошибок.

Вот еще одна проблема, которая может возникнуть из-за многопоточности. Предположим, что ребенок получил травму и нуждается в неотложной помощи. Родителю «С» было поручено присматривать за детьми, поэтому он сразу же забирает ребенка. При оказании неотложной помощи Родителю «C» необходимо выписать достаточно большой чек, чтобы покрыть расходы на посещение врача.

Тем временем Родитель «D» дома работает над подсчетом коммунальных платеже, следовательно, сейчас он отвечает за финансы. Он не знает о дополнительных расходах на врача, поэтому очень удивлен, что на оплату счетов средств не хватает.

Помните, что эти два родительских экземпляра работают в одной программе. Семейные финансы являются общим ресурсом, поэтому вам нужно найти способ, чтобы родитель, наблюдающий за ребенком, проинформировал родителя, который занимает подсчетом средств. В противном случае вам потребуется предоставить какой-то механизм блокировки, чтобы финансовый ресурс мог использовать только один родитель за раз, с обновлениями.

Использование асинхронных особенностей Python на практике

Попробуем воспользоваться некоторыми вышеуказанным подходами и превратим их в функционирующие программы Python.

Все примеры статьи были протестированы на Python 3.8. В файле requirements.txt указано, какие модули вам нужно установить, чтобы запустить все примеры.

requirements.txt

Python

1

2

3

4

5

6

7

8

9

10

11

aiohttp==3.6.2

async-timeout==3.0.1

attrs==19.3.0

certifi==2019.11.28

chardet==3.0.4

codetiming==1.1.0

idna==2.8

multidict==4.7.4

requests==2.22.0

urllib3==1.25.7

yarl==1.4.2

Сохраните как requirements.txt и выполните команду в терминале:

Shell

1

pip3 install-rrequirements.txt

Вам также потребуется установить виртуальную среду Python для запуска кода, чтобы не мешать системному Python.

Синхронное программирование Python

Первый пример представляет собой несколько ответвленный способ создания задачи для извлечения работы из очереди и последующей ее обработки. Очередь в Python является структурой данных FIFO (first in, first out — «первым пришел — первым ушел»). Она предоставляет методы для размещения элементов в очередь и их повторного вывода в том порядке, в котором они были поставлены.

В данном случае работа состоит в том, чтобы получить номер из очереди и рассчитать количество циклов до этого числа. Оно выводится на консоль, когда начинается цикл, и снова при общем выводе. Программа демонстрирует способ, при котором несколько синхронных задач обрабатывают работу в очереди.

Программа, названная example_1.py, полностью представлена ниже:

Python

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

importqueue

deftask(name,work_queue):

ifwork_queue.empty():

print(f"Task {name} nothing to do")

else:

whilenotwork_queue.empty():

count=work_queue.get()

total=0

print(f"Task {name} running")

forxinrange(count):

total+=1

print(f"Task {name} total: {total}")

defmain():

"""

    Это основная точка входа в программу

    """

# Создание очереди работы

work_queue=queue.Queue()

# Помещение работы в очередь

forwork in[15,10,5,2]:

work_queue.put(work)

# Создание нескольких синхронных задач

tasks=[(task,"One",work_queue),(task,"Two",work_queue)]

# Запуск задач

fort,n,qintasks:

t(n,q)

if__name__=="__main__":

main()

Рассмотрим важные строки программы:

  • Строка 1 импортирует модуль queue. Здесь программа хранит работу, которая должна быть выполнена задачами;
  • Строки с 3 по 13 определяют task(). Данная функция извлекает работу из очереди work_queue и обрабатывает ее до тех пор, пока больше не нужно ничего делать;
  • Строка 15 определяет функцию main() для запуска задач программы;
  • Строка 20 создает work_queue. Все задачи используют этот общий ресурс для извлечения работы;
  • Строки с 23 по 24 помещают работу в work_queue. В данном случае это просто случайное количество значений для задач, которые нужно обработать;
  • Строка 27 создает список кортежей задач со значениями параметров, передаваемых задачами;
  • Строки с 30 по 31 перебирают список кортежей задач, вызывая каждый из них и передавая ранее определенные значения параметров;
  • Строка 34 вызывает main() для запуска программы.

Задача в данной программе является просто функцией, что принимает строку и очередь в качестве параметров. При выполнении она ищет что-либо в очереди для обработки. Если есть над чем поработать, из очереди извлекаются значения, запускается цикл for для подсчета до этого значения и выводится итоговое значение в конце. Получение работы из очереди продолжается до тех пор, пока на не закончится.

Мы собрали ТОП Книг для Python программиста которые помогут быстро изучить язык программирования Python. Список книг: Книги по Python

При запуске данной программы будет получен следующий вывод:

Shell

1

2

3

4

5

6

7

8

9

Task One running

Task One total:15

Task One running

Task One total:10

Task One running

Task One total:5

Task One running

Task One total:2

Task Two nothing todo

Здесь показано, что всю работу выполняет Task One. Цикл while, в котором задействован Task One внутри task(), потребляет всю работу в очереди и обрабатывает ее. Когда этот цикл завершается, Task Two получает шанс на выполнение. Однако он обнаруживает, что очередь пуста, поэтому Task Two выводит оператор, который говорит, что ему нечего делать, и затем завершается. В коде нет ничего, что позволяло бы Task One и Task Two переключать контексты и работать вместе.

Простой кооперативный параллелизм в Python

Следующая версия программы позволяет двум задачам работать вместе. Добавление оператора yield означает, что цикл получит контроль в указанной точке, сохраняя при этом свой контекст. Таким образом, уступающая задача может быть возобновлена позже.

Оператор yield превращает task() в генератор. Функция генератора вызывается так же, как и любая другая функция в Python, но когда выполняется оператор yield, управление возвращается вызывающей функции. По сути, это переключение контекста, поскольку управление переходит от функции генератора к вызывающей стороне.

Интересная часть заключается в том, что функции генератора можно вернуть контроль, вызвав в генераторе next(). Получается переключение контекста обратно к функции генератора, что запускает выполнение со всеми переменными функции, которые были определены до того, как выход все еще остается неизменным.

Цикл while в main() использует данное преимущество при вызове next(t). Данный оператор перезапускает задачу с того места, где оно было ранее выполнено. Это значит, что у вас есть контроль во время переключения контекста: когда оператор yield выполняется в task().

Это форма совместной многозадачности. У программы контроль над своим текущим контекстом, и теперь можно запустить что-то еще. В таком случае цикл while в main() способен запускать два экземпляра task() в качестве функции генератора. Каждый экземпляр потребляет работу из одной и той же очереди. Это довольно умно, но для достижения тех же результатов, что и в первой программе, требуется потрудиться.

Программа example_2.py демонстрирует простой параллелизм и приведена ниже:

Python

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

importqueue

deftask(name,queue):

whilenotqueue.empty():

count=queue.get()

total=0

print(f"Task {name} running")

forxinrange(count):

total+=1

yield

print(f"Task {name} total: {total}")

defmain():

"""

    Это основная точка входа в программу

    """

# Создание очереди работы

work_queue=queue.Queue()

# Размещение работы в очереди

forwork in[15,10,5,2]:

work_queue.put(work)

# Создание задач

tasks=[task("One",work_queue),task("Two",work_queue)]

# Запуск задач

done=False

whilenotdone:

fortintasks:

try:

next(t)

exceptStopIteration:

tasks.remove(t)

iflen(tasks)==0:

done=True

if__name__=="__main__":

main()

Рассмотрим, что именно происходит в коде выше:

  • Строки с 3 по 11 определяют task(), как и раньше. Кроме того, в Строке 10 добавляется yield, превращая функцию в генератор. В этом случае происходит переключение контекста и управление возвращается обратно в цикл while в main();
  • Строка 25 создает список задач, но немного иначе, чем вы видели в предыдущем примере кода. В этом случае каждая задача вызывается с параметрами, указанными в переменной списка задач. Это необходимо для запуска функции генератора task() в первый раз;
  • Строки с 31 по 36 являются модификациями цикла while в main(), которые позволяют совместно выполнять task(). Управление возвращается к каждому экземпляру task(), позволяя циклу продолжаться и запустить другую задачу;
  • Строка 32 возвращает контроль к task() и продолжает выполнение после точки, где был вызван yield;
  • Строка 36 устанавливает переменную done. Цикл while заканчивается, когда все задачи завершены и удалены из tasks.

При запуске вышеуказанной программы будет получен следующий вывод:

Shell

1

2

3

4

5

6

7

8

Task One running

Task Two running

Task Two total:10

Task Two running

Task One total:15

Task One running

Task Two total:5

Task One total:2

Здесь видно, что Task One и Task Two выполняются и потребляют работу из очереди. Именно это требуется, поскольку обе задачи обрабатывают работу, и каждая отвечает за два элемента в очереди. Это интересно, но опять же, для достижения этих результатов требуется немало усилий.

Хитрость заключается в использовании оператора yield, который превращает task() в генератор и выполняет переключение контекста. Программа использует переключатель контекста для управления циклом while в main(), позволяя двум экземплярам задачи выполняться совместно.

Обратите внимание на то, как Task Two выводит итоговую сумму первой. Может показаться, что задачи выполняются асинхронно. Тем не менее, это все еще синхронная программа. Она структурирована так, что две задачи могут передавать контексты вперед и обратно. Причина, по которой Task Two выводит итоговую сумму в первую очередь, состоит в том, что она считает только до 10, а Task One до 15. Task Two просто достигает своей первой итоговой суммы, поэтому она выводит выходные данные на консоль раньше Task One.

На заметку: В коде из примера выше используется модуль codetiming, что фиксирует и выводит время, нужное для выполнения фрагментов кода. Более подробно почитать о данном модуле можете в данной статье на сайте RealPython.

Этот модуль является частью Python Package Index. Он создан Geir Arne Hjelle, одним из авторов популярного сайта Real Python. Если занимаетесь написанием кода, который должен включать функции синхронизации, то обязательно стоит обратить внимание на модуль codetiming.

Для того чтобы модуль codetiming был доступен, его требуется установить. Это можно сделать с помощью команды pip: pip install codetiming

Совместный параллелизм с блокирующими вызовами

Следующая версия программы такая же, как и предыдущая, за исключением добавления time.sleep(delay) в теле вашего цикла задач. Добавляется задержка, основанная на значении, полученном из рабочей очереди, к каждой итерации цикла задачи. Задержка имитирует эффект блокирующего вызова в вашей задачи.

Блокирующий вызов является кодом, который не дает CPU делать что-либо еще в течение некоторого периода времени. В вышеупомянутых мысленных экспериментах, если родитель не мог отвлечься от подсчета коммунальных услуг до завершения задачи, такой процесс был бы блокирующим вызовом.

В данном примереtime.sleep(delay) делает то же самое, потому что CPU не может ничего сделать, кроме ожидания истечения задержки.

Сперва установим нужные библиотеки:

Python

1

pip3 install codetiming

Код:

Python

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

importtime

importqueue

fromcodetiming importTimer

deftask(name,queue):

timer=Timer(text=f"Task {name} elapsed time: {{:.1f}}")

whilenotqueue.empty():

delay=queue.get()

print(f"Task {name} running")

timer.start()

time.sleep(delay)

timer.stop()

yield

defmain():

"""

    Это основная точка входа в программу

    """

# Создание очереди работы

work_queue=queue.Queue()

# Добавление работы в очередь

forwork in[15,10,5,2]:

work_queue.put(work)

tasks=[task("One",work_queue),task("Two",work_queue)]

# Запуск задач

done=False

withTimer(text=" Total elapsed time: {:.1f}"):

whilenotdone:

fortintasks:

try:

next(t)

exceptStopIteration:

tasks.remove(t)

iflen(tasks)==0:

done=True

if__name__=="__main__":

main()

Изменения, что были сделаны в данном коде:

  • Строка 1 импортирует модуль time, чтобы у программы был доступ к time.sleep();
  • Строка 3 импортирует код Timer из модуля codetiming;
  • Строка 6 создает экземпляр класса Timer, используемый для измерения времени, нужного для итерации каждой задачи цикла;
  • Строка 10 запускает экземпляр timer;
  • Строка 11 изменяет task() для включения time.sleep(delay) для имитации задержки IO. Это заменяет цикл for, что отвечал за подсчет в example_1.py;
  • Строка 12 останавливает экземпляр timer и выводит, истекшее с момента вызова timer.start(), время;
  • Строка 30 создает менеджер контекста Timer, что выводит истекшее время с момента начала всего цикла.

При запуске программы будет получен следующий вывод:

Shell

1

2

3

4

5

6

7

8

9

10

Task One running

Task One elapsed time:15.0

Task Two running

Task Two elapsed time:10.0

Task One running

Task One elapsed time:5.0

Task Two running

Task Two elapsed time:2.0

Total elapsed time:32.0

Как и ранее, Task On и Task Two  запускаются, собирая работу из очереди обрабатывая ее. Однако даже при добавлении задержки видно, что кооперативный параллелизм ничего не привнес. Задержка останавливает обработку всей программы, а CPU просто ждет, чтобы задержка IO завершилась.

Именно под этим и подразумевается блокирующий код Python в документации по асинхронизации. Вы заметите, что время, необходимое для запуска всей программы, является просто совокупным временем всех задержек. Выполнение заданий таким способом нельзя считать успешным.

Кооперативный параллелизм с неблокирующими вызовами Python

Следующая версия программы подверглась небольшим изменениям. Здесь используются асинхронные особенности asyncio/await, появившиеся в Python 3.

Модули time и queue были заменены пакетом asyncio. Программа получает доступ к асинхронной дружественной (неблокирующей) функциональности сна и очереди. Изменение task() определяет ее как асинхронную, добавляя на строке 4 префикса async. В Python это является показателем того, что функция будет асинхронной.

Другим большим изменением является удаление операторов time.sleep(delay) и yield с их последующей заменой на замена их на await asyncio.sleep(delay). Создается неблокирующая задержка, которая выполнит переключение контекста обратно к вызывающей main().

Цикла while внутри main() больше не существует. Вместо task_array есть вызов await asyncio.gather(...). Это сообщает asyncio о двух вещах:

  • Создание двух задач на основе task() и их запуск;
  • Ожидание завершения обеих задач до перехода к дальнейшим действиям.

Последней строкой программы является asyncio.run(main()). Здесь создается цикл событий. Данный цикл запускает main(), что в свою очередь запускает два экземпляра task().

Цикл событий лежит в основе асинхронной системы Python. Он запускает весь код, включая main(). Когда код задачи выполняется, CPU занят выполнением работы. С приближением ключевого слова await происходит переключение контекста, и контроль возвращается обратно в цикл событий. Цикл событий просматривает все задачи, ожидающие события (в данном случае asyncio.sleep(delay, и передает управление задаче с событием, которое готово.

await asyncio.sleep(delay) является неблокирующим по отношению к CPU. Вместо ожидания истечения времени ожидания, CPU регистрирует событие сна в очереди задач цикла событий и выполняет переключение контекста, передавая контроль циклу событий. Цикл событий непрерывно ищет завершенные события и передает контроль задаче, ожидающей этого события. Таким образом CPU может оставаться занятым, если работа доступна, а цикл обработки событий отслеживает события, которые произойдут в будущем.

На заметку: Асинхронная программа запускается в одном потоке выполнения. Переключение контекста с одного раздела кода на другой, который может повлиять на данные, полностью под вашим контролем. Это значит, что вы можете разбить и завершить весь доступ к данным совместно используемой памяти, прежде чем переключать контекст. Это помогает решить проблему общей памяти, что присуща многопоточному коду.

Код example_4.py приведен ниже:

Python

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

importasyncio

fromcodetiming importTimer

async deftask(name,work_queue):

timer=Timer(text=f"Task {name} elapsed time: {{:.1f}}")

whilenotwork_queue.empty():

delay=await work_queue.get()

print(f"Task {name} running")

timer.start()

await asyncio.sleep(delay)

timer.stop()

async defmain():

"""

    Это главная точка входа для главной программы

    """

# Создание очереди работы

work_queue=asyncio.Queue()

# Помещение работы в очередь

forwork in[15,10,5,2]:

await work_queue.put(work)

# Запуск задач

withTimer(text=" Total elapsed time: {:.1f}"):

await asyncio.gather(

asyncio.create_task(task("One",work_queue)),

asyncio.create_task(task("Two",work_queue)),

)

if__name__=="__main__":

asyncio.run(main())

Вот отличия данной программы от example_3.py:

  • Строка 1 импортирует asyncio для получения доступа к асинхронной функциональности Python. Это замена импорта time;
  • Строка 2 импортирует класс Timer из модуля codetiming;
  • Строка 4 добавляет ключевое слово async перед определением task(). Это сообщает программе, что task может выполняться асинхронно;
  • Строка 5 создается экземпляр Timer, используемый для измерения времени, необходимого для каждой итерации цикла задач;
  • Строка 9 запускает экземпляр timer;
  • Строка 10 заменяет time.sleep(delay) неблокирующим asyncio.sleep(delay), что также возвращает контроль (или переключает контексты) обратно в цикл основного события;
  • Строка 11 останавливается экземпляр timer и выводится истекшее время с момента вызова timer.start();
  • Строка 18 создает неблокирующую асинхронную work_queue;
  • Строки 21-22 помещают работу в work_queue асинхронно с использованием ключевого слова await;
  • Строка 25 создается менеджер контекста Timer, который выводит истекшее время, затраченное на выполнение цикла while;
  • Строки 26-29 создают две задачи и собирают их вместе, поэтому программа будет ожидать завершения обеих задач;
  • Строка 32 запускает программу асинхронно. Здесь также запускается внутренний цикл событий.

При анализе вывода программы обратите внимание на одновременный запуск Task One и Task Two, а затем ложный вызов IO:

Shell

1

2

3

4

5

6

7

8

9

10

Task One running

Task Two running

Task Two total elapsed time:10.0

Task Two running

Task One total elapsed time:15.0

Task One running

Task Two total elapsed time:5.0

Task One total elapsed time:2.0

Total elapsed time:17.0

Это указывает на то, что await asyncio.sleep(delay) неблокирующая, и другая работа завершена.

В конце программы можно заметить, что общее время по сути в два раза меньше, чем при запуске example_3.py. Это преимущество программы, что использует асинхронные особенности. Каждая задача может одновременно запускать await asyncio.sleep(delay). Общее время выполнения программы теперь меньше, чем общее время частей. Нам удалось избавиться от синхронной модели.

Синхронные (блокирующие) HTTP вызовы

Следующая версия программы является как шагом вперед, так и отступлением назад. Программа выполняет некоторую работу с реальным I/O, отправляя HTTP запросы из списка с URL и получая содержимое страницы. Однако это происходит блокирующим (синхронным) образом.

Программа была изменена для импорта отличного модуля requests, чтобы сделать фактические HTTP-запросы. Кроме того, очередь теперь содержит список URL, а не номеров. Кроме того, task() больше не увеличивает счетчик. Вместо этого запросы получают содержимое URL из очереди и выводят потраченное на данное действие время.

Код example_5.py приведен ниже:

Python

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

importqueue

importrequests

fromcodetiming importTimer

deftask(name,work_queue):

timer=Timer(text=f"Task {name} elapsed time: {{:.1f}}")

withrequests.Session()assession:

whilenotwork_queue.empty():

url=work_queue.get()

print(f"Task {name} getting URL: {url}")

timer.start()

session.get(url)

timer.stop()

yield

defmain():

"""

    Это основная точка входа в программу

    """

# Создание очереди работы

work_queue=queue.Queue()

# Помещение работы в очередь

forurl in[

"http://google.com",

"http://yahoo.com",

"http://linkedin.com",

"http://apple.com",

"http://microsoft.com",

"http://facebook.com",

"http://twitter.com",

]:

work_queue.put(url)

tasks=[task("One",work_queue),task("Two",work_queue)]

# Запуск задачи

done=False

withTimer(text=" Total elapsed time: {:.1f}"):

whilenotdone:

fortintasks:

try:

next(t)

exceptStopIteration:

tasks.remove(t)

iflen(tasks)==0:

done=True

if__name__=="__main__":

main()

Вот что происходит в данной программе:

  • Строка 2 импортирует requests, что предоставляет удобный способ совершать HTTP вызовы.
  • Строка 3 импортирует класс Timer из модуля codetiming.
  • Строка 6 создается экземпляр Timer, используемый для измерения времени, необходимого для каждой итерации цикла задач.
  • Строка 11 запускает экземпляр timer
  • Строка 12 создает задержку, похожую на то, что в example_3.py. Однако на этот раз вызывается session.get(url), который возвращает содержимое URL, полученного из work_queue.
  • Строка 13 останавливает экземпляр timer и выводит истекшее время с момента вызова timer.start().
  • Строки с 23 по 32 помещают список URL в work_queue.
  • Строка 39 создается менеджер контекста Timer, который выводит истекшее время, затраченное на выполнение всего цикла while.

При запуске этой программы вы увидите следующий вывод:

Shell

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

Task One getting URL:http://google.com

Task One total elapsed time:0.3

Task Two getting URL:http://yahoo.com

Task Two total elapsed time:0.8

Task One getting URL:http://linkedin.com

Task One total elapsed time:0.4

Task Two getting URL:http://apple.com

Task Two total elapsed time:0.3

Task One getting URL:http://microsoft.com

Task One total elapsed time:0.5

Task Two getting URL:http://facebook.com

Task Two total elapsed time:0.5

Task One getting URL:http://twitter.com

Task One total elapsed time:0.4

Total elapsed time:3.2

Как и в более ранних версиях программы, yield превращает task() в генератор. Он также выполняет переключение контекста, позволяющее запустить другой экземпляр задачи.

Каждая задача получает URL из рабочей очереди, извлекает содержимое страницы и сообщает, сколько времени потребовалось для получения этого содержимого.

Как и раньше, yield позволяет обеим задачам работать совместно. Однако, поскольку эта программа работает синхронно, каждый вызов session.get() блокирует CPU, пока не будет получена страница. Обратите внимание на общее время, необходимое для запуска всей программы в конце. Это будет иметь смысл для следующего примера.

Асинхронные (неблокирующие) HTTP вызовы Python

Эта версия программы модифицирует предыдущую версию для использования асинхронных функций Python. Здесь импортируется модуль aiohttp, который является библиотекой для асинхронного выполнения HTTP запросов с использованием asyncio.

Задачи были изменены, чтобы удалить вызов yield, поскольку код для выполнения HTTP GET запроса больше не блокирующий. Он также выполняет переключение контекста обратно в цикл событий.

Программа example_6.py приведена ниже:

Python

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

importasyncio

importaiohttp

fromcodetiming importTimer

async deftask(name,work_queue):

timer=Timer(text=f"Task {name} elapsed time: {{:.1f}}")

async withaiohttp.ClientSession()assession:

whilenotwork_queue.empty():

url=await work_queue.get()

print(f"Task {name} getting URL: {url}")

timer.start()

async withsession.get(url)asresponse:

await response.text()

timer.stop()

async defmain():

"""

    Это основная точка входа в программу

    """

# Создание очереди работы

work_queue=asyncio.Queue()

# Помещение работы в очередь

forurl in[

"http://google.com",

"http://yahoo.com",

"http://linkedin.com",

"http://apple.com",

"http://microsoft.com",

"http://facebook.com",

"http://twitter.com",

]:

await work_queue.put(url)

# Запуск задач

withTimer(text=" Total elapsed time: {:.1f}"):

await asyncio.gather(

asyncio.create_task(task("One",work_queue)),

asyncio.create_task(task("Two",work_queue)),

)

if__name__=="__main__":

asyncio.run(main())

В данной программе происходит следующее:

  • Строка 2 импортирует библиотеку aiohttp, которая обеспечивает асинхронный способ выполнения HTTP вызовов.
  • Строка 3 импортирует класс Timer из модуля codetiming.
  • Строка 5 помечает task() как асинхронную функцию.
  • Строка 6 создает экземпляр Timer, используемый для измерения времени, необходимого для каждой итерации цикла задач.
  • Строка 7 создается менеджер контекста сессии aiohttp.
  • Строка 8 создает менеджер контекста ответа aiohttp. Он также выполняет HTTP вызов GET для URL, взятого из work_queue.
  • Строка 11 запускает экземпляр timer
  • Строка 12 использует сеанс для асинхронного получения текста из URL.
  • Строка 13 останавливает экземпляр timer и выводит истекшее время с момента вызова timer.start().
  • Строка 39 создает менеджер контекста Timer, который выводит истекшее время, затраченное на выполнение всего цикла while.

При запуске программы вы увидите следующий вывод:

Shell

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

Task One getting URL:http://google.com

Task Two getting URL:http://yahoo.com

Task One total elapsed time:0.3

Task One getting URL:http://linkedin.com

Task One total elapsed time:0.3

Task One getting URL:http://apple.com

Task One total elapsed time:0.3

Task One getting URL:http://microsoft.com

Task Two total elapsed time:0.9

Task Two getting URL:http://facebook.com

Task Two total elapsed time:0.4

Task Two getting URL:http://twitter.com

Task One total elapsed time:0.5

Task Two total elapsed time:0.3

Total elapsed time:1.7

Посмотрите на общее прошедшее время, а также на индивидуальное время, чтобы получить содержимое каждого URL. Вы увидите, что длительность составляет примерно половину совокупного времени всех  HTTP GET запросов. Это связано с тем, что HTTP GET запросы выполняются асинхронно. Другими словами, вы эффективно используете преимущества CPU, позволяя ему делать несколько запросов одновременно.

Поскольку CPU очень быстрый, этот пример может создать столько же задач, сколько URL. В этом случае время выполнения программы будет соответствовать времени самого медленного поиска URL.

Заключение

В статье были предоставлены инструменты для начала работы с техниками асинхронного программирования. Использование асинхронных функций Python обеспечивает вас программным контролем во время переключения контекста. Теперь сложности, которые возникают в процессе многопоточного программирования, разрешить гораздо легче.

Асинхронное программирование является мощным инструментом, однако оно подойдет не для каждой программы. Например, если вы пишете программу, которая вычисляет число Пи с точностью до миллионных знаков после запятой, то асинхронный код не поможет. Такая программа связана с CPU, без большого количества I/O.  Однако, если вы пытаетесь реализовать сервер или программу, которая выполняет IO (например, доступ к файлам или сети), использование асинхронных функций Python может иметь огромное преимущество.

Подведем итоги изученных тем:

  • Что такое синхронное программирование
  • Отличия асинхронных программ, их мощность и управляемость
  • Случаи необходимости использования асинхронных программ
  • Использование асинхронных особенностей Python

Теперь, получив все необходимые знания, вы сможете писать программы совершенно иного уровня!


Источник: python-scripts.com

Комментарии: