Add user routes #12
@@ -10,8 +10,6 @@ from modules.auth.models import Token
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from tortoise.expressions import Q
|
||||
from config import settings
|
||||
from modules.users.schemas import user_model
|
||||
from modules.auth.schemas import register_model
|
||||
|
||||
router = APIRouter(prefix="/api/v1/auth", tags=["auth"])
|
||||
|
||||
@@ -31,7 +29,7 @@ async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
||||
Logs the user into our API, creates tokens and passes them back to User.
|
||||
"""
|
||||
user: User | None = await User.filter(
|
||||
Q(email=form.username) | Q(username=form.username)
|
||||
(Q(email=form.username) | Q(username=form.username)) & Q(disabled=False)
|
||||
).first()
|
||||
|
||||
if user is None:
|
||||
@@ -44,11 +42,6 @@ async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
||||
status_code=status.HTTP_401_UNAUTHORIZED, detail=account_error
|
||||
)
|
||||
|
||||
if user.disabled is True:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED, detail=account_error
|
||||
)
|
||||
|
||||
tokens = await create_jwt_tokens(user)
|
||||
|
||||
return {"jwt": tokens}
|
||||
@@ -61,7 +54,7 @@ async def logout(user: Annotated[User, Depends(get_current_active_user)]):
|
||||
|
||||
Logout destroys all tokens for User that are currently active.
|
||||
"""
|
||||
get_all_tokens = await Token.filter(Q(user__id=user.id))
|
||||
get_all_tokens = await Token.filter(Q(user__id=user.id) & Q(disabled=False))
|
||||
if get_all_tokens is None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_204_NO_CONTENT, detail="An error occurred."
|
||||
@@ -98,12 +91,6 @@ async def refresh_login(
|
||||
detail=token_error,
|
||||
)
|
||||
|
||||
if refresh_token.disabled is True:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=token_error,
|
||||
)
|
||||
|
||||
get_all_tokens = await Token.filter(Q(user__id=refresh_token.user_id))
|
||||
|
||||
for token in get_all_tokens:
|
||||
@@ -115,36 +102,3 @@ async def refresh_login(
|
||||
)
|
||||
|
||||
return {"jwt": tokens}
|
||||
|
||||
|
||||
@router.post(
|
||||
"/register", status_code=status.HTTP_201_CREATED, response_model=user_model
|
||||
)
|
||||
async def register(user: register_model):
|
||||
# Prevent existing users from reapplying for our system.
|
||||
existing_user: User | None = await User.filter(
|
||||
Q(email=user.email)
|
||||
& Q(username=user.username)
|
||||
& Q(name=user.name)
|
||||
& Q(surname=user.surname)
|
||||
).get_or_none()
|
||||
|
||||
if existing_user is not None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=user_exists,
|
||||
)
|
||||
|
||||
if user.password != user.validate_password:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=password_failed,
|
||||
)
|
||||
|
||||
return await User.create(
|
||||
email=user.email,
|
||||
username=user.username,
|
||||
name=user.name,
|
||||
surname=user.surname,
|
||||
password=crypt.hash(user.password),
|
||||
)
|
||||
|
||||
@@ -61,7 +61,7 @@ async def create_jwt_tokens(user: User) -> Token:
|
||||
|
||||
|
||||
async def get_tokens_from_logged_in_user(
|
||||
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)]
|
||||
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)],
|
||||
) -> User | None:
|
||||
credentials_exception = HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
@@ -79,4 +79,6 @@ async def get_tokens_from_logged_in_user(
|
||||
except:
|
||||
raise credentials_exception
|
||||
|
||||
return await Token.filter(Q(refresh_token=token) & Q(user__id=user_id)).first()
|
||||
return await Token.filter(
|
||||
Q(refresh_token=token) & Q(user__id=user_id) & Q(disabled=False)
|
||||
).first()
|
||||
|
||||
@@ -1,20 +1,57 @@
|
||||
from fastapi import APIRouter
|
||||
from typing import Annotated
|
||||
from fastapi import APIRouter, Depends
|
||||
|
||||
from fastapi import HTTPException, status
|
||||
|
||||
from tortoise.expressions import Q
|
||||
|
||||
from modules.users.utils import get_current_active_user
|
||||
from modules.auth.schemas import register_model
|
||||
from modules.users.models import User
|
||||
from modules.users.schemas import user_model
|
||||
|
||||
from config import settings
|
||||
|
||||
|
||||
router = APIRouter(prefix="/api/v1/users", tags=["users"])
|
||||
|
||||
crypt = settings.CRYPT
|
||||
|
||||
@router.get("/")
|
||||
def get_all_users():
|
||||
pass
|
||||
|
||||
@router.post("/")
|
||||
def create_user():
|
||||
pass
|
||||
user_exists: str = "Account failed to create, please contact support."
|
||||
password_failed: str = "Password validation failed, please try again."
|
||||
|
||||
|
||||
@router.get("/me")
|
||||
def get_user():
|
||||
pass
|
||||
@router.post("/", status_code=status.HTTP_201_CREATED, response_model=user_model)
|
||||
async def create_user(user: register_model):
|
||||
# Prevent existing users from reapplying for our system.
|
||||
existing_user: User | None = await User.filter(
|
||||
Q(email=user.email)
|
||||
& Q(username=user.username)
|
||||
& Q(name=user.name)
|
||||
& Q(surname=user.surname)
|
||||
).get_or_none()
|
||||
|
||||
if existing_user is not None:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=user_exists,
|
||||
)
|
||||
|
||||
if user.password != user.validate_password:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=password_failed,
|
||||
)
|
||||
|
||||
return await User.create(
|
||||
email=user.email,
|
||||
username=user.username,
|
||||
name=user.name,
|
||||
surname=user.surname,
|
||||
password=crypt.hash(user.password),
|
||||
)
|
||||
|
||||
|
||||
@router.get("/me", response_model=user_model)
|
||||
async def get_user(user: Annotated[User, Depends(get_current_active_user)]):
|
||||
return user
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
from tests.base_test import Test
|
||||
from modules.users.models import User
|
||||
from httpx import AsyncClient
|
||||
from config import settings
|
||||
from unittest.mock import ANY
|
||||
from tortoise.expressions import Q
|
||||
|
||||
crypt = settings.CRYPT
|
||||
|
||||
@@ -161,35 +159,3 @@ class TestAuthentication(Test):
|
||||
}
|
||||
}
|
||||
|
||||
async def test_setup_new_account(self, client: AsyncClient):
|
||||
# Ensure account is never available. Prevents account already being available.
|
||||
check_if_account_exists: User | None = await User.filter(
|
||||
Q(email="superuser@localhost.com")
|
||||
).get_or_none()
|
||||
if check_if_account_exists:
|
||||
await check_if_account_exists.delete(force=True)
|
||||
|
||||
account = await client.post(
|
||||
"https://localhost/api/v1/auth/register",
|
||||
json={
|
||||
"email": "superuser@localhost.com",
|
||||
"username": "superuser",
|
||||
"name": "awesome",
|
||||
"surname": "superuser",
|
||||
"password": "superuserpassword",
|
||||
"validate_password": "superuserpassword",
|
||||
},
|
||||
)
|
||||
|
||||
assert account.status_code == 201
|
||||
assert account.json() == {
|
||||
"created_at": ANY,
|
||||
"disabled": False,
|
||||
"disabled_at": None,
|
||||
"email": "superuser@localhost.com",
|
||||
"id": ANY,
|
||||
"modified_at": ANY,
|
||||
"name": "awesome",
|
||||
"surname": "superuser",
|
||||
"username": "superuser",
|
||||
}
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
from tests.base_test import Test
|
||||
from tortoise.expressions import Q
|
||||
from tests.base_test import Test
|
||||
from httpx import AsyncClient
|
||||
from unittest.mock import ANY
|
||||
from modules.users.models import User
|
||||
|
||||
class TestAccounts(Test):
|
||||
async def test_setup_new_account(self, client: AsyncClient):
|
||||
# Ensure account is never available. Prevents account already being available.
|
||||
check_if_account_exists: User | None = await User.filter(
|
||||
Q(email="superuser@localhost.com")
|
||||
).get_or_none()
|
||||
if check_if_account_exists:
|
||||
await check_if_account_exists.delete(force=True)
|
||||
|
||||
account = await client.post(
|
||||
"https://localhost/api/v1/users/",
|
||||
json={
|
||||
"email": "superuser@localhost.com",
|
||||
"username": "superuser",
|
||||
"name": "awesome",
|
||||
"surname": "superuser",
|
||||
"password": "superuserpassword",
|
||||
"validate_password": "superuserpassword",
|
||||
},
|
||||
)
|
||||
|
||||
assert account.status_code == 201
|
||||
assert account.json() == {
|
||||
"created_at": ANY,
|
||||
"disabled": False,
|
||||
"disabled_at": None,
|
||||
"email": "superuser@localhost.com",
|
||||
"id": ANY,
|
||||
"modified_at": ANY,
|
||||
"name": "awesome",
|
||||
"surname": "superuser",
|
||||
"username": "superuser",
|
||||
}
|
||||
|
||||
async def test_me_route(self, client: AsyncClient, create_user_with_org):
|
||||
# Ensure account is never available. Prevents account already being available.
|
||||
_, _, _, tokens = await create_user_with_org()
|
||||
|
||||
|
||||
account = await client.get(
|
||||
"https://localhost/api/v1/users/me",
|
||||
headers={"Authorization": f"Bearer {tokens.access_token}"},
|
||||
)
|
||||
|
||||
assert account.status_code == 200
|
||||
assert account.json() == {
|
||||
"created_at": ANY,
|
||||
"disabled": False,
|
||||
"disabled_at": None,
|
||||
"email": "user@localhost.com",
|
||||
"id": ANY,
|
||||
"modified_at": ANY,
|
||||
"name": "awesome",
|
||||
"surname": "user",
|
||||
"username": "user",
|
||||
}
|
||||
Reference in New Issue
Block a user