Пишем веб сервис на Python с помощью FastAPI

МЕНЮ


Искусственный интеллект
Поиск
Регистрация на сайте
Помощь проекту

ТЕМЫ


Новости ИИРазработка ИИВнедрение ИИРабота разума и сознаниеМодель мозгаРобототехника, БПЛАТрансгуманизмОбработка текстаТеория эволюцииДополненная реальностьЖелезоКиберугрозыНаучный мирИТ индустрияРазработка ПОТеория информацииМатематикаЦифровая экономика

Авторизация



RSS


RSS новости


Знаю, знаю, наверное вы сейчас думаете «что, опять?!».
Да, на Хабре уже неоднократно писали о фреймворке FastAPI. Но я предлагаю рассмотреть этот инструмент немного подробнее и написать API своего собственного мини Хабра без кармы и рейтингов, зато с блэкджеком и с тестами, аутентификацией, миграциями и асинхронной работой с БД.

Схема базы данных и миграции

Прежде всего, с помощью SQLAlchemy Expression Language, опишем схему базы данных. Создадим файл models/users.py:

import sqlalchemy from sqlalchemy.dialects.postgresql import UUID  metadata = sqlalchemy.MetaData()   users_table = sqlalchemy.Table(     "users",     metadata,     sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),     sqlalchemy.Column("email", sqlalchemy.String(40), unique=True, index=True),     sqlalchemy.Column("name", sqlalchemy.String(100)),     sqlalchemy.Column("hashed_password", sqlalchemy.String()),     sqlalchemy.Column(         "is_active",         sqlalchemy.Boolean(),         server_default=sqlalchemy.sql.expression.true(),         nullable=False,     ), )   tokens_table = sqlalchemy.Table(     "tokens",     metadata,     sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),     sqlalchemy.Column(         "token",         UUID(as_uuid=False),         server_default=sqlalchemy.text("uuid_generate_v4()"),         unique=True,         nullable=False,         index=True,     ),     sqlalchemy.Column("expires", sqlalchemy.DateTime()),     sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")), ) 

И файл models/posts.py:

import sqlalchemy  from .users import users_table  metadata = sqlalchemy.MetaData()   posts_table = sqlalchemy.Table(     "posts",     metadata,     sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),     sqlalchemy.Column("user_id", sqlalchemy.ForeignKey(users_table.c.id)),     sqlalchemy.Column("created_at", sqlalchemy.DateTime()),     sqlalchemy.Column("title", sqlalchemy.String(100)),     sqlalchemy.Column("content", sqlalchemy.Text()), ) 

Чтобы автоматизировать миграции базы данных, установим alembic:
$ pip install alembic

Для инициализации Alembic выполним:

$ alembic init migrations

Эта команда создаст в текущей директории файл alembic.ini и каталог migrations содержащий:

  • каталог versions, в котором будут хранится файлы миграций
  • скрипт env.py, запускающийся при вызове alembic
  • файл script.py.mako, содержащий шаблон для новых миграций.

Укажем url нашей базы данных, для этого в файле alembic.ini добавим строчку:

sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s:5432/%(DB_NAME)s

Формат %(variable_name)s позволяет нам устанавливать разные значения переменных в зависимости от среды окружения, переопределяя их в файле env.py например вот так:

from os import environ from alembic import context from app.models import posts, users  # Alembic Config объект предоставляет доступ # к переменным из файла alembic.ini config = context.config  section = config.config_ini_section config.set_section_option(section, "DB_USER", environ.get("DB_USER")) config.set_section_option(section, "DB_PASS", environ.get("DB_PASS")) config.set_section_option(section, "DB_NAME", environ.get("DB_NAME")) config.set_section_option(section, "DB_HOST", environ.get("DB_HOST"))  fileConfig(config.config_file_name)  target_metadata = [users.metadata, posts.metadata] 

Здесь мы берем значения DB_USER, DB_PASS, DB_NAME и DB_HOST из переменных окружения. Кроме этого, в файле env.py указываются метаданные нашей базы в атрибуте target_metadata, без этого Alembic не сможет определить какие изменения необходимо произвести в базе данных.

Все готово и мы можем сгенерировать миграции и обновить БД:

 $ alembic revision --autogenerate -m "Added required tables" $ alembic upgrade head 

Запускаем приложение и подключаем БД

Создадим файл main.py:

from fastapi import FastAPI  app = FastAPI()  @app.get("/") def read_root():     return {"Hello": "World"} 

И запустим приложение, выполнив команду:

$ uvicorn main:app --reload

Убедимся, что все работает как надо. Открываем в браузере http://127.0.0.1:8000/ и видим
{"Hello": "World"}

Чтобы подключиться к базе данных, воспользуемся модулем databases, который позволяет выполнять запросы асинхронно. Настроим startup и shutdhown события нашего сервиса, при которых будут происходить подключение и отключение от базы данных. Отредактируем файл main.py:

from os import environ  import databases  # берем параметры БД из переменных окружения DB_USER = environ.get("DB_USER", "user") DB_PASSWORD = environ.get("DB_PASSWORD", "password") DB_HOST = environ.get("DB_HOST", "localhost") DB_NAME = "async-blogs" SQLALCHEMY_DATABASE_URL = (     f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}" ) # создаем объект database, который будет использоваться для выполнения запросов database = databases.Database(SQLALCHEMY_DATABASE_URL)   app = FastAPI()   @app.on_event("startup") async def startup():     # когда приложение запускается устанавливаем соединение с БД     await database.connect()   @app.on_event("shutdown") async def shutdown():     # когда приложение останавливается разрываем соединение с БД     await database.disconnect()   @app.get("/") async def read_root():     # изменим роут таким образом, чтобы он брал данные из БД     query = (         select(             [                 posts_table.c.id,                 posts_table.c.created_at,                 posts_table.c.title,                 posts_table.c.content,                 posts_table.c.user_id,                 users_table.c.name.label("user_name"),             ]         )         .select_from(posts_table.join(users_table))         .order_by(desc(posts_table.c.created_at))     )     return await database.fetch_all(query) 

Открываем http://127.0.0.1:8000/ и если видим в ответе пустой список [], значит все прошло хорошо и можно двигаться дальше.

Валидация запроса и ответа

Реализуем возможность регистрации пользователей. Для этого нам понадобиться валидировать HTTP запросы и ответы. Для решения этой задачи воспользуемся библиотекой pydantic:

pip install pydantic

Создадим файл schemas/users.py и добавим модель, отвечающую за валидацию тела запроса:

from pydantic import BaseModel, EmailStr  class UserCreate(BaseModel):     """ Проверяет sign-up запрос """     email: EmailStr     name: str     password: str 

Обратите внимание, что типы полей определяются с помощью аннотации типов. Помимо встроенных типов данных, таких как int и str, pydantic предлагает большое количество типов, обеспечивающих дополнительную проверку. Например, тип EmailStr проверяет, что полученное значение — корректный email. Для использования типа EmailStr необходимо установить модуль email-validator:

pip install email-validator

Тело ответа должно содержать свои собственные специфические поля, например id и access_token, поэтому добавим в файл schemas/users.py модели, отвечающие за формирование ответа:

from typing import Optional from pydantic import UUID4, BaseModel, EmailStr, Field, validator   class UserCreate(BaseModel):     """ Проверяет sign-up запрос """     email: EmailStr     name: str     password: str   class UserBase(BaseModel):     """ Формирует тело ответа с деталями пользователя """     id: int     email: EmailStr     name: str   class TokenBase(BaseModel):     token: UUID4 = Field(..., alias="access_token")     expires: datetime     token_type: Optional[str] = "bearer"      class Config:         allow_population_by_field_name = True      @validator("token")     def hexlify_token(cls, value):         """ Конвертирует UUID в hex строку """         return value.hex   class User(UserBase):     """ Формирует тело ответа с деталями пользователя и токеном """     token: TokenBase = {} 

Для каждого поля модели можно написать кастомный валидатор. Например, hexlify_token преобразует UUID значение в hex строку. Стоит отметить, что вы можете использовать класс Field, когда нужно переопределить стандартное поведение поля модели. Например, token: UUID4 = Field(..., alias=«access_token») устанавливает псевдоним access_token для поля token. Для обозначения, что поле обязательно, в качестве первого параметра передается специальное значение — ... (ellipsis). Добавим файл utils/users.py, в котором создадим методы, необходимые для записи пользователя в БД:

import hashlib import random import string from datetime import datetime, timedelta from sqlalchemy import and_  from app.models.database import database from app.models.users import tokens_table, users_table from app.schemas import users as user_schema  def get_random_string(length=12):     """ Генерирует случайную строку, использующуюся как соль """     return "".join(random.choice(string.ascii_letters) for _ in range(length))   def hash_password(password: str, salt: str = None):     """ Хеширует пароль с солью """     if salt is None:         salt = get_random_string()     enc = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 100_000)     return enc.hex()   def validate_password(password: str, hashed_password: str):     """ Проверяет, что хеш пароля совпадает с хешем из БД """     salt, hashed = hashed_password.split("$")     return hash_password(password, salt) == hashed   async def get_user_by_email(email: str):     """ Возвращает информацию о пользователе """     query = users_table.select().where(users_table.c.email == email)     return await database.fetch_one(query)   async def get_user_by_token(token: str):     """ Возвращает информацию о владельце указанного токена """     query = tokens_table.join(users_table).select().where(         and_(             tokens_table.c.token == token,             tokens_table.c.expires > datetime.now()         )     )     return await database.fetch_one(query)   async def create_user_token(user_id: int):     """ Создает токен для пользователя с указанным user_id """     query = (         tokens_table.insert()         .values(expires=datetime.now() + timedelta(weeks=2), user_id=user_id)         .returning(tokens_table.c.token, tokens_table.c.expires)     )      return await database.fetch_one(query)   async def create_user(user: user_schema.UserCreate):     """ Создает нового пользователя в БД """     salt = get_random_string()     hashed_password = hash_password(user.password, salt)     query = users_table.insert().values(         email=user.email, name=user.name, hashed_password=f"{salt}${hashed_password}"     )     user_id = await database.execute(query)     token = await create_user_token(user_id)     token_dict = {"token": token["token"], "expires": token["expires"]}      return {**user.dict(), "id": user_id, "is_active": True, "token": token_dict} 


Создадим файл routers/users.py и добавим sign-up роут, указав, что в запросе он ожидает модель CreateUser и возвращает модель User:
from fastapi import APIRouter from app.schemas import users from app.utils import users as users_utils   router = APIRouter()   @router.post("/sign-up", response_model=users.User) async def create_user(user: users.UserCreate):     db_user = await users_utils.get_user_by_email(email=user.email)     if db_user:         raise HTTPException(status_code=400, detail="Email already registered")     return await users_utils.create_user(user=user) 

Осталось только подключить роуты из файла routers/users.py. Для этого добавим в main.py следующие строки:

from app.routers import users app.include_router(users.router) 

Аутентификация и контроль доступа

Теперь, когда в нашей базе данных есть пользователи, все готово для того чтобы настроить аутентификацию приложения. Добавим эндпоинт, который принимает имя пользователя и пароль и возвращает токен. Обновим файл routers/users.py, добавив в него:

from fastapi import Depends from fastapi.security import OAuth2PasswordRequestForm   @router.post("/auth", response_model=users.TokenBase) async def auth(form_data: OAuth2PasswordRequestForm = Depends()):     user = await users_utils.get_user_by_email(email=form_data.username)      if not user:         raise HTTPException(status_code=400, detail="Incorrect email or password")      if not users_utils.validate_password(         password=form_data.password, hashed_password=user["hashed_password"]     ):         raise HTTPException(status_code=400, detail="Incorrect email or password")      return await users_utils.create_user_token(user_id=user["id"]) 

При этом, нам не нужно самостоятельно описывать модель запроса, Fastapi предоставляет специальный dependency класс OAuth2PasswordRequestForm, который заставляет роут ожидать два поля username и password.

Чтобы ограничить доступ к определенным роутам для неаутентифицированных пользователей, напишем метод-зависимость(dependency). Он проверит, что предоставленный токен принадлежит активному пользователю и вернет данные пользователя. Это позволит нам использовать информацию о пользователе во всех роутах, требующих аутентификации. Создадим файл utils/dependecies.py:

from app.utils import users as users_utils from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer   oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth")   async def get_current_user(token: str = Depends(oauth2_scheme)):     user = await users_utils.get_user_by_token(token)     if not user:         raise HTTPException(             status_code=status.HTTP_401_UNAUTHORIZED,             detail="Invalid authentication credentials",             headers={"WWW-Authenticate": "Bearer"},         )     if not user["is_active"]:         raise HTTPException(             status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"         )     return user 

Обратите внимание, что зависимость может в свою очередь зависеть от другой зависимости. К пример OAuth2PasswordBearer — зависимость, которая дает понять FastAPI, что текущий роут требует аутентификации.

Чтобы проверить, что все работает как надо, добавим роут /users/me, возвращающий детали текущего пользователя. В файл routers/users.py добавим строки:

from app.utils.dependencies import get_current_user   @router.get("/users/me", response_model=users.UserBase) async def read_users_me(current_user: users.User = Depends(get_current_user)):     return current_user 

Теперь у нас есть роут /users/me к которому имеют доступ только аутентифицированные пользователи.

Все готово для того, чтобы наконец добавить возможность пользователям создавать и редактировать публикации:

utils/posts.py

from datetime import datetime  from app.models.database import database from app.models.posts import posts_table from app.models.users import users_table from app.schemas import posts as post_schema from sqlalchemy import desc, func, select   async def create_post(post: post_schema.PostModel, user):     query = (         posts_table.insert()         .values(             title=post.title,             content=post.content,             created_at=datetime.now(),             user_id=user["id"],         )         .returning(             posts_table.c.id,             posts_table.c.title,             posts_table.c.content,             posts_table.c.created_at,         )     )     post = await database.fetch_one(query)      # Convert to dict and add user_name key to it     post = dict(zip(post, post.values()))     post["user_name"] = user["name"]     return post   async def get_post(post_id: int):     query = (         select(             [                 posts_table.c.id,                 posts_table.c.created_at,                 posts_table.c.title,                 posts_table.c.content,                 posts_table.c.user_id,                 users_table.c.name.label("user_name"),             ]         )         .select_from(posts_table.join(users_table))         .where(posts_table.c.id == post_id)     )     return await database.fetch_one(query)   async def get_posts(page: int):     max_per_page = 10     offset1 = (page - 1) * max_per_page     query = (         select(             [                 posts_table.c.id,                 posts_table.c.created_at,                 posts_table.c.title,                 posts_table.c.content,                 posts_table.c.user_id,                 users_table.c.name.label("user_name"),             ]         )         .select_from(posts_table.join(users_table))         .order_by(desc(posts_table.c.created_at))         .limit(max_per_page)         .offset(offset1)     )     return await database.fetch_all(query)   async def get_posts_count():     query = select([func.count()]).select_from(posts_table)     return await database.fetch_val(query)   async def update_post(post_id: int, post: post_schema.PostModel):     query = (         posts_table.update()         .where(posts_table.c.id == post_id)         .values(title=post.title, content=post.content)     )     return await database.execute(query)  

routers/posts.py

from app.schemas.posts import PostDetailsModel, PostModel from app.schemas.users import User from app.utils import posts as post_utils from app.utils.dependencies import get_current_user from fastapi import APIRouter, Depends, HTTPException, status  router = APIRouter()   @router.post("/posts", response_model=PostDetailsModel, status_code=201) async def create_post(post: PostModel, current_user: User = Depends(get_current_user)):     post = await post_utils.create_post(post, current_user)     return post   @router.get("/posts") async def get_posts(page: int = 1):     total_cout = await post_utils.get_posts_count()     posts = await post_utils.get_posts(page)     return {"total_count": total_cout, "results": posts}   @router.get("/posts/{post_id}", response_model=PostDetailsModel) async def get_post(post_id: int):     return await post_utils.get_post(post_id)   @router.put("/posts/{post_id}", response_model=PostDetailsModel) async def update_post(     post_id: int, post_data: PostModel, current_user=Depends(get_current_user) ):     post = await post_utils.get_post(post_id)     if post["user_id"] != current_user["id"]:         raise HTTPException(             status_code=status.HTTP_403_FORBIDDEN,             detail="You don't have access to modify this post",         )      await post_utils.update_post(post_id=post_id, post=post_data)     return await post_utils.get_post(post_id)  

Подключим новые роуты, добавив в main.py
from app.routers import posts app.include_router(posts.router) 

Тестирование

Тесты мы будем писать на pytest:

$ pip install pytest

Для тестирования эндпоинтов FastAPI предоставляет специальный инструмент TestClient. Напишем тест для эндпоинта, который не требует подключения к базе данных:
from app.main import app from fastapi.testclient import TestClient  client = TestClient(app)   def test_health_check():     response = client.get("/")     assert response.status_code == 200     assert response.json() == {"Hello": "World"} 

Как видите, все достаточно просто. Необходимо инициализировать TestClient, и использовать его для тестирования HTTP запросов.

Для тестирования остальных эндпоинтов, необходимо создать тестовую БД. Отредактируем файл main.py, добавив в него конфигурацию тестовой базы:

from os import environ  import databases  DB_USER = environ.get("DB_USER", "user") DB_PASSWORD = environ.get("DB_PASSWORD", "password") DB_HOST = environ.get("DB_HOST", "localhost")  TESTING = environ.get("TESTING")  if TESTING:     # Используем отдельную базу данных для тестов     DB_NAME = "async-blogs-temp-for-test"     TEST_SQLALCHEMY_DATABASE_URL = (         f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"     )     database = databases.Database(TEST_SQLALCHEMY_DATABASE_URL) else:     DB_NAME = "async-blogs"     SQLALCHEMY_DATABASE_URL = (         f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}"     )     database = databases.Database(SQLALCHEMY_DATABASE_URL) 

Мы по-прежнему используем БД «async-blogs» для нашего приложения. Но если задано значение переменной окружение TESTING, тогда использовуется БД «async-blogs-temp-for-test».

Чтобы база «async-blogs-temp-for-test» автоматически создавалась при запуске тестов и удалялась после их выполнения, создадим фикстуру в файле tests/conftest.py:

import os  import pytest  # Устанавливаем `os.environ`, чтобы использовать тестовую БД os.environ['TESTING'] = 'True'  from alembic import command from alembic.config import Config from app.models import database from sqlalchemy_utils import create_database, drop_database   @pytest.fixture(scope="module") def temp_db():     create_database(database.TEST_SQLALCHEMY_DATABASE_URL) # Создаем БД     base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))     alembic_cfg = Config(os.path.join(base_dir, "alembic.ini")) # Загружаем конфигурацию alembic      command.upgrade(alembic_cfg, "head") # выполняем миграции      try:         yield database.TEST_SQLALCHEMY_DATABASE_URL     finally:         drop_database(database.TEST_SQLALCHEMY_DATABASE_URL) # удаляем БД 

Для создания и удаления БД воспользуемся библиотекой sqlalchemy_utils . Используя фикстуру temp_db в тестах, мы сможем протестировать все эндпоинты нашего приложения:

def test_sign_up(temp_db):     request_data = {         "email": "vader@deathstar.com",         "name": "Darth Vader",         "password": "rainbow"     }     with TestClient(app) as client:         response = client.post("/sign-up", json=request_data)     assert response.status_code == 200     assert response.json()["id"] == 1     assert response.json()["email"] == "vader@deathstar.com"     assert response.json()["name"] == "Darth"     assert response.json()["token"]["expires"] is not None     assert response.json()["token"]["access_token"] is not None 

tests/test_posts.py

import asyncio  from app.main import app from app.schemas.users import UserCreate from app.utils.users import create_user, create_user_token from fastapi.testclient import TestClient   def test_create_post(temp_db):     user = UserCreate(         email="vader@deathstar.com",         name="Darth",         password="rainbow"     )     request_data = {       "title": "42",       "content": "Don't panic!"     }     with TestClient(app) as client:         # Create user and use his token to add new post         loop = asyncio.get_event_loop()         user_db = loop.run_until_complete(create_user(user))         response = client.post(             "/posts",             json=request_data,             headers={"Authorization": f"Bearer {user_db['token']['token']}"}         )     assert response.status_code == 201     assert response.json()["id"] == 1     assert response.json()["title"] == "42"     assert response.json()["content"] == "Don't panic!"   def test_create_post_forbidden_without_token(temp_db):     request_data = {       "title": "42",       "content": "Don't panic!"     }     with TestClient(app) as client:         response = client.post("/posts", json=request_data)     assert response.status_code == 401   def test_posts_list(temp_db):     with TestClient(app) as client:         response = client.get("/posts")     assert response.status_code == 200     assert response.json()["total_count"] == 1     assert response.json()["results"][0]["id"] == 1     assert response.json()["results"][0]["title"] == "42"     assert response.json()["results"][0]["content"] == "Don't panic!"   def test_post_detail(temp_db):     post_id = 1     with TestClient(app) as client:         response = client.get(f"/posts/{post_id}")     assert response.status_code == 200     assert response.json()["id"] == 1     assert response.json()["title"] == "42"     assert response.json()["content"] == "Don't panic!"   def test_update_post(temp_db):     post_id = 1     request_data = {       "title": "42",       "content": "Life? Don't talk to me about life."     }     with TestClient(app) as client:         # Create user token to add new post         loop = asyncio.get_event_loop()         token = loop.run_until_complete(create_user_token(user_id=1))         response = client.put(             f"/posts/{post_id}",             json=request_data,             headers={"Authorization": f"Bearer {token['token']}"}         )     assert response.status_code == 200     assert response.json()["id"] == 1     assert response.json()["title"] == "42"     assert response.json()["content"] == "Life? Don't talk to me about life."   def test_update_post_forbidden_without_token(temp_db):     post_id = 1     request_data = {       "title": "42",       "content": "Life? Don't talk to me about life."     }     with TestClient(app) as client:         response = client.put(f"/posts/{post_id}", json=request_data)     assert response.status_code == 401 

tests/test_users.py

import asyncio import pytest  from app.main import app from app.schemas.users import UserCreate from app.utils.users import create_user, create_user_token from fastapi.testclient import TestClient   def test_sign_up(temp_db):     request_data = {         "email": "vader@deathstar.com",         "name": "Darth",         "password": "rainbow"     }     with TestClient(app) as client:         response = client.post("/sign-up", json=request_data)     assert response.status_code == 200     assert response.json()["id"] == 1     assert response.json()["email"] == "vader@deathstar.com"     assert response.json()["name"] == "Darth"     assert response.json()["token"]["expires"] is not None     assert response.json()["token"]["token"] is not None   def test_login(temp_db):     request_data = {"username": "vader@deathstar.com", "password": "rainbow"}     with TestClient(app) as client:         response = client.post("/auth", data=request_data)     assert response.status_code == 200     assert response.json()["token_type"] == "bearer"     assert response.json()["expires"] is not None     assert response.json()["access_token"] is not None   def test_login_with_invalid_password(temp_db):     request_data = {"username": "vader@deathstar.com", "password": "unicorn"}     with TestClient(app) as client:         response = client.post("/auth", data=request_data)     assert response.status_code == 400     assert response.json()["detail"] == "Incorrect email or password"   def test_user_detail(temp_db):     with TestClient(app) as client:         # Create user token to see user info         loop = asyncio.get_event_loop()         token = loop.run_until_complete(create_user_token(user_id=1))         response = client.get(             "/users/me",             headers={"Authorization": f"Bearer {token['token']}"}         )     assert response.status_code == 200     assert response.json()["id"] == 1     assert response.json()["email"] == "vader@deathstar.com"     assert response.json()["name"] == "Darth"   def test_user_detail_forbidden_without_token(temp_db):     with TestClient(app) as client:         response = client.get("/users/me")     assert response.status_code == 401   @pytest.mark.freeze_time("2015-10-21") def test_user_detail_forbidden_with_expired_token(temp_db, freezer):     user = UserCreate(         email="sidious@deathstar.com",         name="Palpatine",         password="unicorn"     )     with TestClient(app) as client:         # Create user and use expired token         loop = asyncio.get_event_loop()         user_db = loop.run_until_complete(create_user(user))         freezer.move_to("'2015-11-10'")         response = client.get(             "/users/me",             headers={"Authorization": f"Bearer {user_db['token']['token']}"}         )     assert response.status_code == 401 

P.S. Исходники

Вот собственно и все, репозиторий с исходниками из поста можно посмотреть на GitHub.


Источник: habr.com

Комментарии: