Fix further tests for creating accounts, make sure tests can delete items etc.

This commit is contained in:
2025-03-17 13:47:02 +02:00
parent bf9ecaff73
commit fa321adfb7
6 changed files with 120 additions and 45 deletions
+38 -14
View File
@@ -3,7 +3,6 @@ from typing import Annotated
import uuid import uuid
from fastapi.security import OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordRequestForm
from fastapi.routing import APIRouter from fastapi.routing import APIRouter
from pydantic import EmailStr
import pytz import pytz
from modules.users.utils import get_current_active_user from modules.users.utils import get_current_active_user
from modules.auth.utils import create_jwt_tokens, get_tokens_from_logged_in_user from modules.auth.utils import create_jwt_tokens, get_tokens_from_logged_in_user
@@ -12,17 +11,20 @@ from modules.users.models import User
from fastapi import Depends, HTTPException, status from fastapi import Depends, HTTPException, status
from tortoise.expressions import Q from tortoise.expressions import Q
from config import settings from config import settings
from modules.users.schemas import user_model
from modules.auth.schemas import register_model
router = APIRouter(prefix="/api/v1/auth", tags=["auth"]) router = APIRouter(prefix="/api/v1/auth", tags=["auth"])
account_error: str = "E-Mail Address or password is incorrect" account_error: str = "E-Mail Address or password is incorrect"
token_error: str = "Refresh token not found or something went wrong." token_error: str = "Refresh token not found or something went wrong."
user_exists: str = "Account failed to create, please contact support."
password_failed: str = "Password validation failed, please try again."
crypt = settings.CRYPT crypt = settings.CRYPT
@router.post("/") @router.post("/login")
async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]): async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
""" """
Login Login
@@ -50,7 +52,7 @@ async def logout(user: Annotated[User, Depends(get_current_active_user)]):
""" """
Logout Logout
Logout destroys all tokens for User that are currently active. Logout destroys all tokens for User that are currently active.
""" """
get_all_tokens = await Token.filter(Q(user__id=user.id)) get_all_tokens = await Token.filter(Q(user__id=user.id))
if get_all_tokens is None: if get_all_tokens is None:
@@ -64,13 +66,13 @@ async def logout(user: Annotated[User, Depends(get_current_active_user)]):
@router.post("/refresh") @router.post("/refresh")
async def refresh_login( async def refresh_login(
refresh_token: Annotated[Token | None, Depends(get_tokens_from_logged_in_user)] refresh_token: Annotated[Token | None, Depends(get_tokens_from_logged_in_user)],
): ):
""" """
Refresh Refresh
After ging this route a token that is active and not disabled, we disable ALL other tokens and pass along new tokens. After ging this route a token that is active and not disabled, we disable ALL other tokens and pass along new tokens.
Tokens are alive for about 10 minutes. Refresh tokens are alive for 20 minutes. Tokens are alive for about 10 minutes. Refresh tokens are alive for 20 minutes.
""" """
if refresh_token is None: if refresh_token is None:
raise HTTPException( raise HTTPException(
@@ -96,11 +98,11 @@ async def refresh_login(
) )
get_all_tokens = await Token.filter(Q(user__id=refresh_token.user_id)) get_all_tokens = await Token.filter(Q(user__id=refresh_token.user_id))
for token in get_all_tokens: for token in get_all_tokens:
if token.id != refresh_token.id: if token.id != refresh_token.id:
await token.delete() await token.delete()
tokens = await create_jwt_tokens( tokens = await create_jwt_tokens(
user=await User.filter(Q(id=refresh_token.user_id)).first() user=await User.filter(Q(id=refresh_token.user_id)).first()
) )
@@ -108,10 +110,32 @@ async def refresh_login(
return {"jwt": tokens} return {"jwt": tokens}
@router.post("/register") @router.post("/register", status_code=201, response_model=user_model)
async def register(email: EmailStr, name: str, surname: str, password: str, validate_password: str): async def register(user: register_model):
pass # Prevent existing users from reapplying for our system.
existing_user: User | None = await User.filter(
Q(email=user.email)
& Q(username=user.username)
& Q(name=user.name)
& Q(surname=user.surname)
).get_or_none()
@router.post("/2fa") if existing_user is not None:
async def twofa(): raise HTTPException(
pass status_code=status.HTTP_401_UNAUTHORIZED,
detail=user_exists,
)
if user.password != user.validate_password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=password_failed,
)
return await User.create(
email=user.email,
username=user.username,
name=user.name,
surname=user.surname,
password=crypt.hash(user.password),
)
@@ -1,6 +1,14 @@
from pydantic import BaseModel, EmailStr
from tortoise.contrib.pydantic import pydantic_model_creator from tortoise.contrib.pydantic import pydantic_model_creator
from modules.auth.models import Token from modules.auth.models import Token
token_model = pydantic_model_creator(Token) token_model = pydantic_model_creator(Token)
class register_model(BaseModel):
email: EmailStr
username: str
name: str
surname: str
password: str
validate_password: str
@@ -45,6 +45,7 @@ class OrganizationType(Enum):
""" """
HOME: str = "home" # Home use (Any size) HOME: str = "home" # Home use (Any size)
NON_PROFIT: str = "non_profit"
SMALL_ORGANIZATION: str = "s_org" # 1-100 SMALL_ORGANIZATION: str = "s_org" # 1-100
MEDIUM_ORGANIZATION: str = "m_org" # 100 - 500 MEDIUM_ORGANIZATION: str = "m_org" # 100 - 500
LARGE_ORGANIZATION: str = "l_org" # 500 - 1000 LARGE_ORGANIZATION: str = "l_org" # 500 - 1000
@@ -77,9 +78,12 @@ class Organization(Model, CMDMixin):
def __str__(self) -> str: def __str__(self) -> str:
return f"{self.id} - {self.name}" return f"{self.id} - {self.name}"
async def delete(self) -> None: async def delete(self, force: bool = False) -> None:
self.disabled = True if force:
self.disabled_at = datetime.now(tz=pytz.UTC) await Model.delete(self)
await self.save() else:
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
+24 -21
View File
@@ -11,6 +11,7 @@ from config import settings
crypt = settings.CRYPT crypt = settings.CRYPT
class User(Model, CMDMixin): class User(Model, CMDMixin):
""" """
User User
@@ -39,30 +40,29 @@ class User(Model, CMDMixin):
def __str__(self) -> str: def __str__(self) -> str:
return f"{self.id} - {self.name} {self.surname}" return f"{self.id} - {self.name} {self.surname}"
def set_password(self, password: str) -> None: async def set_password(self, password: str) -> None:
self.password = crypt.hash( self.password = crypt.hash(password)
password await self.save() # Make sure to save the model in DB
)
self.save() # Make sure to save the model in DB
def check_against_password(self, password: str) -> bool: def check_against_password(self, password: str) -> bool:
return crypt.verify( return crypt.verify(password, self.password)
password,
self.password async def update_password(
) self, old_password, new_password: str, verify_new_password: str
) -> bool:
def update_password(self, old_password, new_password: str, verify_new_password: str) -> bool:
if self.check_against_password(old_password) is False: if self.check_against_password(old_password) is False:
return False return False
if new_password is not verify_new_password: if new_password is not verify_new_password:
return False return False
self.set_password(new_password) await self.set_password(new_password)
async def delete(self) -> None:
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
async def delete(self, force: bool = False) -> None:
if force:
await Model.delete(self)
else:
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
class ACL(Model): class ACL(Model):
@@ -103,7 +103,10 @@ class Membership(Model, CMDMixin):
acl: ACL = fields.ForeignKeyField("models.ACL") acl: ACL = fields.ForeignKeyField("models.ACL")
disabled: bool = fields.BooleanField(default=False) disabled: bool = fields.BooleanField(default=False)
async def delete(self) -> None: async def delete(self, force: bool = False) -> None:
self.disabled = True if force:
self.disabled_at = datetime.now(tz=pytz.UTC) await Model.delete(self)
await self.save() else:
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
@@ -2,4 +2,4 @@ from tortoise.contrib.pydantic import pydantic_model_creator
from modules.users.models import User from modules.users.models import User
UserModel = pydantic_model_creator(User) user_model = pydantic_model_creator(User, exclude=["password"])
@@ -1,7 +1,9 @@
from modules.users.models import User
import pytest # type: ignore import pytest # type: ignore
from httpx import AsyncClient from httpx import AsyncClient
from config import settings from config import settings
from unittest.mock import ANY from unittest.mock import ANY
from tortoise.expressions import Q
crypt = settings.CRYPT crypt = settings.CRYPT
@@ -12,7 +14,7 @@ class TestAuthentication(object):
self, client: AsyncClient self, client: AsyncClient
): ):
response = await client.post( response = await client.post(
"https://localhost/api/v1/auth/", "https://localhost/api/v1/auth/login",
data={ data={
"username": "non-existing@localhost.com", "username": "non-existing@localhost.com",
"password": "password", "password": "password",
@@ -28,7 +30,7 @@ class TestAuthentication(object):
): ):
_, _, _, _ = use_admin_account _, _, _, _ = use_admin_account
response = await client.post( response = await client.post(
"https://localhost/api/v1/auth/", "https://localhost/api/v1/auth/login",
data={ data={
"username": "admin@localhost.com", "username": "admin@localhost.com",
"password": "password", "password": "password",
@@ -44,7 +46,7 @@ class TestAuthentication(object):
): ):
_, _, admin, _ = use_admin_account _, _, admin, _ = use_admin_account
response = await client.post( response = await client.post(
"https://localhost/api/v1/auth/", "https://localhost/api/v1/auth/login",
data={ data={
"username": "admin@localhost.com", "username": "admin@localhost.com",
"password": "adminpassword", "password": "adminpassword",
@@ -72,7 +74,7 @@ class TestAuthentication(object):
): ):
_, _, user, _ = use_user_account _, _, user, _ = use_user_account
response = await client.post( response = await client.post(
"https://localhost/api/v1/auth/", "https://localhost/api/v1/auth/login",
data={ data={
"username": "user@localhost.com", "username": "user@localhost.com",
"password": "userpassword", "password": "userpassword",
@@ -119,7 +121,7 @@ class TestAuthentication(object):
): ):
_, _, admin, _ = use_admin_account _, _, admin, _ = use_admin_account
token = await client.post( token = await client.post(
"https://localhost/api/v1/auth/", "https://localhost/api/v1/auth/login",
data={ data={
"username": "admin@localhost.com", "username": "admin@localhost.com",
"password": "adminpassword", "password": "adminpassword",
@@ -162,3 +164,37 @@ class TestAuthentication(object):
"token_type": "Bearer", "token_type": "Bearer",
} }
} }
@pytest.mark.asyncio
async def test_setup_new_account(self, client: AsyncClient):
# Ensure account is never available. Prevents account already being available.
check_if_account_exists: User | None = await User.filter(
Q(email="superuser@localhost.com")
).get_or_none()
if check_if_account_exists:
await check_if_account_exists.delete(force=True)
account = await client.post(
"https://localhost/api/v1/auth/register",
json={
"email": "superuser@localhost.com",
"username": "superuser",
"name": "awesome",
"surname": "superuser",
"password": "superuserpassword",
"validate_password": "superuserpassword",
},
)
assert account.status_code == 201
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "superuser@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "superuser",
"username": "superuser",
}