14 Commits

Author SHA1 Message Date
Jeroen Vijgen 86c90122be Merge pull request #12 from BlackChaosNL/fea/users
Add user routes
2025-07-16 21:49:06 +03:00
jjvijgen 96550b92dd Remove dead code 2025-07-16 18:45:05 +00:00
jjvijgen 9f8f7cc112 Refresh some magic numbers with status codes, add delete for account and make people able to update password 2025-07-16 18:43:52 +00:00
jjvijgen c2801858c5 Add updateable route for my user 2025-07-16 18:10:10 +00:00
jjvijgen 8e126ea62b Add user creation route and /me route 2025-07-16 10:57:16 +00:00
Jeroen Vijgen d06a7c0e48 Merge pull request #11 from BlackChaosNL/fea/invitations
Adds invitations to main branch with the following routes:

- Create invitation.
- Remove invitation.
- Accept Invitation.
- Decline Invitation.
2025-07-12 17:24:03 +03:00
jjvijgen a6f9504973 Add two more testcases regarding removing invitations and prevent accepting your own invitations 2025-07-12 14:14:50 +00:00
jjvijgen 8b73307551 Finish api for inviting people to your organization(s) 2025-07-12 13:51:56 +00:00
jjvijgen b65c292d83 Replace Passlib with drop-in replacement, add more invitation routes, start tests 2025-07-09 16:34:42 +00:00
jjvijgen 9a01074ad1 Add start of invitations route 2025-06-25 17:00:01 +00:00
Jeroen Vijgen c4a1a574de Merge pull request #10 from BlackChaosNL/fea/implement-org-routes
Add routes for organization management
2025-06-25 17:44:42 +03:00
jjvijgen ad4507eb93 Prevent update on items that are not posted to Org Update route 2025-06-25 14:42:45 +00:00
jjvijgen 1a9f2a4d57 Add routes for organization management 2025-06-25 14:15:42 +00:00
Jeroen Vijgen 89034557d0 Merge pull request #9 from BlackChaosNL/fea/setup-new-testing-suite
Setup new testing suite
2025-06-25 12:44:22 +03:00
24 changed files with 1593 additions and 182 deletions
+3 -5
View File
@@ -1,8 +1,6 @@
from fastapi.security import OAuth2PasswordBearer
from pydantic_settings import BaseSettings, SettingsConfigDict # type: ignore
from passlib.context import CryptContext # type: ignore
import pytz
from pydantic_settings import BaseSettings, SettingsConfigDict
from pwdlib import PasswordHash
class Settings(BaseSettings):
PROJECT_NAME: str = "StoneEdge Asset Management System"
PROJECT_VERSION: str = "0.0.1"
@@ -19,7 +17,7 @@ class Settings(BaseSettings):
ACCESS_TOKEN_EXPIRE_MIN: int = 10
REFRESH_TOKEN_EXPIRE_MIN: int = 20
BACKEND_CORS_ORIGINS: list = ["*"]
CRYPT: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto")
CRYPT: PasswordHash = PasswordHash.recommended()
OAUTH2_SCHEME: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="token")
model_config = SettingsConfigDict(env_file=".env")
+3 -2
View File
@@ -8,6 +8,7 @@ modules: dict[str, Any] = {
"modules.auth.models",
"modules.users.models",
"modules.organizations.models",
"modules.invitations.models",
]
}
@@ -23,7 +24,7 @@ TEST_TORTOISE_ORM = {
},
}
PROD_TORTOISE_ORM = {
TORTOISE_ORM = {
"connections": {
"default": {
"engine": "tortoise.backends.asyncpg",
@@ -45,7 +46,7 @@ PROD_TORTOISE_ORM = {
}
async def migrate_db(tortoise_config=PROD_TORTOISE_ORM):
async def migrate_db(tortoise_config=TORTOISE_ORM):
if settings.IS_TESTING:
tortoise_config=TEST_TORTOISE_ORM
aerich = Command(tortoise_config)
+2
View File
@@ -10,6 +10,7 @@ from modules.assets.router import router as asset_router
from modules.auth.router import router as auth_router
from modules.users.router import router as users_router
from modules.organizations.router import router as organizations_router
from modules.invitations.router import router as invitations_router
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
@@ -53,3 +54,4 @@ app.include_router(auth_router)
app.include_router(users_router)
app.include_router(organizations_router)
app.include_router(asset_router)
app.include_router(invitations_router)
@@ -12,47 +12,65 @@ async def upgrade(db: BaseDBAsyncClient) -> str:
"ADMIN" INT NOT NULL DEFAULT 0
) /* ACL */;
CREATE TABLE IF NOT EXISTS "organization" (
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"id" CHAR(36) NOT NULL PRIMARY KEY,
"name" VARCHAR(128) NOT NULL,
"type" VARCHAR(128) NOT NULL,
"disabled" INT NOT NULL DEFAULT 0
) /* Organization */;
CREATE TABLE IF NOT EXISTS "user" (
"street_name" TEXT,
"zip_code" VARCHAR(128),
"state" VARCHAR(128),
"city" VARCHAR(128),
"country" VARCHAR(128),
"disabled" INT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"disabled_at" TIMESTAMP
) /* Organization */;
CREATE TABLE IF NOT EXISTS "user" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"email" VARCHAR(128) NOT NULL,
"username" TEXT NOT NULL,
"name" TEXT NOT NULL,
"surname" TEXT NOT NULL,
"password" VARCHAR(128),
"disabled" INT NOT NULL DEFAULT 0
) /* User */;
CREATE TABLE IF NOT EXISTS "token" (
"disabled" INT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"disabled_at" TIMESTAMP
) /* User */;
CREATE TABLE IF NOT EXISTS "token" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"token_type" VARCHAR(128) NOT NULL DEFAULT 'Bearer',
"access_token" TEXT,
"refresh_token" TEXT,
"disabled" INT NOT NULL DEFAULT 0,
"user_id" CHAR(36) NOT NULL REFERENCES "user" ("id") ON DELETE CASCADE
) /* Token */;
CREATE TABLE IF NOT EXISTS "membership" (
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"user_id" CHAR(36) NOT NULL REFERENCES "user" ("id") ON DELETE CASCADE
) /* Token */;
CREATE TABLE IF NOT EXISTS "membership" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"disabled" INT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"acl_id" CHAR(36) NOT NULL REFERENCES "acl" ("id") ON DELETE CASCADE,
"organization_id" CHAR(36) NOT NULL REFERENCES "organization" ("id") ON DELETE CASCADE,
"user_id" CHAR(36) NOT NULL REFERENCES "user" ("id") ON DELETE CASCADE
"organization_id" CHAR(36) REFERENCES "organization" ("id") ON DELETE SET NULL,
"user_id" CHAR(36) REFERENCES "user" ("id") ON DELETE SET NULL
) /* Membership */;
CREATE TABLE IF NOT EXISTS "invite" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"receiver" VARCHAR(128) NOT NULL,
"sender" CHAR(36) NOT NULL,
"org_id" CHAR(36) NOT NULL,
"message" TEXT,
"accepted" INT NOT NULL DEFAULT 0,
"disabled" INT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"acl_id" CHAR(36) NOT NULL REFERENCES "acl" ("id") ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS "aerich" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
"version" VARCHAR(255) NOT NULL,
-11
View File
@@ -1,11 +0,0 @@
from tortoise import fields
class CMDMixin():
"""
Created, modified and delete mixin, these are required for every class.
"""
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
+4 -5
View File
@@ -4,11 +4,8 @@ from tortoise import fields
import uuid
from datetime import datetime
from mixins.CMDMixin import CMDMixin
from config import settings
class Token(Model, CMDMixin):
class Token(Model):
"""
Token
@@ -21,9 +18,11 @@ class Token(Model, CMDMixin):
access_token: str = fields.TextField(null=True)
refresh_token: str = fields.TextField(null=True)
disabled: bool = fields.BooleanField(default=False)
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
async def delete(self) -> None:
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
+12 -49
View File
@@ -1,18 +1,15 @@
from datetime import datetime
from typing import Annotated
import uuid
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.routing import APIRouter
import pytz
from modules.users.utils import get_current_active_user
from modules.users.models import User
from modules.auth.utils import create_jwt_tokens, get_tokens_from_logged_in_user
from modules.auth.models import Token
from modules.users.models import User
from fastapi import Depends, HTTPException, status
from tortoise.expressions import Q
from config import settings
from modules.users.schemas import user_model
from modules.auth.schemas import register_model
router = APIRouter(prefix="/api/v1/auth", tags=["auth"])
@@ -31,30 +28,33 @@ async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
Logs the user into our API, creates tokens and passes them back to User.
"""
user: User | None = await User.filter(Q(email=form.username) | Q(username=form.username)).first()
user: User | None = await User.filter(
(Q(email=form.username) | Q(username=form.username)) & Q(disabled=False)
).first()
if user is None:
raise HTTPException(status_code=401, detail=account_error)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail=account_error
)
if user.check_against_password(form.password) is False:
raise HTTPException(status_code=401, detail=account_error)
if user.disabled is True:
raise HTTPException(status_code=401, detail=account_error)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail=account_error
)
tokens = await create_jwt_tokens(user)
return {"jwt": tokens}
@router.get("/logout", status_code=204)
@router.get("/logout", status_code=status.HTTP_204_NO_CONTENT)
async def logout(user: Annotated[User, Depends(get_current_active_user)]):
"""
Logout
Logout destroys all tokens for User that are currently active.
"""
get_all_tokens = await Token.filter(Q(user__id=user.id))
get_all_tokens = await Token.filter(Q(user__id=user.id) & Q(disabled=False))
if get_all_tokens is None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT, detail="An error occurred."
@@ -91,12 +91,6 @@ async def refresh_login(
detail=token_error,
)
if refresh_token.disabled is True:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=token_error,
)
get_all_tokens = await Token.filter(Q(user__id=refresh_token.user_id))
for token in get_all_tokens:
@@ -108,34 +102,3 @@ async def refresh_login(
)
return {"jwt": tokens}
@router.post("/register", status_code=201, response_model=user_model)
async def register(user: register_model):
# Prevent existing users from reapplying for our system.
existing_user: User | None = await User.filter(
Q(email=user.email)
& Q(username=user.username)
& Q(name=user.name)
& Q(surname=user.surname)
).get_or_none()
if existing_user is not None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=user_exists,
)
if user.password != user.validate_password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=password_failed,
)
return await User.create(
email=user.email,
username=user.username,
name=user.name,
surname=user.surname,
password=crypt.hash(user.password),
)
+1 -10
View File
@@ -1,14 +1,5 @@
from pydantic import BaseModel, EmailStr
from tortoise.contrib.pydantic import pydantic_model_creator
from modules.auth.models import Token
token_model = pydantic_model_creator(Token)
class register_model(BaseModel):
email: EmailStr
username: str
name: str
surname: str
password: str
validate_password: str
token_model = pydantic_model_creator(Token)
+4 -2
View File
@@ -61,7 +61,7 @@ async def create_jwt_tokens(user: User) -> Token:
async def get_tokens_from_logged_in_user(
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)]
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)],
) -> User | None:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -79,4 +79,6 @@ async def get_tokens_from_logged_in_user(
except:
raise credentials_exception
return await Token.filter(Q(refresh_token=token) & Q(user__id=user_id)).first()
return await Token.filter(
Q(refresh_token=token) & Q(user__id=user_id) & Q(disabled=False)
).first()
@@ -0,0 +1,30 @@
from datetime import datetime
import uuid
import pytz
from tortoise import Model, fields
from modules.users.models import ACL
class Invite(Model):
id: uuid.UUID = fields.UUIDField(primary_key=True)
receiver: str = fields.CharField(max_length=128)
sender: uuid.UUID = fields.UUIDField()
org_id: uuid.UUID = fields.UUIDField()
message: str | None = fields.TextField(null=True)
acl: ACL = fields.ForeignKeyField("models.ACL")
accepted: bool = fields.BooleanField(default=False)
disabled: bool = fields.BooleanField(default=False)
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
async def delete(self, force: bool = False) -> None:
if force:
await Model.delete(self)
else:
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
@@ -0,0 +1,214 @@
from typing import Annotated, List
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from modules.organizations.models import Organization
from modules.invitations.models import Invite
from modules.invitations.schemas import invitation_model, send_invitation_for_org
from modules.users.models import ACL, Membership, User
from modules.users.utils import get_current_active_user
from tortoise.expressions import Q
router = APIRouter(prefix="/api/v1/invitations", tags=["invites"])
@router.get("/", response_model=List[invitation_model])
async def get_all_invitations(
user: Annotated[User, Depends(get_current_active_user)],
) -> List[Invite]:
"""Returns all invitations for user requesting, except disabled invites.
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
Returns:
List[Invite]: A list of invitations.
"""
return await Invite.filter(
(Q(sender=user.id) | (Q(receiver=user.username) | Q(receiver=user.email)))
& Q(disabled=False)
)
@router.delete("/{invitation_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_invitation(
user: Annotated[User, Depends(get_current_active_user)], invitation_id: uuid.UUID
) -> None:
"""Removes an invitation you have sent
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
invitation_id (uuid.UUID): UUID for the invitation to be removed
Raises:
HTTPException: When invitation doesn't exist return 403.
Returns:
Invite: The Invitation model.
"""
invite: Invite | None = await Invite.get_or_none(
Q(id=invitation_id) & Q(sender=user.id) & Q(disabled=False)
)
if not invite:
raise HTTPException(
status_code=403,
detail="The invitation doesn't exist or you don't have access to it.",
)
await invite.delete()
@router.get("/accept/{invitation_id}", status_code=status.HTTP_204_NO_CONTENT)
async def accept_invitation(
user: Annotated[User, Depends(get_current_active_user)], invitation_id: uuid.UUID
) -> None:
"""Accepts the invitation sent by a different organization.
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
invitation_id (uuid.UUID): UUID for the organization that the users wants to add the person to.
Raises:
HTTPException: Raises exception when invite is not available or disabled.
"""
invite: Invite | None = await Invite.get_or_none(
Q(id=invitation_id)
& (Q(receiver=user.username) | Q(receiver=user.email))
& Q(disabled=False)
).prefetch_related("acl")
if not invite:
raise HTTPException(
status_code=403,
detail="The invitation doesn't exist or you don't have access to it.",
)
if invite.disabled:
raise HTTPException(
status_code=403,
detail="You have already declined the invitation or the invitation was removed, you can't accept it.",
)
invite.accepted = True
await invite.save()
# Disable invite after accepting, prevent changing it.
await invite.delete()
await Membership.create(
user=user, organization=await Organization.get(id=invite.org_id), acl=invite.acl
)
@router.get("/decline/{invitation_id}", status_code=status.HTTP_204_NO_CONTENT)
async def reject_invitation(
user: Annotated[User, Depends(get_current_active_user)], invitation_id: uuid.UUID
) -> None:
"""Declines an invitation to join an organization
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
invitation_id (uuid.UUID): The UUID of the invitation
Raises:
HTTPException: Checks if the invite exists.
HTTPException: Checks if the invite has already accepted.
Returns:
Invite: The Invitation model.
"""
invite: Invite | None = await Invite.get_or_none(
Q(id=invitation_id)
& (Q(receiver=user.username) | Q(receiver=user.email))
& Q(disabled=False)
).prefetch_related("acl")
if not invite:
raise HTTPException(
status_code=403,
detail="The invitation doesn't exist or you don't have access to it.",
)
if invite.accepted:
raise HTTPException(
status_code=403,
detail="The invitation was already accepted, you can't remove it.",
)
await invite.delete()
@router.post("/send", response_model=invitation_model)
async def send_invitation(
user: Annotated[User, Depends(get_current_active_user)],
invite_details: send_invitation_for_org,
) -> Invite:
"""Sends an invitation to e-mail or username.
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
invite_details (send_invitation_for_org): The details for the invitation.
Raises:
HTTPException: Checks access to the organization posted.
HTTPException: Checks for Manager or Admin permissions and declines if you are not.
Returns:
Invite: The Invitation model.
"""
# Should send an E-Mail as notification.
membership = await Membership.get_or_none(
Q(user=user.id) & Q(organization=invite_details.org_id)
).prefetch_related("acl")
if not membership:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You do not have access to this organization.",
)
if not membership.acl.MANAGE or not membership.acl.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not allowed to send invitations for this organization.",
)
# Check if user is already part of organization
invited_user: User | None = await User.get_or_none(
Q(username=invite_details.receiver) | Q(email=invite_details.receiver)
)
if invited_user:
user_is_part_of_org = await Membership.get_or_none(
Q(user=invited_user) & Q(organization=invite_details.org_id)
)
if user_is_part_of_org:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The person you've invited is already part of the organization.",
)
acl = None
if invite_details.acl:
acl = await ACL.create(
READ=invite_details.acl.READ,
WRITE=invite_details.acl.WRITE,
REPORT=invite_details.acl.REPORT,
MANAGE=invite_details.acl.MANAGE,
ADMIN=invite_details.acl.ADMIN,
)
else:
acl = await ACL.create(
READ=True,
)
return await Invite.create(
receiver=invite_details.receiver,
sender=user.id,
org_id=invite_details.org_id,
message=invite_details.message,
acl=acl,
)
@@ -0,0 +1,20 @@
import uuid
from pydantic import BaseModel
from tortoise.contrib.pydantic import pydantic_model_creator
from modules.invitations.models import Invite
invitation_model = pydantic_model_creator(Invite)
class acl_model(BaseModel):
READ: bool
WRITE: bool
REPORT: bool
MANAGE: bool
ADMIN: bool
class send_invitation_for_org(BaseModel):
org_id: uuid.UUID
receiver: str
acl: acl_model | None
message: str | None
@@ -7,7 +7,6 @@ from tortoise.exceptions import ConfigurationError
from tortoise.models import Model
from tortoise import fields
from mixins.CMDMixin import CMDMixin
class EnumField(fields.CharField):
"""
@@ -52,8 +51,7 @@ class OrganizationType(Enum):
EXTRA_LARGE_ORGANIZATION: str = "xl_org" # 1000 - 5000+
class Organization(Model, CMDMixin):
class Organization(Model):
"""
Organization
@@ -64,6 +62,11 @@ class Organization(Model, CMDMixin):
id: uuid.UUID = fields.UUIDField(primary_key=True)
name: str = fields.CharField(max_length=128)
type: str = EnumField(OrganizationType)
street_name: str | None = fields.TextField(null=True)
zip_code: str | None = fields.CharField(max_length=128, null=True)
state: str | None = fields.CharField(max_length=128, null=True)
city: str | None = fields.CharField(max_length=128, null=True)
country: str | None = fields.CharField(max_length=128, null=True)
users: uuid.UUID = fields.ManyToManyField(
"models.User",
related_name="members",
@@ -74,6 +77,9 @@ class Organization(Model, CMDMixin):
on_delete=fields.NO_ACTION,
)
disabled: bool = fields.BooleanField(default=False)
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
def __str__(self) -> str:
return f"{self.id} - {self.name}"
@@ -85,5 +91,3 @@ class Organization(Model, CMDMixin):
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
@@ -1,17 +1,130 @@
from fastapi import APIRouter
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from typing import Annotated, List
from modules.organizations.models import Organization
from modules.organizations.schemas import (
organization_model,
register_organization,
update_org,
)
from modules.users.utils import get_current_active_user
from modules.users.models import ACL, Membership, User
from tortoise.expressions import Q
router = APIRouter(prefix="/api/v1/organizations", tags=["orgs"])
router = APIRouter(prefix="/api/v1/organizations")
@router.get("/", response_model=List[organization_model])
async def all_active_organizations(
user: Annotated[User, Depends(get_current_active_user)],
) -> List[Organization]:
memberships: List[Membership] = await Membership.filter(
Q(user_id=user.id) & Q(disabled=False)
).prefetch_related("organization")
@router.get("/")
def all_organizations():
pass
organizations: List[Organization] = []
@router.delete("/")
def delete_organization():
pass
if len(memberships) < 1:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No active organizations found!")
@router.post("/create")
def create_organization():
pass
for member in memberships:
organizations.append(member.organization)
return organizations
@router.delete("/{org_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_organization(
user: Annotated[User, Depends(get_current_active_user)], org_id: uuid.UUID
) -> None:
membership: Membership | None = await Membership.get_or_none(
Q(user=user) & Q(organization_id=org_id)
).prefetch_related("acl")
if not membership:
raise HTTPException(
status_code=403,
detail="You are not part of the organization you wish to leave or remove.",
)
if membership.acl.ADMIN:
# Prepare to remove ALL members in the organization.
# We've already checked whether user is ADMIN.
all_memberships: List[Membership] = list(
await Membership.filter(Q(organization_id=org_id))
)
for member in all_memberships:
await member.acl.delete()
await member.delete()
await membership.acl.delete()
await membership.delete()
return
@router.post("/", response_model=organization_model)
async def create_organization(
user: Annotated[User, Depends(get_current_active_user)],
register_organization: register_organization,
) -> Organization:
acl: ACL = await ACL.create(
READ=True, WRITE=True, REPORT=True, MANAGE=True, ADMIN=True
)
org: Organization = await Organization.create(
name=register_organization.name,
type=register_organization.type,
street_name=register_organization.street_name,
zip_code=register_organization.zip_code,
state=register_organization.state,
city=register_organization.city,
country=register_organization.country,
)
await Membership.create(organization=org, user=user, acl=acl)
return org
@router.put("/{org_id}", response_model=organization_model)
async def update_organization(
user: Annotated[User, Depends(get_current_active_user)],
org_id: uuid.UUID,
alter_organization: update_org,
) -> Organization:
membership: Membership | None = await Membership.get_or_none(
organization__id=org_id,
user=user,
).prefetch_related("acl")
if not membership:
raise HTTPException(
status_code=403,
detail="It seems you are not part of the organization or are an admin of the said organization.",
)
if not membership.acl.ADMIN:
raise HTTPException(
status_code=403,
detail="It seems you are not part of the organization or are an admin of the said organization.",
)
org: Organization = await Organization.get(id=org_id)
if alter_organization.name:
org.name = alter_organization.name
if alter_organization.type:
org.type = alter_organization.type
if alter_organization.street_name:
org.street_name = alter_organization.street_name
if alter_organization.zip_code:
org.zip_code = alter_organization.zip_code
if alter_organization.state:
org.state = alter_organization.state
if alter_organization.city:
org.city = alter_organization.city
if alter_organization.country:
org.country = alter_organization.country
await org.save()
return org
@@ -1,6 +1,24 @@
from pydantic import BaseModel
from tortoise.contrib.pydantic import pydantic_model_creator
from modules.organizations.models import Organization
from modules.organizations.models import Organization, OrganizationType
OrganizationModel = pydantic_model_creator(Organization)
organization_model = pydantic_model_creator(Organization)
class register_organization(BaseModel):
name: str
type: OrganizationType
street_name: str | None
zip_code: str | None
state: str | None
city: str | None
country: str | None
class update_org(BaseModel):
name: str | None
type: OrganizationType | None
street_name: str | None
zip_code: str | None
state: str | None
city: str | None
country: str | None
+14 -8
View File
@@ -1,18 +1,18 @@
from datetime import datetime
import uuid
from fastapi import HTTPException, status
from pydantic import EmailStr
import pytz
from tortoise.models import Model
from tortoise import fields
from modules.organizations.models import Organization
from mixins.CMDMixin import CMDMixin
from config import settings
crypt = settings.CRYPT
class User(Model, CMDMixin):
class User(Model):
"""
User
@@ -35,14 +35,17 @@ class User(Model, CMDMixin):
on_delete=fields.NO_ACTION,
)
disabled: bool = fields.BooleanField(default=False)
# tokens = fields.ForeignKeyField("models.Token")
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
def __str__(self) -> str:
return f"{self.id} - {self.name} {self.surname}"
async def set_password(self, password: str) -> None:
async def set_password(self, password: str) -> bool:
self.password = crypt.hash(password)
await self.save() # Make sure to save the model in DB
return True
def check_against_password(self, password: str) -> bool:
return crypt.verify(password, self.password)
@@ -54,7 +57,7 @@ class User(Model, CMDMixin):
return False
if new_password is not verify_new_password:
return False
await self.set_password(new_password)
return await self.set_password(new_password)
async def delete(self, force: bool = False) -> None:
if force:
@@ -90,7 +93,7 @@ class ACL(Model):
"""
class Membership(Model, CMDMixin):
class Membership(Model):
"""
Membership
@@ -98,10 +101,13 @@ class Membership(Model, CMDMixin):
"""
id: uuid.UUID = fields.UUIDField(primary_key=True)
organization: Organization = fields.ForeignKeyField("models.Organization")
user: User = fields.ForeignKeyField("models.User")
organization: Organization = fields.ForeignKeyField("models.Organization", null=True, on_delete=fields.SET_NULL)
user: User = fields.ForeignKeyField("models.User", null=True, on_delete=fields.SET_NULL)
acl: ACL = fields.ForeignKeyField("models.ACL")
disabled: bool = fields.BooleanField(default=False)
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
async def delete(self, force: bool = False) -> None:
if force:
+91 -12
View File
@@ -1,20 +1,99 @@
from fastapi import APIRouter
from typing import Annotated, List
from fastapi import APIRouter, Depends
from modules.users.models import User
from fastapi import HTTPException, status
from tortoise.expressions import Q
from modules.auth.models import Token
from modules.users.utils import get_current_active_user
from modules.users.schemas import register_model, update_user_model
from modules.users.models import Membership, User
from modules.users.schemas import user_model
from config import settings
router = APIRouter(prefix="/api/v1/users", tags=["users"])
crypt = settings.CRYPT
@router.get("/")
def get_all_users():
pass
@router.post("/")
def create_user():
pass
user_exists: str = "Account failed to create, please contact support."
password_failed: str = "Password validation failed, please try again."
@router.get("/me")
def get_user():
pass
@router.post("/", status_code=status.HTTP_201_CREATED, response_model=user_model)
async def create_user(user: register_model):
# Prevent existing users from reapplying for our system.
existing_user: User | None = await User.get_or_none(
Q(email=user.email)
& Q(username=user.username)
& Q(name=user.name)
& Q(surname=user.surname)
& Q(disabled=False)
)
if existing_user is not None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=user_exists,
)
if user.password != user.validate_password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=password_failed,
)
return await User.create(
email=user.email,
username=user.username,
name=user.name,
surname=user.surname,
password=crypt.hash(user.password),
)
@router.put("/me", status_code=status.HTTP_204_NO_CONTENT)
async def update_user(
user: Annotated[User, Depends(get_current_active_user)],
updated_user: update_user_model,
):
if updated_user.email:
user.email = updated_user.email
if updated_user.name:
user.name = updated_user.name
if updated_user.surname:
user.surname = updated_user.surname
if (
updated_user.old_password
and updated_user.password
and updated_user.validate_password
):
user.update_password(
updated_user.old_password,
updated_user.password,
updated_user.validate_password,
)
await user.save()
@router.delete("/me", status_code=status.HTTP_204_NO_CONTENT)
async def update_user(
user: Annotated[User, Depends(get_current_active_user)],
):
memberships: List[Membership] = await Membership.filter(Q(user__id=user.id) & Q(disabled=False))
for membership in memberships:
await membership.acl.delete()
await membership.delete()
tokens: List[Token] = await Token.filter(Q(user__id=user.id) & Q(disabled=False))
for token in tokens:
await token.delete()
await user.delete()
@router.get("/me", response_model=user_model)
async def get_user(user: Annotated[User, Depends(get_current_active_user)]):
return user
@@ -1,5 +1,24 @@
from pydantic import BaseModel, EmailStr
from tortoise.contrib.pydantic import pydantic_model_creator
from modules.users.models import User
user_model = pydantic_model_creator(User, exclude=["password"])
class register_model(BaseModel):
email: EmailStr
username: str
name: str
surname: str
password: str
validate_password: str
class update_user_model(BaseModel):
email: EmailStr | None
name: str | None
surname: str | None
old_password: str | None
password: str | None
validate_password: str | None
+3 -8
View File
@@ -10,7 +10,7 @@ from config import settings
async def get_user_from_token(
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)]
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)],
) -> User | None:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -28,7 +28,7 @@ async def get_user_from_token(
except:
raise credentials_exception
return await User.filter(Q(id=user_id)).first()
return await User.filter(Q(id=user_id) & Q(disabled=False)).first()
async def get_current_active_user(
@@ -37,11 +37,6 @@ async def get_current_active_user(
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User is not found or active",
)
if user.disabled:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User is not found or active",
detail="The requested token does not exist or you are not logged in.",
)
return user
@@ -4,7 +4,7 @@ tortoise-orm[asyncpg]>=0.25.1
uvicorn>=0.34.3
black>=25.1.0
joserfc>=1.1.0
passlib>=1.7.4
pwdlib[argon2]>=0.2.1
pytz>=2025.2
ptpython>=3.0.30
msgspec>=0.19.0
@@ -1,10 +1,7 @@
from tests.base_test import Test
from modules.users.models import User
import pytest # type: ignore
from httpx import AsyncClient
from config import settings
from unittest.mock import ANY
from tortoise.expressions import Q
crypt = settings.CRYPT
@@ -162,35 +159,3 @@ class TestAuthentication(Test):
}
}
async def test_setup_new_account(self, client: AsyncClient):
# Ensure account is never available. Prevents account already being available.
check_if_account_exists: User | None = await User.filter(
Q(email="superuser@localhost.com")
).get_or_none()
if check_if_account_exists:
await check_if_account_exists.delete(force=True)
account = await client.post(
"https://localhost/api/v1/auth/register",
json={
"email": "superuser@localhost.com",
"username": "superuser",
"name": "awesome",
"surname": "superuser",
"password": "superuserpassword",
"validate_password": "superuserpassword",
},
)
assert account.status_code == 201
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "superuser@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "superuser",
"username": "superuser",
}
@@ -0,0 +1,547 @@
from unittest.mock import ANY
from httpx import AsyncClient
from modules.users.models import ACL, Membership
from tests.base_test import Test
class TestInvitationalRoutes(Test):
async def test_send_invitations(self, client: AsyncClient, create_user_with_org):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin12@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user1231@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user1231@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user1231@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user1231@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
async def test_cannot_see_others_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin99@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user18@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user1231@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user1231@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == []
async def test_removing_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin9999@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user9487@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user9487@localhost.com",
"sender": str(admin.id),
}
invite_id = invite.json()["id"]
removed_invite = await client.delete(
f"https://localhost/api/v1/invitations/{invite_id}",
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert removed_invite.status_code == 204
async def test_cannot_accept_own_invite(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin18569@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "non-existing-user@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "non-existing-user@localhost.com",
"sender": str(admin.id),
}
invite_id = invite.json()["id"]
try_accept_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert try_accept_invite.status_code == 403
assert try_accept_invite.json() == {
"detail": "The invitation doesn't exist or you don't have access to it."
}
async def test_accept_sent_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin191@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user8@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user8@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user8@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user8@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
invite_id = user_invites.json()[0]["id"]
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 204
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
# When an invite has been accepted, it should be removed.
assert user_invites.json() == []
async def test_decline_sent_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin11@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user98@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user98@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user98@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user98@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
invite_id = user_invites.json()[0]["id"]
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/decline/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 204
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == []
async def test_prevent_accepting_when_declined_sent_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin9612@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user11918@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user11918@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user11918@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user11918@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
invite_id = user_invites.json()[0]["id"]
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/decline/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 204
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == []
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 403
assert accept_invite.json() == {
"detail": "The invitation doesn't exist or you don't have access to it."
}
async def test_prevent_declining_when_accepted_sent_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin9712@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user14918@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user14918@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user14918@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user14918@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
invite_id = user_invites.json()[0]["id"]
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 204
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == []
decline_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert decline_invite.status_code == 403
assert decline_invite.json() == {
"detail": "The invitation doesn't exist or you don't have access to it."
}
async def test_prevent_adding_user_whom_already_belongs_to_your_organization(
self, client: AsyncClient, create_user_with_org
):
_, org, _, admintokens = await create_user_with_org(
email="us3r123@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
user, _, _, _ = await create_user_with_org(email="us3r1234@localhost.com")
await Membership.create(
user=user,
organization=org,
acl=await ACL.create(READ=True)
)
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "us3r1234@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 403
assert invite.json() == {
"detail": "The person you've invited is already part of the organization."
}
@@ -0,0 +1,284 @@
from httpx import AsyncClient
from modules.users.models import ACL, Membership
from modules.organizations.models import Organization
from unittest.mock import ANY
from tests.base_test import Test
class TestOrganizationRoute(Test):
async def test_get_organizations_from_api(
self, client: AsyncClient, create_user_with_org
):
_, _, _, tokens = await create_user_with_org()
organizations = await client.get(
"https://localhost/api/v1/organizations/",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert organizations.status_code == 200
assert organizations.json() == [
{
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"modified_at": ANY,
"name": "simple organization",
"type": "home",
"street_name": None,
"zip_code": None,
"state": None,
"city": None,
"country": None,
},
]
async def test_create_organization(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
organizations = await client.post(
"https://localhost/api/v1/organizations/",
json={
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert organizations.status_code == 200
assert organizations.json() == {
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
"id": ANY,
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
"disabled": False,
}
async def test_delete_organization(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
organizations = await client.post(
"https://localhost/api/v1/organizations/",
json={
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert organizations.status_code == 200
assert organizations.json() == {
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
"id": ANY,
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
"disabled": False,
}
org_id = organizations.json()["id"]
deleted_org = await client.delete(
f"https://localhost/api/v1/organizations/{org_id}",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert deleted_org.status_code == 204
async def test_cannot_delete_organization_you_are_not_a_part_of(
self, client: AsyncClient, create_user_with_org
):
_, _, _, tokens = await create_user_with_org()
organization: Organization = await Organization.create(
name="My Pretty Organization",
type="xl_org",
street_name="Alakaventie 5 A 188",
zip_code="00920",
state="uusimaa",
city="Helsinki",
country="Finland",
)
deleted_org = await client.delete(
f"https://localhost/api/v1/organizations/{organization.id}",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert deleted_org.status_code == 403
async def test_delete_membership_of_organization(
self, client: AsyncClient, create_user_with_org
):
user, _, _, tokens = await create_user_with_org()
organization: Organization = await Organization.create(
name="My Pretty Organization",
type="xl_org",
street_name="Alakaventie 5 A 188",
zip_code="00920",
state="uusimaa",
city="Helsinki",
country="Finland",
)
acl: ACL = await ACL.create(
READ=True, WRITE=True, REPORT=True, MANAGE=False, ADMIN=False
)
await Membership.create(user=user, organization=organization, acl=acl)
deleted_org = await client.delete(
f"https://localhost/api/v1/organizations/{organization.id}",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert deleted_org.status_code == 204
async def test_update_organization(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
organizations = await client.post(
"https://localhost/api/v1/organizations/",
json={
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert organizations.status_code == 200
assert organizations.json() == {
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
"id": ANY,
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
"disabled": False,
}
org_id = organizations.json()["id"]
update_org = await client.put(
f"https://localhost/api/v1/organizations/{org_id}",
json={
"name": "My awesome organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert update_org.json() == {
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
"id": ANY,
"name": "My awesome organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
"disabled": False,
}
async def test_cannot_update_organization_you_are_not_a_part_of(
self, client: AsyncClient, create_user_with_org
):
_, _, _, tokens = await create_user_with_org()
organization: Organization = await Organization.create(
name="My Pretty Organization",
type="xl_org",
street_name="Alakaventie 5 A 188",
zip_code="00920",
state="uusimaa",
city="Helsinki",
country="Finland",
)
update_org = await client.put(
f"https://localhost/api/v1/organizations/{organization.id}",
json={
"name": "My awesome organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert update_org.status_code == 403
assert update_org.json() == {
"detail": "It seems you are not part of the organization or are an admin of the said "
"organization.",
}
async def test_cannot_update_organization_you_are_not_an_admin_of(
self, client: AsyncClient, create_user_with_org
):
_, organization, _, tokens = await create_user_with_org()
update_org = await client.put(
f"https://localhost/api/v1/organizations/{organization.id}",
json={
"name": "My awesome organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert update_org.status_code == 403
assert update_org.json() == {
"detail": "It seems you are not part of the organization or are an admin of the said "
"organization.",
}
@@ -0,0 +1,154 @@
from tests.base_test import Test
from tortoise.expressions import Q
from tests.base_test import Test
from httpx import AsyncClient
from unittest.mock import ANY
from modules.users.models import User
class TestAccounts(Test):
async def test_setup_new_account(self, client: AsyncClient):
# Ensure account is never available. Prevents account already being available.
check_if_account_exists: User | None = await User.filter(
Q(email="superuser@localhost.com")
).get_or_none()
if check_if_account_exists:
await check_if_account_exists.delete(force=True)
account = await client.post(
"https://localhost/api/v1/users/",
json={
"email": "superuser@localhost.com",
"username": "superuser",
"name": "awesome",
"surname": "superuser",
"password": "superuserpassword",
"validate_password": "superuserpassword",
},
)
assert account.status_code == 201
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "superuser@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "superuser",
"username": "superuser",
}
async def test_me_route(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
account = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 200
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "user@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "user",
"username": "user",
}
async def test_update_me_route(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
account = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 200
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "user@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "user",
"username": "user",
}
account = await client.put(
"https://localhost/api/v1/users/me",
json={
"email": None,
"name": None,
"surname": "bluey",
"old_password": None,
"password": None,
"validate_password": None,
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 204
account = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 200
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "user@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "bluey",
"username": "user",
}
async def test_remove_account(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org(email="sup3rus3r@gmail.com")
account = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 200
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "sup3rus3r@gmail.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "user",
"username": "user",
}
delete = await client.delete(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert delete.status_code == 204
old = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert old.status_code == 401
assert old.json() == {
"detail": "The requested token does not exist or you are not logged in.",
}