Add new testing suite with working fixture

This commit is contained in:
2025-06-25 09:28:09 +00:00
parent 74a57700c8
commit 81bae580f9
16 changed files with 62 additions and 57 deletions
+20 -11
View File
@@ -11,14 +11,20 @@ modules: dict[str, Any] = {
]
}
TORTOISE_ORM = {
TEST_TORTOISE_ORM = {
"connections": {
"testing": {
"engine": "tortoise.backends.sqlite",
"credentials": {
"file_path": "stoneedge.sqlite"
}
"default": "sqlite://:memory:"
},
"apps": {
"models": {
"models": modules.get("models", []) + ["aerich.models"],
"default_connection": "default",
},
},
}
PROD_TORTOISE_ORM = {
"connections": {
"default": {
"engine": "tortoise.backends.asyncpg",
"credentials": {
@@ -28,23 +34,26 @@ TORTOISE_ORM = {
"password": settings.PSQL_PASSWORD,
"port": settings.PSQL_PORT,
},
}
},
},
"apps": {
"models": {
"models": modules.get("models", []) + ["aerich.models"],
"default_connection": "testing" if settings.IS_TESTING else "default",
"default_connection": "default",
},
},
}
async def migrate_db(tortoise_config=TORTOISE_ORM):
async def migrate_db(tortoise_config=PROD_TORTOISE_ORM):
if settings.IS_TESTING:
tortoise_config=TEST_TORTOISE_ORM
aerich = Command(tortoise_config)
await aerich.init()
await aerich.upgrade(run_in_transaction=True)
await aerich.upgrade()
await Tortoise.init(tortoise_config)
await Tortoise.generate_schemas(safe=True)
async def end_connections_to_db():
await Tortoise.close_connections()
await Tortoise.close_connections()
+3 -1
View File
@@ -31,7 +31,9 @@ async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
Logs the user into our API, creates tokens and passes them back to User.
"""
user: User | None = await User.filter(email=form.username).first()
user: User | None = await User.filter(Q(email=form.username) | Q(username=form.username)).first()
print(user)
if user is None:
raise HTTPException(status_code=401, detail=account_error)
+1 -2
View File
@@ -1,5 +1,4 @@
import pytest
@pytest.mark.usefixtures("use_database_during_testing")
class Test():
pass
+1 -11
View File
@@ -1,11 +1,8 @@
import asyncio
import asyncio, httpx, pytest
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import httpx, pytest
from tortoise import Tortoise
from asgi_lifespan import LifespanManager
from database import migrate_db
from tests.fixtures.account import *
try:
@@ -32,13 +29,6 @@ def event_loop():
loop.close()
@pytest.fixture
async def use_database_during_testing():
await migrate_db()
yield
await Tortoise._drop_databases()
@asynccontextmanager
async def client_manager(app, base_url="https://localhost", **kw) -> ClientManagerType:
app.state.testing = True
View File
+33 -23
View File
@@ -5,6 +5,7 @@ from modules.auth.utils import create_jwt_tokens
from modules.organizations.models import Organization, OrganizationType
from modules.users.models import ACL, Membership, User
from modules.auth.models import Token
from tortoise.expressions import Q
from config import settings
@@ -27,32 +28,41 @@ async def create_user_with_org():
organization_name="simple organization",
organization_type=OrganizationType.HOME,
is_admin=False) -> user_creation_return_type:
org: Organization = await Organization.create(
name=organization_name,
type=organization_type
)
org: Organization | None = await Organization.filter(Q(name=organization_name) & Q(name=organization_type)).first()
if not org:
org: Organization = await Organization.create(
name=organization_name,
type=organization_type
)
acl: ACL = await ACL.create(
READ=True,
WRITE=True,
REPORT=True,
MANAGE=True if is_admin else False,
ADMIN=True if is_admin else False,
)
user: User | None = await User.filter(Q(email=email)).first()
if not user:
user: User = await User.create(
email=email,
username=username,
name=name,
surname=surname,
password=crypt.hash(password),
)
user: User = await User.create(
email=email,
username=username,
name=name,
surname=surname,
password=crypt.hash(password),
)
acl: ACL | None = await ACL.filter(Q(id="5f33facd-08dd-48a1-8f15-3b24f2a727f5")).first()
if not acl:
acl: ACL = await ACL.create(
id="5f33facd-08dd-48a1-8f15-3b24f2a727f5",
READ=True,
WRITE=True,
REPORT=True,
MANAGE=True if is_admin else False,
ADMIN=True if is_admin else False,
)
await Membership.create(
organization=org,
user=user,
acl=acl
)
membership: Membership | None = await Membership.filter(Q(user=user, organization=org, acl=acl)).first()
if not membership:
await Membership.get_or_create(
organization=org,
user=user,
acl=acl
)
tokens: Token = await create_jwt_tokens(user=user)
@@ -10,7 +10,6 @@ crypt = settings.CRYPT
class TestAuthentication(Test):
@pytest.mark.asyncio
async def test_authentication_with_non_existing_user_and_password(
self, client: AsyncClient
):
@@ -25,7 +24,6 @@ class TestAuthentication(Test):
assert response.status_code == 401
assert response.json() == {"detail": "E-Mail Address or password is incorrect"}
@pytest.mark.asyncio
async def test_authentication_with_existing_user_and_wrong_password(
self, client: AsyncClient, create_user_with_org
):
@@ -41,7 +39,6 @@ class TestAuthentication(Test):
assert response.status_code == 401
assert response.json() == {"detail": "E-Mail Address or password is incorrect"}
@pytest.mark.asyncio
async def test_authentication_with_existing_user_and_password(
self, client: AsyncClient, create_user_with_org
):
@@ -69,19 +66,19 @@ class TestAuthentication(Test):
}
}
@pytest.mark.asyncio
async def test_logging_out_destroys_tokens(
self, client: AsyncClient, create_user_with_org
):
user, _, _, _ = await create_user_with_org(email="user@localhost.com", password="userpassword")
user, _, _, _ = await create_user_with_org(email="superuser@localhost.com", password="superuser")
response = await client.post(
"https://localhost/api/v1/auth/login",
data={
"username": "user@localhost.com",
"password": "userpassword",
"username": "superuser@localhost.com",
"password": "superuser",
"grant_type": "password",
},
)
print(response.json())
assert response.status_code == 200
assert response.json() == {
"jwt": {
@@ -116,7 +113,6 @@ class TestAuthentication(Test):
"detail": "Refresh token not found or something went wrong."
}
@pytest.mark.asyncio
async def test_create_new_tokens_upon_refresh(
self, client: AsyncClient, create_user_with_org
):
@@ -166,7 +162,6 @@ class TestAuthentication(Test):
}
}
@pytest.mark.asyncio
async def test_setup_new_account(self, client: AsyncClient):
# Ensure account is never available. Prevents account already being available.
check_if_account_exists: User | None = await User.filter(