Виктор Паперно
Содержание статьи
- Настройка инфраструктуры
- ESP32 Cam
- Telegram-бот
Для многих разработчиков Python — это язык, на котором были написаны их первые программы: «Hello, world!» или калькулятор. Сейчас начинающие программисты во время обучения часто пишут телеграм?ботов, благо на Python это сделать относительно несложно. Давай создадим бота, но не простого, а взаимодействующего с нашим собственным устройством для умного дома, которое мы самостоятельно соберем и запрограммируем.
На мой взгляд, основное достоинство Python — универсальность: с его помощью можно решить практически любую задачу благодаря огромному количеству сборок. Это привносящий магию C++ Cython, браузерный Brython, позволяющий работать с Java Jython и множество библиотек на все случаи жизни.
Но до сравнительно недавнего времени, если Python-программист хотел сделать девайс для умного дома или машинку на радиоуправлении, он был вынужден либо использовать Arduino и волей?неволей учить C++, либо довольствоваться микрокомпьютером вроде Raspberry Pi и искать (а порой и писать свои) библиотеки для управления теми или иными китайскими модулями. Иногда вообще приходилось создавать хтонических чудовищ, подключая проводом Arduino к Raspberry и опрашивая датчики через serial port, но все равно при этом нужно было писать прошивку для Arduino или другого контроллера на C++.
Но уже примерно пять лет, если судить по дате моей статьи про MicroPython, можно заниматься DIY-проектами, не переключаясь с одного языка программирования на другой. За это время многое в мире MicroPython поменялось, вышли новые сборки, появились одни библиотеки и полностью перестали поддерживаться другие, а Arduino официально приняла MicroPython в свою экосистему.
С каждым годом число поддерживающих MicroPython плат растет, но значительный их процент все еще базируется на ESP32. И действительно, у этого контроллера масса достоинств, таких как встроенные Wi-Fi и BLE, а еще, если говорить честно, отсутствие конкурентов в том же ценовом диапазоне. Но вот что касается плат, базирующихся на этом контроллере, — тут глаза разбегаются. Это и базовые версии от WeMos, целые наборы и модули от M5Stack и LILYGO. Не отстают, кстати, и российские инженеры: магазин iArduino не так давно выпустил свою отладочную плату Piranha.
К тому же нельзя забывать про Великий Китайский Ноунейм, которым завален «Алиэкспресс». Кстати, в нашем проекте мы будем использовать одну из таких плат — ESP32 Cam (я так и не смог разобраться, кто же ее официальный производитель). Как несложно догадаться по названию, это модуль на основе ESP32 с подключенной камерой, чаще всего это OV2640, но если ты хочешь добиться лучшего качества, то можно подключить и другие.
Вот список необходимого ПО:
- твоя любимая IDE для разработки — я использую PyCharm;
- ESP32 MPY-Jama — кросс?платформенная IDE для MicroPython;
- Telegram.
Итак, что же мы будем делать? Как ты уже, наверное, догадался, судя по тому, что я выбрал плату с камерой, — систему видеонаблюдения. Но не простую, с постоянной трансляцией, а с возможностью мгновенно получить фотографию. Управлять ею мы будем через Telegram-бота, а чтобы не пришлось покупать статический IP-адрес, для общения бота и девайса используем протокол MQTT.
INFO
MQTT (Message Queuing Telemetry Transport) — это легковесный протокол передачи сообщений, который используется для обмена данными между устройствами в интернете вещей (IoT). Он призван обеспечивать надежную связь между множеством устройств, которые работают с ограниченными ресурсами и могут быть подключены к интернету только временно или с периодическими задержками.
MQTT работает по принципу «издатель — подписчик», где одни устройства могут публиковать сообщения в топики (topics), а другие подписываться на них, чтобы получать сообщения, которые им нужны. Протокол предоставляет гибкий и простой способ передачи сообщений, что делает его идеальным для использования в системах IoT, где устройства могут иметь разные функциональные возможности и ограничения.
НАСТРОЙКА ИНФРАСТРУКТУРЫ
Перед тем как непосредственно перейти к программированию, необходимо настроить так называемый MQTT-брокер. Это сервер, на котором располагаются топики и к которому подключаются подписчики и издатели.
Разумеется, можно настроить его самостоятельно, например используя Mosquitto. Но стоит только подумать, что, кроме самого брокера, придется настраивать сервер, беспокоиться о его безопасности и так далее, как желание заниматься проектом куда?то пропадает.
Так что в качестве брокера мы будем использовать сервис shiftr.io. Его бесплатного тарифа нам хватит с головой. Среди достоинств этого сервиса — интуитивно понятный интерфейс, подробная документация и визуализация всей системы. На главной странице твоего проекта можно увидеть, какие топики созданы и какое в них было последнее сообщение, какие устройства подключены, а когда начнутся рассылки пакетов — их тоже будет видно!
Чтобы наши устройства — телеграм?бот и ESP32 Cam — могли взаимодействовать с нашим MQTT-брокером, необходимо выпустить токен. Это можно сделать в настройках твоего пространства.
После этого в списке токенов появится такой элемент: mqtt://xakep:utBmi6LTWS7b4l5@xakep.cloud.shiftr.io
, где xakep
— это логин, а utBmi6LTWS7b4l5
— пароль. Запомни их, они нам еще пригодятся. Инфраструктура настроена, можно приступать к программированию.
ESP32 CAM
Для программирования ESP32 на MicroPython существует несколько IDE. Я остановил свой выбор на ESP32 MPY-Jama — опенсорсном решении со множеством удобных функций.
Одна из них — графический интерфейс для утилиты esptool. Которую, впрочем, все равно нужно установить через pip: pip install esptool
.
Мы будем использовать специальную прошивку для ESP32 Cam, она доступна в этом GitHub-репозитории. Скачиваем файл прошивки и, используя IDE, прошиваем, не забыв перед этим отформатировать Flash-память.
Теперь мы можем подключиться и загрузить необходимые файлы на нашу плату. Для работы, кроме встроенных библиотек, нам понадобятся библиотеки micropython-umqtt.robust
и micropython-umqtt.simple
. Их можно установить с помощью встроенного менеджера пакетов upip. Для этого подключись к интернету через Wi-Fi, а затем выполни следующие команды:
import network
import upip
# Не забудь вставить имя своей Wi-Fi-сети и пароль от нее
def wifi_setup():
global sta_if
sta_if.active(True)
sta_if.scan()
sta_if.connect('SSID', 'PASSWORD')
print("Waiting for Wifi connection")
while not sta_if.isconnected(): time.sleep(1)
print("Connected to WiFi")
wifi_setup()
upip.install("micropython-umqtt.robust")
upip.install("micropython-umqtt.simple")
Теперь у нас все готово для работы. Чтобы облегчить задачу, я написал небольшую библиотечку — со всеми необходимыми функциями. Давай посмотрим, что в нее входит.
Вначале импортируем необходимые библиотеки:
network
— для подключения к интернету;umqtt.robust
— для взаимодействия с MQTT-брокером;camera
- в этой библиотеке находятся функции для работы с камерой;time
— похож на одноименную библиотеку из обычного Python;machine
— это базовый модуль MicroPython, позволяющий управлять пинами.
# Импорт библиотек
import network
from umqtt.robust import MQTTClient
import camera
import time
import machine
# Инициализация глобальных переменных
sta_if = network.WLAN(network.STA_IF); # Wi-Fi-модуль
camera.init(0, format=camera.JPEG, fb_location=camera.PSRAM) # Непосредственно камера
mqtt_client = None # Заготовка для MQTT-клиента
led = machine.Pin(4, machine.Pin.OUT) # Встроенный светодиод
С одной из функций библиотеки мы познакомились раньше, когда подключались к Wi-Fi, теперь рассмотрим остальные.
Первая называется take_photo
. В этой функции выполняется фотографирование и отправка в фотографии (которая сейчас выглядит как строчка байтов) в MQTT-топик 'image'
. Чтобы очистить топик, мы отправим строчку b'None'
перед тем, как отсылать непосредственно изображение.
def take_photo():
global mqtt_client
mqtt_client.publish('image', b'None', qos=0)
capture = camera.capture()
mqtt_client.publish('image', capture, qos=0)
Следующая функция — mqtt_callback
, это обработчик сообщений, приходящих в топики. В том случае, если в топик command
приходит команда take_photo
, мы вызываем уже знакомую нам функцию, а если команда enable_led
или disable_led
— то, соответственно, включаем или выключаем встроенный светодиод.
def mqtt_callback(topic, data_bytes):
global led
topic = topic.decode()
data = data_bytes.decode()
if topic == "command":
if data == "take_photo":
take_photo()
elif data == "enable_led":
led.value(1)
elif data == "disable_led":
led.value(0)
Идем дальше: mqtt_setup
— в этой функции производится подключение к MQTT-брокеру с использованием логина и пароля, которые мы настроили раньше, подключается колбэк (обработчик) и запускается бесконечный цикл ожидания команд от брокера.
def mqtt_setup():
global mqtt_client
mqtt_client = MQTTClient(
"umqtt_xakep_client",
server='xakep.cloud.shiftr.io',
port=1883,
user='xakep',
password='utBmi6LTWS7b4l5')
mqtt_client.set_callback(mqtt_callback)
mqtt_client.connect()
if mqtt_client:
mqtt_client.subscribe('command')
print("Connected to MQTT")
while True:
mqtt_client.check_msg()
Чтобы запустить наше устройство, необходимо импортировать из библиотеки функции wifi_setup
и mqtt_setup
, а затем по очереди их выполнить. Это можно сделать прямо из терминала. Если все настроено правильно, то в своем инстасе shiftr.io ты увидишь подключенное устройство с именем umqtt_xakep_client
.
Убедимся, что все настроено правильно, попытавшись включить светодиод. Для этого нам нужно записать в топик с именем command
строчку enable_led
. Можно воспользоваться любым MQTT-клиентом: на телефоне, на компьютере или в интернете. Я выбрал мобильное приложение EasyMQTT, и в моем инстасе появилось новое устройство. На экране можно отследить перемещение пакета (маленькой черной точки).
Если у тебя все работает, то светодиод будет послушно включаться и выключаться по команде. Осталось настроить автоматическое подключение к Wi-Fi и MQTT при включении девайса. Для этого необходимо добавить вызовы наших функций в файл boot.py
, который выполняется при запуске устройства.
TELEGRAM-БОТ
Что должен делать наш бот? Принимать команду от пользователя — причем желательно в человеческом формате — и отправлять ее на наш MQTT-брокер, а затем дожидаться фотографии с устройства и передавать ее пользователю.
Программа будет состоять из двух частей: непосредственно телеграм?бота, реализующего взаимодействие с пользователем, и «MQTT-клиента», который будет отправлять команды на наш гаджет и принимать фотографии. Для начала напишем именно его с использованием библиотеки pahomqtt
.
Вспомним, что нужно было сделать для настройки «MQTT-клиента» на ESP:
- Настроить подключение к серверу.
- Настроить обработчик событий.
Здесь все то же самое, только немного отличается синтаксис. Для начала создадим файл config.py
, где будет храниться вся конфигурация:
MQTT_HOST = "xakep.cloud.shiftr.io"
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60
MQTT_LOGIN = "xakep"
MQTT_PASSWORD = "utBmi6LTWS7b4l5"
Можно было создать отдельный токен, но мы воспользовались существующим. Импортируем библиотеки:
import paho.mqtt.client as mqtt
from config import MQTT_LOGIN, MQTT_KEEPALIVE, MQTT_PORT, MQTT_HOST, MQTT_PASSWORD
Создаем функции — обработчики событий: подключения и нового сообщения.
def on_connect(client, userdata, flags, rc):
client.subscribe("image")
def on_message(client, userdata, msg):
if msg.topic == "image":
if msg.payload != b'None':
with open(f"images/photo.jpeg", "wb") as file:
file.write(msg.payload)
После подключения наша программа подписывается на канал image
и ждет сообщений. А когда дожидается, проверяет, что это корректное сообщение, и сохраняет его как картинку.
Теперь осталось только подключиться к серверу и привязать наши функции к соответствующим событиям:
client = mqtt.Client(client_id="TelegramBot", clean_session=True)
client.username_pw_set(MQTT_LOGIN, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE)
Теперь займемся непосредственно ботом. Для реализации логики ботов на Python существует множество библиотек. Например, в статье «Питоном по телеграму! Пишем пять простых Telegram-ботов на Python» использовалась telebot
, но я выбрал асинхронную библиотеку aiogram
.
Чтобы взаимодействовать с Telegram, необходимо зарегистрировать бот. О том, как это сделать, написано много раз. Так что не буду повторяться, скажу лишь, что TELEGRAM_TOKEN
, полученный от BotFather, я тоже добавил в файл конфигурации.
Создадим шаблон для нашего бота, чтобы отвечать на все фразы сообщением о том, что этот бот умеет.
from aiogram import Bot, Dispatcher, executor
from aiogram.types import Message, KeyboardButton, ReplyKeyboardMarkup
from config import TELEGRAM_TOKEN
# Создаем красивую клавиатуру с кнопками
keyboard = ReplyKeyboardMarkup(resize_keyboard=True)
keyboard.add(
KeyboardButton(text="Включи светодиод"),
KeyboardButton("Выключи светодиод"),
KeyboardButton("Сделай фото")
)
# Объект бота
bot = Bot(token=TELEGRAM_TOKEN)
# Диспетчер для бота
dp = Dispatcher(bot)
# Обработчик всех сообщений
@dp.message_handler()
async def any_text_message(message: Message):
await message.answer(
"Привет! Я умею включать и выключать светодиод и делать фотографии. Нажми на нужную кнопку!",
reply_markup=keyboard
)
if __name__ == "__main__":
# Запуск бота
executor.start_polling(dp, skip_updates=True)
Если теперь ты выполнишь программу и попробуешь запустить бота на телефоне, то увидишь такую картину.
Теперь необходимо добавить работу нашего MQTT-клиента и отправку команд. Чтобы одновременно работал и бот, и MQTT-клиент, мы запустим клиент в параллельном потоке.
import threading
tr = threading.Thread(target=client.loop_forever)
tr.start()
А теперь напишем обработчики для команд. С включением и выключением светодиода все просто — надо отправить соответствующую команду в нужный топик и ответить пользователю, не забыв передать используемую клавиатуру.
@dp.message_handler(lambda message: message.text == "Включи светодиод")
async def enable_led(message: Message):
client.publish("command", "enable_led")
await message.answer("Команда на включение светодиода отправлена", reply_markup=keyboard)
@dp.message_handler(lambda message: message.text == "Выключи светодиод")
async def disable_led(message: Message):
client.publish("command", "disable_led")
await message.answer("Команда на выключение светодиода отправлена", reply_markup=keyboard)
Но что делать, когда мы хотим снять фотографию? Ведь нам нужно получить ответ от камеры. Воспользуемся небольшим лайфхаком.
Построим алгоритм так: отправляем команду, ждем какое?то время, если за это время наш MQTT-клиент не скачал фотографию, то считаем, что произошла ошибка, и уведомляем об этом пользователя. Время опять же задается в конфигурационном файле — в переменной TIME_WAIT_PHOTO
.
from asyncio import sleep
@dp.message_handler(lambda message: message.text == "Сделай фото")
async def take_photo(message: Message):
client.publish("image", "None")
client.publish("command", "take_screenshot")
start = time.time()
while not os.listdir("images"):
if time.time() - start < TIME_WAIT_PHOTO:
await sleep(1)
else:
await message.answer("Что-то пошло не так — повторите попозже!", reply_markup=keyboard)
return
with open("images/photo.jpeg", "rb") as new_image:
await message.answer_photo(photo=new_image, reply_markup=keyboard)
os.remove("images/photo.jpeg")
Давай теперь запустим все вместе и посмотрим, как оно работает.
Наш девайс готов!
Не забудь подключить к устройству постоянное питание — и можешь наблюдать за своими домашними питомцами или 3D-принтером.