20 Commits

Author SHA1 Message Date
Jeroen Vijgen 86c90122be Merge pull request #12 from BlackChaosNL/fea/users
Add user routes
2025-07-16 21:49:06 +03:00
jjvijgen 96550b92dd Remove dead code 2025-07-16 18:45:05 +00:00
jjvijgen 9f8f7cc112 Refresh some magic numbers with status codes, add delete for account and make people able to update password 2025-07-16 18:43:52 +00:00
jjvijgen c2801858c5 Add updateable route for my user 2025-07-16 18:10:10 +00:00
jjvijgen 8e126ea62b Add user creation route and /me route 2025-07-16 10:57:16 +00:00
Jeroen Vijgen d06a7c0e48 Merge pull request #11 from BlackChaosNL/fea/invitations
Adds invitations to main branch with the following routes:

- Create invitation.
- Remove invitation.
- Accept Invitation.
- Decline Invitation.
2025-07-12 17:24:03 +03:00
jjvijgen a6f9504973 Add two more testcases regarding removing invitations and prevent accepting your own invitations 2025-07-12 14:14:50 +00:00
jjvijgen 8b73307551 Finish api for inviting people to your organization(s) 2025-07-12 13:51:56 +00:00
jjvijgen b65c292d83 Replace Passlib with drop-in replacement, add more invitation routes, start tests 2025-07-09 16:34:42 +00:00
jjvijgen 9a01074ad1 Add start of invitations route 2025-06-25 17:00:01 +00:00
Jeroen Vijgen c4a1a574de Merge pull request #10 from BlackChaosNL/fea/implement-org-routes
Add routes for organization management
2025-06-25 17:44:42 +03:00
jjvijgen ad4507eb93 Prevent update on items that are not posted to Org Update route 2025-06-25 14:42:45 +00:00
jjvijgen 1a9f2a4d57 Add routes for organization management 2025-06-25 14:15:42 +00:00
Jeroen Vijgen 89034557d0 Merge pull request #9 from BlackChaosNL/fea/setup-new-testing-suite
Setup new testing suite
2025-06-25 12:44:22 +03:00
jjvijgen a7746beac4 Remove debug print 2025-06-25 09:29:24 +00:00
jjvijgen 81bae580f9 Add new testing suite with working fixture 2025-06-25 09:28:09 +00:00
jjvijgen 74a57700c8 Fixing tests and adding manage.py to debug 2025-06-23 22:06:59 +00:00
jjvijgen 390152ac66 Upgrade requirements, add new fixture and setup new testing procedure 2025-06-23 12:37:22 +00:00
Jeroen Vijgen b1013659c4 Merge pull request #8 from BlackChaosNL/fea/prevent-sqlite-from-being-committed
Fix sqlite being committed
2025-06-23 14:47:47 +03:00
jjvijgen 1903a85cd5 Fix sqlite being committed 2025-06-23 11:45:22 +00:00
44 changed files with 1890 additions and 425 deletions
+4
View File
@@ -91,3 +91,7 @@
/web/**/psd
/web/**/thumb
/web/**/sketch
# Prevent some sensitive files from being committed
*.sqlite*
.env
+2 -2
View File
@@ -1,2 +1,2 @@
python 3.13.1
nodejs 23.4.0
python 3.13.5t
nodejs 24.2.0
+5 -6
View File
@@ -1,24 +1,23 @@
from fastapi.security import OAuth2PasswordBearer
from pydantic_settings import BaseSettings, SettingsConfigDict # type: ignore
from passlib.context import CryptContext # type: ignore
import pytz
from pydantic_settings import BaseSettings, SettingsConfigDict
from pwdlib import PasswordHash
class Settings(BaseSettings):
PROJECT_NAME: str = "StoneEdge Asset Management System"
PROJECT_VERSION: str = "0.0.1"
PROJECT_SUMMARY: str = "Product API for StoneEdge."
PROJECT_PUBLIC_URL: str = "localhost"
SECRET_KEY: str | None = None
USE_HTTPS_ONLY: bool = False
IS_TESTING: bool = False
PSQL_USERNAME: str = "user"
PSQL_PASSWORD: str = "password"
PSQL_HOSTNAME: str = "localhost"
PSQL_PORT: int = 5432
PSQL_DB_NAME: str = "stoneedge"
PSQL_TEST_DB_NAME: str = "stoneedge_testing"
ACCESS_TOKEN_EXPIRE_MIN: int = 10
REFRESH_TOKEN_EXPIRE_MIN: int = 20
BACKEND_CORS_ORIGINS: list = ["*"]
CRYPT: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto")
CRYPT: PasswordHash = PasswordHash.recommended()
OAUTH2_SCHEME: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="token")
model_config = SettingsConfigDict(env_file=".env")
+23 -7
View File
@@ -5,13 +5,25 @@ from aerich import Command
modules: dict[str, Any] = {
"models": [
"modules.assets.models",
"modules.auth.models",
"modules.users.models",
"modules.organizations.models",
"modules.invitations.models",
]
}
TEST_TORTOISE_ORM = {
"connections": {
"default": "sqlite://:memory:"
},
"apps": {
"models": {
"models": modules.get("models", []) + ["aerich.models"],
"default_connection": "default",
},
},
}
TORTOISE_ORM = {
"connections": {
"default": {
@@ -23,7 +35,7 @@ TORTOISE_ORM = {
"password": settings.PSQL_PASSWORD,
"port": settings.PSQL_PORT,
},
}
},
},
"apps": {
"models": {
@@ -34,11 +46,15 @@ TORTOISE_ORM = {
}
async def migrate_db():
aerich = Command(tortoise_config=TORTOISE_ORM)
async def migrate_db(tortoise_config=TORTOISE_ORM):
if settings.IS_TESTING:
tortoise_config=TEST_TORTOISE_ORM
aerich = Command(tortoise_config)
await aerich.init()
await aerich.upgrade(run_in_transaction=True)
await Tortoise.init(config=TORTOISE_ORM)
await aerich.upgrade()
await Tortoise.init(tortoise_config)
await Tortoise.generate_schemas(safe=True)
async def end_connections_to_db():
await Tortoise.close_connections()
await Tortoise.close_connections()
+10 -2
View File
@@ -10,6 +10,7 @@ from modules.assets.router import router as asset_router
from modules.auth.router import router as auth_router
from modules.users.router import router as users_router
from modules.organizations.router import router as organizations_router
from modules.invitations.router import router as invitations_router
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
@@ -29,8 +30,14 @@ app = FastAPI(
default_response_class=msgspec_jsonresponse,
)
app.add_middleware(HTTPSRedirectMiddleware)
app.add_middleware(TrustedHostMiddleware, allowed_hosts=[settings.PROJECT_PUBLIC_URL,])
if settings.USE_HTTPS_ONLY:
app.add_middleware(HTTPSRedirectMiddleware)
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=[
settings.PROJECT_PUBLIC_URL,
],
)
# Set all CORS enabled origins
if settings.BACKEND_CORS_ORIGINS:
@@ -47,3 +54,4 @@ app.include_router(auth_router)
app.include_router(users_router)
app.include_router(organizations_router)
app.include_router(asset_router)
app.include_router(invitations_router)
+90 -11
View File
@@ -1,24 +1,103 @@
#!/usr/bin/env python3
from ptpython.repl import embed # type: ignore
from ptpython.repl import embed, ReplExit
from database import *
import asyncio, importlib, contextlib, sys, os, tomllib, asyncclick
import asyncio
from database import migrate_db
from pathlib import Path
from asyncclick import BadOptionUsage, ClickException
from collections.abc import AsyncGenerator
from tortoise import Tortoise, connections
#
# Custom implementation of Tortoise CLI
# Original script is located under: https://github.com/tortoise/tortoise-cli
# License: Apache-2.0 as dictated as [here](https://github.com/tortoise/tortoise-cli/blob/main/LICENSE)
#
def tortoise_orm_config(file="pyproject.toml") -> str:
"""
get tortoise orm config from os environment variable or aerich item in pyproject.toml
:param file: toml file that aerich item loads from it
:return: module path and var name that store the tortoise config, e.g.: 'settings.TORTOISE_ORM'
"""
if not (config := os.getenv("TORTOISE_ORM", "")) and (p := Path(file)).exists():
doc = tomllib.loads(p.read_text("utf-8"))
config = doc.get("tool", {}).get("aerich", {}).get("tortoise_orm", "")
return config
def get_tortoise_config(config: str) -> dict:
"""
get tortoise config from module
:param ctx:
:param config:
:return:
"""
splits = config.split(".")
config_path = ".".join(splits[:-1])
tortoise_config = splits[-1]
try:
config_module = importlib.import_module(config_path)
except ModuleNotFoundError as e:
raise ClickException(
f"Error while importing configuration module: {e}"
) from None
c = getattr(config_module, tortoise_config, None)
if not c:
raise BadOptionUsage(
option_name="--config",
message=f'Can\'t get "{tortoise_config}" from module "{config_module}"',
ctx=None,
)
return c
@contextlib.asynccontextmanager
async def aclose_tortoise() -> AsyncGenerator[None]:
try:
yield
finally:
if Tortoise._inited:
await connections.close_all()
def history():
import readline
for i in range(1, readline.get_current_history_length()+1):
print("%3d %s" % (i, readline.get_history_item(i)))
async def setup():
try:
await embed(globals=globals(), return_asyncio_coroutine=True, patch_stdout=True)
except EOFError:
loop.stop()
if not (config := tortoise_orm_config()):
raise asyncclick.UsageError(
"You must specify TORTOISE_ORM in option or env, or config file pyproject.toml from config of aerich",
ctx=None,
)
await migrate_db(get_tortoise_config(config))
async with aclose_tortoise():
await embed(
globals=globals(),
title="shell",
vi_mode=True,
return_asyncio_coroutine=True,
patch_stdout=True,
)
if __name__ == "__main__":
if sys.path[0] != ".":
sys.path.insert(0, ".")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
asyncio.ensure_future(setup())
loop.run_forever()
except KeyboardInterrupt:
pass
history()
loop.run_until_complete(asyncio.ensure_future(setup()))
except (KeyboardInterrupt, ReplExit) as e:
print(e)
loop.stop()
@@ -1,75 +0,0 @@
from tortoise import BaseDBAsyncClient
async def upgrade(db: BaseDBAsyncClient) -> str:
return """
CREATE TABLE IF NOT EXISTS "asset" (
"id" UUID NOT NULL PRIMARY KEY,
"name" VARCHAR(128) NOT NULL
);
CREATE TABLE IF NOT EXISTS "acl" (
"id" UUID NOT NULL PRIMARY KEY,
"READ" BOOL NOT NULL DEFAULT False,
"WRITE" BOOL NOT NULL DEFAULT False,
"REPORT" BOOL NOT NULL DEFAULT False,
"MANAGE" BOOL NOT NULL DEFAULT False,
"ADMIN" BOOL NOT NULL DEFAULT False
);
COMMENT ON TABLE "acl" IS 'ACL';
CREATE TABLE IF NOT EXISTS "organization" (
"created_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMPTZ,
"id" UUID NOT NULL PRIMARY KEY,
"name" VARCHAR(128) NOT NULL,
"type" VARCHAR(128) NOT NULL,
"disabled" BOOL NOT NULL DEFAULT False
);
COMMENT ON TABLE "organization" IS 'Organization';
CREATE TABLE IF NOT EXISTS "user" (
"created_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMPTZ,
"id" UUID NOT NULL PRIMARY KEY,
"email" VARCHAR(128) NOT NULL,
"username" TEXT NOT NULL,
"name" TEXT NOT NULL,
"surname" TEXT NOT NULL,
"password" VARCHAR(128),
"disabled" BOOL NOT NULL DEFAULT False
);
COMMENT ON TABLE "user" IS 'User';
CREATE TABLE IF NOT EXISTS "token" (
"created_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMPTZ,
"id" UUID NOT NULL PRIMARY KEY,
"token_type" VARCHAR(128) NOT NULL DEFAULT 'Bearer',
"access_token" VARCHAR(128),
"refresh_token" VARCHAR(128),
"disabled" BOOL NOT NULL DEFAULT False,
"user_id" UUID NOT NULL REFERENCES "user" ("id") ON DELETE CASCADE
);
COMMENT ON TABLE "token" IS 'Token';
CREATE TABLE IF NOT EXISTS "membership" (
"created_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMPTZ,
"id" UUID NOT NULL PRIMARY KEY,
"disabled" BOOL NOT NULL DEFAULT False,
"acl_id" UUID NOT NULL REFERENCES "acl" ("id") ON DELETE CASCADE,
"organization_id" UUID NOT NULL REFERENCES "organization" ("id") ON DELETE CASCADE,
"user_id" UUID NOT NULL REFERENCES "user" ("id") ON DELETE CASCADE
);
COMMENT ON TABLE "membership" IS 'Membership';
CREATE TABLE IF NOT EXISTS "aerich" (
"id" SERIAL NOT NULL PRIMARY KEY,
"version" VARCHAR(255) NOT NULL,
"app" VARCHAR(100) NOT NULL,
"content" JSONB NOT NULL
);"""
async def downgrade(db: BaseDBAsyncClient) -> str:
return """
"""
@@ -1,15 +0,0 @@
from tortoise import BaseDBAsyncClient
async def upgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "asset" ADD "disabled_at" TIMESTAMPTZ;
ALTER TABLE "asset" ADD "modified_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP;
ALTER TABLE "asset" ADD "created_at" TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP;"""
async def downgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "asset" DROP COLUMN "disabled_at";
ALTER TABLE "asset" DROP COLUMN "modified_at";
ALTER TABLE "asset" DROP COLUMN "created_at";"""
@@ -1,13 +0,0 @@
from tortoise import BaseDBAsyncClient
async def upgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "token" ALTER COLUMN "refresh_token" TYPE TEXT USING "refresh_token"::TEXT;
ALTER TABLE "token" ALTER COLUMN "access_token" TYPE TEXT USING "access_token"::TEXT;"""
async def downgrade(db: BaseDBAsyncClient) -> str:
return """
ALTER TABLE "token" ALTER COLUMN "refresh_token" TYPE VARCHAR(128) USING "refresh_token"::VARCHAR(128);
ALTER TABLE "token" ALTER COLUMN "access_token" TYPE VARCHAR(128) USING "access_token"::VARCHAR(128);"""
@@ -0,0 +1,94 @@
from tortoise import BaseDBAsyncClient
async def upgrade(db: BaseDBAsyncClient) -> str:
return """
CREATE TABLE IF NOT EXISTS "acl" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"READ" INT NOT NULL DEFAULT 0,
"WRITE" INT NOT NULL DEFAULT 0,
"REPORT" INT NOT NULL DEFAULT 0,
"MANAGE" INT NOT NULL DEFAULT 0,
"ADMIN" INT NOT NULL DEFAULT 0
) /* ACL */;
CREATE TABLE IF NOT EXISTS "organization" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"name" VARCHAR(128) NOT NULL,
"type" VARCHAR(128) NOT NULL,
"street_name" TEXT,
"zip_code" VARCHAR(128),
"state" VARCHAR(128),
"city" VARCHAR(128),
"country" VARCHAR(128),
"disabled" INT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP
) /* Organization */;
CREATE TABLE IF NOT EXISTS "user" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"email" VARCHAR(128) NOT NULL,
"username" TEXT NOT NULL,
"name" TEXT NOT NULL,
"surname" TEXT NOT NULL,
"password" VARCHAR(128),
"disabled" INT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP
) /* User */;
CREATE TABLE IF NOT EXISTS "token" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"token_type" VARCHAR(128) NOT NULL DEFAULT 'Bearer',
"access_token" TEXT,
"refresh_token" TEXT,
"disabled" INT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"user_id" CHAR(36) NOT NULL REFERENCES "user" ("id") ON DELETE CASCADE
) /* Token */;
CREATE TABLE IF NOT EXISTS "membership" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"disabled" INT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"acl_id" CHAR(36) NOT NULL REFERENCES "acl" ("id") ON DELETE CASCADE,
"organization_id" CHAR(36) REFERENCES "organization" ("id") ON DELETE SET NULL,
"user_id" CHAR(36) REFERENCES "user" ("id") ON DELETE SET NULL
) /* Membership */;
CREATE TABLE IF NOT EXISTS "invite" (
"id" CHAR(36) NOT NULL PRIMARY KEY,
"receiver" VARCHAR(128) NOT NULL,
"sender" CHAR(36) NOT NULL,
"org_id" CHAR(36) NOT NULL,
"message" TEXT,
"accepted" INT NOT NULL DEFAULT 0,
"disabled" INT NOT NULL DEFAULT 0,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"modified_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"acl_id" CHAR(36) NOT NULL REFERENCES "acl" ("id") ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS "aerich" (
"id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
"version" VARCHAR(255) NOT NULL,
"app" VARCHAR(100) NOT NULL,
"content" JSON NOT NULL
);
CREATE TABLE IF NOT EXISTS "Membership" (
"organization_id" CHAR(36) NOT NULL REFERENCES "organization" ("id") ON DELETE NO ACTION,
"user_id" CHAR(36) NOT NULL REFERENCES "user" ("id") ON DELETE NO ACTION
);
CREATE UNIQUE INDEX IF NOT EXISTS "uidx_Membership_organiz_b0a446" ON "Membership" ("organization_id", "user_id");
CREATE TABLE IF NOT EXISTS "Membership" (
"user_id" CHAR(36) NOT NULL REFERENCES "user" ("id") ON DELETE NO ACTION,
"organization_id" CHAR(36) NOT NULL REFERENCES "organization" ("id") ON DELETE NO ACTION
);
CREATE UNIQUE INDEX IF NOT EXISTS "uidx_Membership_user_id_cc48d3" ON "Membership" ("user_id", "organization_id");"""
async def downgrade(db: BaseDBAsyncClient) -> str:
return """
"""
-11
View File
@@ -1,11 +0,0 @@
from tortoise import fields
class CMDMixin():
"""
Created, modified and delete mixin, these are required for every class.
"""
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
+4 -5
View File
@@ -4,11 +4,8 @@ from tortoise import fields
import uuid
from datetime import datetime
from mixins.CMDMixin import CMDMixin
from config import settings
class Token(Model, CMDMixin):
class Token(Model):
"""
Token
@@ -21,9 +18,11 @@ class Token(Model, CMDMixin):
access_token: str = fields.TextField(null=True)
refresh_token: str = fields.TextField(null=True)
disabled: bool = fields.BooleanField(default=False)
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
async def delete(self) -> None:
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
+12 -49
View File
@@ -1,18 +1,15 @@
from datetime import datetime
from typing import Annotated
import uuid
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.routing import APIRouter
import pytz
from modules.users.utils import get_current_active_user
from modules.users.models import User
from modules.auth.utils import create_jwt_tokens, get_tokens_from_logged_in_user
from modules.auth.models import Token
from modules.users.models import User
from fastapi import Depends, HTTPException, status
from tortoise.expressions import Q
from config import settings
from modules.users.schemas import user_model
from modules.auth.schemas import register_model
router = APIRouter(prefix="/api/v1/auth", tags=["auth"])
@@ -31,30 +28,33 @@ async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
Logs the user into our API, creates tokens and passes them back to User.
"""
user: User | None = await User.filter(email=form.username).first()
user: User | None = await User.filter(
(Q(email=form.username) | Q(username=form.username)) & Q(disabled=False)
).first()
if user is None:
raise HTTPException(status_code=401, detail=account_error)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail=account_error
)
if user.check_against_password(form.password) is False:
raise HTTPException(status_code=401, detail=account_error)
if user.disabled is True:
raise HTTPException(status_code=401, detail=account_error)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail=account_error
)
tokens = await create_jwt_tokens(user)
return {"jwt": tokens}
@router.get("/logout", status_code=204)
@router.get("/logout", status_code=status.HTTP_204_NO_CONTENT)
async def logout(user: Annotated[User, Depends(get_current_active_user)]):
"""
Logout
Logout destroys all tokens for User that are currently active.
"""
get_all_tokens = await Token.filter(Q(user__id=user.id))
get_all_tokens = await Token.filter(Q(user__id=user.id) & Q(disabled=False))
if get_all_tokens is None:
raise HTTPException(
status_code=status.HTTP_204_NO_CONTENT, detail="An error occurred."
@@ -91,12 +91,6 @@ async def refresh_login(
detail=token_error,
)
if refresh_token.disabled is True:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=token_error,
)
get_all_tokens = await Token.filter(Q(user__id=refresh_token.user_id))
for token in get_all_tokens:
@@ -108,34 +102,3 @@ async def refresh_login(
)
return {"jwt": tokens}
@router.post("/register", status_code=201, response_model=user_model)
async def register(user: register_model):
# Prevent existing users from reapplying for our system.
existing_user: User | None = await User.filter(
Q(email=user.email)
& Q(username=user.username)
& Q(name=user.name)
& Q(surname=user.surname)
).get_or_none()
if existing_user is not None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=user_exists,
)
if user.password != user.validate_password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=password_failed,
)
return await User.create(
email=user.email,
username=user.username,
name=user.name,
surname=user.surname,
password=crypt.hash(user.password),
)
+1 -10
View File
@@ -1,14 +1,5 @@
from pydantic import BaseModel, EmailStr
from tortoise.contrib.pydantic import pydantic_model_creator
from modules.auth.models import Token
token_model = pydantic_model_creator(Token)
class register_model(BaseModel):
email: EmailStr
username: str
name: str
surname: str
password: str
validate_password: str
token_model = pydantic_model_creator(Token)
+4 -2
View File
@@ -61,7 +61,7 @@ async def create_jwt_tokens(user: User) -> Token:
async def get_tokens_from_logged_in_user(
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)]
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)],
) -> User | None:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -79,4 +79,6 @@ async def get_tokens_from_logged_in_user(
except:
raise credentials_exception
return await Token.filter(Q(refresh_token=token) & Q(user__id=user_id)).first()
return await Token.filter(
Q(refresh_token=token) & Q(user__id=user_id) & Q(disabled=False)
).first()
@@ -0,0 +1,30 @@
from datetime import datetime
import uuid
import pytz
from tortoise import Model, fields
from modules.users.models import ACL
class Invite(Model):
id: uuid.UUID = fields.UUIDField(primary_key=True)
receiver: str = fields.CharField(max_length=128)
sender: uuid.UUID = fields.UUIDField()
org_id: uuid.UUID = fields.UUIDField()
message: str | None = fields.TextField(null=True)
acl: ACL = fields.ForeignKeyField("models.ACL")
accepted: bool = fields.BooleanField(default=False)
disabled: bool = fields.BooleanField(default=False)
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
async def delete(self, force: bool = False) -> None:
if force:
await Model.delete(self)
else:
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
@@ -0,0 +1,214 @@
from typing import Annotated, List
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from modules.organizations.models import Organization
from modules.invitations.models import Invite
from modules.invitations.schemas import invitation_model, send_invitation_for_org
from modules.users.models import ACL, Membership, User
from modules.users.utils import get_current_active_user
from tortoise.expressions import Q
router = APIRouter(prefix="/api/v1/invitations", tags=["invites"])
@router.get("/", response_model=List[invitation_model])
async def get_all_invitations(
user: Annotated[User, Depends(get_current_active_user)],
) -> List[Invite]:
"""Returns all invitations for user requesting, except disabled invites.
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
Returns:
List[Invite]: A list of invitations.
"""
return await Invite.filter(
(Q(sender=user.id) | (Q(receiver=user.username) | Q(receiver=user.email)))
& Q(disabled=False)
)
@router.delete("/{invitation_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_invitation(
user: Annotated[User, Depends(get_current_active_user)], invitation_id: uuid.UUID
) -> None:
"""Removes an invitation you have sent
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
invitation_id (uuid.UUID): UUID for the invitation to be removed
Raises:
HTTPException: When invitation doesn't exist return 403.
Returns:
Invite: The Invitation model.
"""
invite: Invite | None = await Invite.get_or_none(
Q(id=invitation_id) & Q(sender=user.id) & Q(disabled=False)
)
if not invite:
raise HTTPException(
status_code=403,
detail="The invitation doesn't exist or you don't have access to it.",
)
await invite.delete()
@router.get("/accept/{invitation_id}", status_code=status.HTTP_204_NO_CONTENT)
async def accept_invitation(
user: Annotated[User, Depends(get_current_active_user)], invitation_id: uuid.UUID
) -> None:
"""Accepts the invitation sent by a different organization.
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
invitation_id (uuid.UUID): UUID for the organization that the users wants to add the person to.
Raises:
HTTPException: Raises exception when invite is not available or disabled.
"""
invite: Invite | None = await Invite.get_or_none(
Q(id=invitation_id)
& (Q(receiver=user.username) | Q(receiver=user.email))
& Q(disabled=False)
).prefetch_related("acl")
if not invite:
raise HTTPException(
status_code=403,
detail="The invitation doesn't exist or you don't have access to it.",
)
if invite.disabled:
raise HTTPException(
status_code=403,
detail="You have already declined the invitation or the invitation was removed, you can't accept it.",
)
invite.accepted = True
await invite.save()
# Disable invite after accepting, prevent changing it.
await invite.delete()
await Membership.create(
user=user, organization=await Organization.get(id=invite.org_id), acl=invite.acl
)
@router.get("/decline/{invitation_id}", status_code=status.HTTP_204_NO_CONTENT)
async def reject_invitation(
user: Annotated[User, Depends(get_current_active_user)], invitation_id: uuid.UUID
) -> None:
"""Declines an invitation to join an organization
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
invitation_id (uuid.UUID): The UUID of the invitation
Raises:
HTTPException: Checks if the invite exists.
HTTPException: Checks if the invite has already accepted.
Returns:
Invite: The Invitation model.
"""
invite: Invite | None = await Invite.get_or_none(
Q(id=invitation_id)
& (Q(receiver=user.username) | Q(receiver=user.email))
& Q(disabled=False)
).prefetch_related("acl")
if not invite:
raise HTTPException(
status_code=403,
detail="The invitation doesn't exist or you don't have access to it.",
)
if invite.accepted:
raise HTTPException(
status_code=403,
detail="The invitation was already accepted, you can't remove it.",
)
await invite.delete()
@router.post("/send", response_model=invitation_model)
async def send_invitation(
user: Annotated[User, Depends(get_current_active_user)],
invite_details: send_invitation_for_org,
) -> Invite:
"""Sends an invitation to e-mail or username.
Args:
user (Annotated[User, Depends(get_current_active_user)]): Returns user token.
invite_details (send_invitation_for_org): The details for the invitation.
Raises:
HTTPException: Checks access to the organization posted.
HTTPException: Checks for Manager or Admin permissions and declines if you are not.
Returns:
Invite: The Invitation model.
"""
# Should send an E-Mail as notification.
membership = await Membership.get_or_none(
Q(user=user.id) & Q(organization=invite_details.org_id)
).prefetch_related("acl")
if not membership:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You do not have access to this organization.",
)
if not membership.acl.MANAGE or not membership.acl.ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not allowed to send invitations for this organization.",
)
# Check if user is already part of organization
invited_user: User | None = await User.get_or_none(
Q(username=invite_details.receiver) | Q(email=invite_details.receiver)
)
if invited_user:
user_is_part_of_org = await Membership.get_or_none(
Q(user=invited_user) & Q(organization=invite_details.org_id)
)
if user_is_part_of_org:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The person you've invited is already part of the organization.",
)
acl = None
if invite_details.acl:
acl = await ACL.create(
READ=invite_details.acl.READ,
WRITE=invite_details.acl.WRITE,
REPORT=invite_details.acl.REPORT,
MANAGE=invite_details.acl.MANAGE,
ADMIN=invite_details.acl.ADMIN,
)
else:
acl = await ACL.create(
READ=True,
)
return await Invite.create(
receiver=invite_details.receiver,
sender=user.id,
org_id=invite_details.org_id,
message=invite_details.message,
acl=acl,
)
@@ -0,0 +1,20 @@
import uuid
from pydantic import BaseModel
from tortoise.contrib.pydantic import pydantic_model_creator
from modules.invitations.models import Invite
invitation_model = pydantic_model_creator(Invite)
class acl_model(BaseModel):
READ: bool
WRITE: bool
REPORT: bool
MANAGE: bool
ADMIN: bool
class send_invitation_for_org(BaseModel):
org_id: uuid.UUID
receiver: str
acl: acl_model | None
message: str | None
@@ -7,7 +7,6 @@ from tortoise.exceptions import ConfigurationError
from tortoise.models import Model
from tortoise import fields
from mixins.CMDMixin import CMDMixin
class EnumField(fields.CharField):
"""
@@ -52,8 +51,7 @@ class OrganizationType(Enum):
EXTRA_LARGE_ORGANIZATION: str = "xl_org" # 1000 - 5000+
class Organization(Model, CMDMixin):
class Organization(Model):
"""
Organization
@@ -64,6 +62,11 @@ class Organization(Model, CMDMixin):
id: uuid.UUID = fields.UUIDField(primary_key=True)
name: str = fields.CharField(max_length=128)
type: str = EnumField(OrganizationType)
street_name: str | None = fields.TextField(null=True)
zip_code: str | None = fields.CharField(max_length=128, null=True)
state: str | None = fields.CharField(max_length=128, null=True)
city: str | None = fields.CharField(max_length=128, null=True)
country: str | None = fields.CharField(max_length=128, null=True)
users: uuid.UUID = fields.ManyToManyField(
"models.User",
related_name="members",
@@ -74,6 +77,9 @@ class Organization(Model, CMDMixin):
on_delete=fields.NO_ACTION,
)
disabled: bool = fields.BooleanField(default=False)
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
def __str__(self) -> str:
return f"{self.id} - {self.name}"
@@ -85,5 +91,3 @@ class Organization(Model, CMDMixin):
self.disabled = True
self.disabled_at = datetime.now(tz=pytz.UTC)
await self.save()
@@ -1,17 +1,130 @@
from fastapi import APIRouter
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from typing import Annotated, List
from modules.organizations.models import Organization
from modules.organizations.schemas import (
organization_model,
register_organization,
update_org,
)
from modules.users.utils import get_current_active_user
from modules.users.models import ACL, Membership, User
from tortoise.expressions import Q
router = APIRouter(prefix="/api/v1/organizations", tags=["orgs"])
router = APIRouter(prefix="/api/v1/organizations")
@router.get("/", response_model=List[organization_model])
async def all_active_organizations(
user: Annotated[User, Depends(get_current_active_user)],
) -> List[Organization]:
memberships: List[Membership] = await Membership.filter(
Q(user_id=user.id) & Q(disabled=False)
).prefetch_related("organization")
@router.get("/")
def all_organizations():
pass
organizations: List[Organization] = []
@router.delete("/")
def delete_organization():
pass
if len(memberships) < 1:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No active organizations found!")
@router.post("/create")
def create_organization():
pass
for member in memberships:
organizations.append(member.organization)
return organizations
@router.delete("/{org_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_organization(
user: Annotated[User, Depends(get_current_active_user)], org_id: uuid.UUID
) -> None:
membership: Membership | None = await Membership.get_or_none(
Q(user=user) & Q(organization_id=org_id)
).prefetch_related("acl")
if not membership:
raise HTTPException(
status_code=403,
detail="You are not part of the organization you wish to leave or remove.",
)
if membership.acl.ADMIN:
# Prepare to remove ALL members in the organization.
# We've already checked whether user is ADMIN.
all_memberships: List[Membership] = list(
await Membership.filter(Q(organization_id=org_id))
)
for member in all_memberships:
await member.acl.delete()
await member.delete()
await membership.acl.delete()
await membership.delete()
return
@router.post("/", response_model=organization_model)
async def create_organization(
user: Annotated[User, Depends(get_current_active_user)],
register_organization: register_organization,
) -> Organization:
acl: ACL = await ACL.create(
READ=True, WRITE=True, REPORT=True, MANAGE=True, ADMIN=True
)
org: Organization = await Organization.create(
name=register_organization.name,
type=register_organization.type,
street_name=register_organization.street_name,
zip_code=register_organization.zip_code,
state=register_organization.state,
city=register_organization.city,
country=register_organization.country,
)
await Membership.create(organization=org, user=user, acl=acl)
return org
@router.put("/{org_id}", response_model=organization_model)
async def update_organization(
user: Annotated[User, Depends(get_current_active_user)],
org_id: uuid.UUID,
alter_organization: update_org,
) -> Organization:
membership: Membership | None = await Membership.get_or_none(
organization__id=org_id,
user=user,
).prefetch_related("acl")
if not membership:
raise HTTPException(
status_code=403,
detail="It seems you are not part of the organization or are an admin of the said organization.",
)
if not membership.acl.ADMIN:
raise HTTPException(
status_code=403,
detail="It seems you are not part of the organization or are an admin of the said organization.",
)
org: Organization = await Organization.get(id=org_id)
if alter_organization.name:
org.name = alter_organization.name
if alter_organization.type:
org.type = alter_organization.type
if alter_organization.street_name:
org.street_name = alter_organization.street_name
if alter_organization.zip_code:
org.zip_code = alter_organization.zip_code
if alter_organization.state:
org.state = alter_organization.state
if alter_organization.city:
org.city = alter_organization.city
if alter_organization.country:
org.country = alter_organization.country
await org.save()
return org
@@ -1,6 +1,24 @@
from pydantic import BaseModel
from tortoise.contrib.pydantic import pydantic_model_creator
from modules.organizations.models import Organization
from modules.organizations.models import Organization, OrganizationType
OrganizationModel = pydantic_model_creator(Organization)
organization_model = pydantic_model_creator(Organization)
class register_organization(BaseModel):
name: str
type: OrganizationType
street_name: str | None
zip_code: str | None
state: str | None
city: str | None
country: str | None
class update_org(BaseModel):
name: str | None
type: OrganizationType | None
street_name: str | None
zip_code: str | None
state: str | None
city: str | None
country: str | None
+14 -8
View File
@@ -1,18 +1,18 @@
from datetime import datetime
import uuid
from fastapi import HTTPException, status
from pydantic import EmailStr
import pytz
from tortoise.models import Model
from tortoise import fields
from modules.organizations.models import Organization
from mixins.CMDMixin import CMDMixin
from config import settings
crypt = settings.CRYPT
class User(Model, CMDMixin):
class User(Model):
"""
User
@@ -35,14 +35,17 @@ class User(Model, CMDMixin):
on_delete=fields.NO_ACTION,
)
disabled: bool = fields.BooleanField(default=False)
# tokens = fields.ForeignKeyField("models.Token")
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
def __str__(self) -> str:
return f"{self.id} - {self.name} {self.surname}"
async def set_password(self, password: str) -> None:
async def set_password(self, password: str) -> bool:
self.password = crypt.hash(password)
await self.save() # Make sure to save the model in DB
return True
def check_against_password(self, password: str) -> bool:
return crypt.verify(password, self.password)
@@ -54,7 +57,7 @@ class User(Model, CMDMixin):
return False
if new_password is not verify_new_password:
return False
await self.set_password(new_password)
return await self.set_password(new_password)
async def delete(self, force: bool = False) -> None:
if force:
@@ -90,7 +93,7 @@ class ACL(Model):
"""
class Membership(Model, CMDMixin):
class Membership(Model):
"""
Membership
@@ -98,10 +101,13 @@ class Membership(Model, CMDMixin):
"""
id: uuid.UUID = fields.UUIDField(primary_key=True)
organization: Organization = fields.ForeignKeyField("models.Organization")
user: User = fields.ForeignKeyField("models.User")
organization: Organization = fields.ForeignKeyField("models.Organization", null=True, on_delete=fields.SET_NULL)
user: User = fields.ForeignKeyField("models.User", null=True, on_delete=fields.SET_NULL)
acl: ACL = fields.ForeignKeyField("models.ACL")
disabled: bool = fields.BooleanField(default=False)
created_at = fields.DatetimeField(null=True, auto_now_add=True)
modified_at = fields.DatetimeField(null=True, auto_now=True)
disabled_at = fields.DatetimeField(null=True)
async def delete(self, force: bool = False) -> None:
if force:
+91 -12
View File
@@ -1,20 +1,99 @@
from fastapi import APIRouter
from typing import Annotated, List
from fastapi import APIRouter, Depends
from modules.users.models import User
from fastapi import HTTPException, status
from tortoise.expressions import Q
from modules.auth.models import Token
from modules.users.utils import get_current_active_user
from modules.users.schemas import register_model, update_user_model
from modules.users.models import Membership, User
from modules.users.schemas import user_model
from config import settings
router = APIRouter(prefix="/api/v1/users", tags=["users"])
crypt = settings.CRYPT
@router.get("/")
def get_all_users():
pass
@router.post("/")
def create_user():
pass
user_exists: str = "Account failed to create, please contact support."
password_failed: str = "Password validation failed, please try again."
@router.get("/me")
def get_user():
pass
@router.post("/", status_code=status.HTTP_201_CREATED, response_model=user_model)
async def create_user(user: register_model):
# Prevent existing users from reapplying for our system.
existing_user: User | None = await User.get_or_none(
Q(email=user.email)
& Q(username=user.username)
& Q(name=user.name)
& Q(surname=user.surname)
& Q(disabled=False)
)
if existing_user is not None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=user_exists,
)
if user.password != user.validate_password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=password_failed,
)
return await User.create(
email=user.email,
username=user.username,
name=user.name,
surname=user.surname,
password=crypt.hash(user.password),
)
@router.put("/me", status_code=status.HTTP_204_NO_CONTENT)
async def update_user(
user: Annotated[User, Depends(get_current_active_user)],
updated_user: update_user_model,
):
if updated_user.email:
user.email = updated_user.email
if updated_user.name:
user.name = updated_user.name
if updated_user.surname:
user.surname = updated_user.surname
if (
updated_user.old_password
and updated_user.password
and updated_user.validate_password
):
user.update_password(
updated_user.old_password,
updated_user.password,
updated_user.validate_password,
)
await user.save()
@router.delete("/me", status_code=status.HTTP_204_NO_CONTENT)
async def update_user(
user: Annotated[User, Depends(get_current_active_user)],
):
memberships: List[Membership] = await Membership.filter(Q(user__id=user.id) & Q(disabled=False))
for membership in memberships:
await membership.acl.delete()
await membership.delete()
tokens: List[Token] = await Token.filter(Q(user__id=user.id) & Q(disabled=False))
for token in tokens:
await token.delete()
await user.delete()
@router.get("/me", response_model=user_model)
async def get_user(user: Annotated[User, Depends(get_current_active_user)]):
return user
@@ -1,5 +1,24 @@
from pydantic import BaseModel, EmailStr
from tortoise.contrib.pydantic import pydantic_model_creator
from modules.users.models import User
user_model = pydantic_model_creator(User, exclude=["password"])
class register_model(BaseModel):
email: EmailStr
username: str
name: str
surname: str
password: str
validate_password: str
class update_user_model(BaseModel):
email: EmailStr | None
name: str | None
surname: str | None
old_password: str | None
password: str | None
validate_password: str | None
+3 -8
View File
@@ -10,7 +10,7 @@ from config import settings
async def get_user_from_token(
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)]
token: Annotated[str, Depends(settings.OAUTH2_SCHEME)],
) -> User | None:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -28,7 +28,7 @@ async def get_user_from_token(
except:
raise credentials_exception
return await User.filter(Q(id=user_id)).first()
return await User.filter(Q(id=user_id) & Q(disabled=False)).first()
async def get_current_active_user(
@@ -37,11 +37,6 @@ async def get_current_active_user(
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User is not found or active",
)
if user.disabled:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User is not found or active",
detail="The requested token does not exist or you are not logged in.",
)
return user
@@ -1,21 +1,22 @@
aerich>=0.8.0
fastapi[all]>=0.115.5
python-dotenv>=0.21.0
tortoise-orm[asyncpg]>=0.22.1
uvicorn>=0.31.1
black>=24.10.0
joserfc>=1.0.1
passlib>=1.7.4
pytz>=2024.2
ptpython>=0.25
aerich>=0.9.0
fastapi[all]>=0.115.12
tortoise-orm[asyncpg]>=0.25.1
uvicorn>=0.34.3
black>=25.1.0
joserfc>=1.1.0
pwdlib[argon2]>=0.2.1
pytz>=2025.2
ptpython>=3.0.30
msgspec>=0.19.0
bcrypt>=4.2.1
bcrypt>=4.3.0
tomlkit>=0.13.3
# Test Suite
httpx>=0.28.1
pytest>=8.3.4
mock>=5.1.0
mock>=5.2.0
pytest>=8.4.0
asyncio>=3.4.3
pytest-mock>=3.14.0
pytest-asyncio>=0.25.3
asgi-lifespan>=2.1.0
pytest-mock>=3.14.1
pytest-asyncio>=1.0.0
asgi-lifespan>=2.1.0
Faker>=37.4.0
+4
View File
@@ -0,0 +1,4 @@
class Test():
pass
+8 -17
View File
@@ -1,19 +1,9 @@
import asyncio
import asyncio, httpx, pytest
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import httpx, pytest
from config import settings
from glob import glob
from asgi_lifespan import LifespanManager
from asgi_lifespan import LifespanManager # type: ignore
settings.PSQL_DB_NAME = settings.PSQL_TEST_DB_NAME
pytest_plugins = [
fixture.replace("/", ".").replace("\\", ".").replace(".py", "")
for fixture in glob("tests/fixtures/*.py")
if "__" not in fixture
]
from tests.fixtures.account import *
try:
from main import app
@@ -27,12 +17,12 @@ except ImportError:
ClientManagerType = AsyncGenerator[httpx.AsyncClient, None]
@pytest.fixture(scope="session")
@pytest.fixture
def anyio_backend():
return "asyncio"
@pytest.fixture(scope="session")
@pytest.fixture
def event_loop():
loop = asyncio.get_event_loop()
yield loop
@@ -43,8 +33,9 @@ def event_loop():
async def client_manager(app, base_url="https://localhost", **kw) -> ClientManagerType:
app.state.testing = True
async with LifespanManager(app):
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url=base_url, **kw) as c:
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=app), base_url=base_url, **kw
) as c:
yield c
View File
+70
View File
@@ -0,0 +1,70 @@
import pytest
from dataclasses import dataclass
from modules.auth.utils import create_jwt_tokens
from modules.organizations.models import Organization, OrganizationType
from modules.users.models import ACL, Membership, User
from modules.auth.models import Token
from tortoise.expressions import Q
from config import settings
crypt = settings.CRYPT
@dataclass
class user_creation_return_type:
user: User
organization: Organization
acl: ACL
tokens: Token
@pytest.fixture()
async def create_user_with_org():
async def inner_function(email="user@localhost.com",
username="user",
name="awesome",
surname="user",
password="password-dont-use",
organization_name="simple organization",
organization_type=OrganizationType.HOME,
is_admin=False) -> user_creation_return_type:
org: Organization | None = await Organization.filter(Q(name=organization_name) & Q(name=organization_type)).first()
if not org:
org: Organization = await Organization.create(
name=organization_name,
type=organization_type
)
user: User | None = await User.filter(Q(email=email)).first()
if not user:
user: User = await User.create(
email=email,
username=username,
name=name,
surname=surname,
password=crypt.hash(password),
)
acl: ACL | None = await ACL.filter(Q(id="5f33facd-08dd-48a1-8f15-3b24f2a727f5")).first()
if not acl:
acl: ACL = await ACL.create(
id="5f33facd-08dd-48a1-8f15-3b24f2a727f5",
READ=True,
WRITE=True,
REPORT=True,
MANAGE=True if is_admin else False,
ADMIN=True if is_admin else False,
)
membership: Membership | None = await Membership.filter(Q(user=user, organization=org, acl=acl)).first()
if not membership:
await Membership.get_or_create(
organization=org,
user=user,
acl=acl
)
tokens: Token = await create_jwt_tokens(user=user)
return user, org, acl, tokens
return inner_function
@@ -1,86 +0,0 @@
from modules.organizations.models import Organization, OrganizationType
from modules.users.models import ACL, Membership, User
import pytest # type=ignore
from config import settings
crypt = settings.CRYPT
@pytest.fixture()
async def use_user_account():
org, _ = await Organization.get_or_create(
id="6ad4c94e-0522-4912-8d16-02d451f4c92d",
defaults={
"name": "User's Organization",
"type": OrganizationType.HOME,
},
)
acl, _ = await ACL.get_or_create(
id="a4e927a3-36e5-4761-badb-0a44ade6616f",
defaults={
"READ": True,
"WRITE": True,
"REPORT": True,
"MANAGE": False,
"ADMIN": False,
},
)
user, _ = await User.get_or_create(
id="24235427-9662-4ba3-a9c5-00000000000b",
defaults={
"email": "user@localhost.com",
"username": "user",
"name": "awesome",
"surname": "user",
"password": crypt.hash("userpassword"),
},
)
membership, _ = await Membership.get_or_create(
id="833b9511-b2da-4760-8fa4-1a5c7059911e",
defaults={
"organization": org,
"user": user,
"acl": acl,
},
)
return org, acl, user, membership
@pytest.fixture()
async def use_admin_account():
org, _ = await Organization.get_or_create(
id="de001f44-1bb8-4667-9f9d-2d62d6ad7270",
defaults={
"name": "Admin's Organization",
"type": OrganizationType.EXTRA_LARGE_ORGANIZATION,
},
)
acl, _ = await ACL.get_or_create(
id="83c1bfe6-c2ed-4ba1-be03-0e5c1960ec31",
defaults={
"READ": True,
"WRITE": True,
"REPORT": True,
"MANAGE": True,
"ADMIN": True,
},
)
user, _ = await User.get_or_create(
id="24235427-9662-4ba3-a9c5-00000000000a",
defaults={
"email": "admin@localhost.com",
"username": "admin",
"name": "awesome",
"surname": "admin",
"password": crypt.hash("adminpassword"),
},
)
membership, _ = await Membership.get_or_create(
id="393473ee-c218-4bcf-82cd-cb676c4d8a33",
defaults={
"organization": org,
"user": user,
"acl": acl,
},
)
return org, acl, user, membership
@@ -1,15 +1,12 @@
from modules.users.models import User
import pytest # type: ignore
from tests.base_test import Test
from httpx import AsyncClient
from config import settings
from unittest.mock import ANY
from tortoise.expressions import Q
crypt = settings.CRYPT
class TestAuthentication(object):
@pytest.mark.asyncio
class TestAuthentication(Test):
async def test_authentication_with_non_existing_user_and_password(
self, client: AsyncClient
):
@@ -24,11 +21,10 @@ class TestAuthentication(object):
assert response.status_code == 401
assert response.json() == {"detail": "E-Mail Address or password is incorrect"}
@pytest.mark.asyncio
async def test_authentication_with_existing_user_and_wrong_password(
self, client: AsyncClient, use_admin_account
self, client: AsyncClient, create_user_with_org
):
_, _, _, _ = use_admin_account
_, _, _, _ = await create_user_with_org()
response = await client.post(
"https://localhost/api/v1/auth/login",
data={
@@ -40,11 +36,10 @@ class TestAuthentication(object):
assert response.status_code == 401
assert response.json() == {"detail": "E-Mail Address or password is incorrect"}
@pytest.mark.asyncio
async def test_authentication_with_existing_user_and_password(
self, client: AsyncClient, use_admin_account
self, client: AsyncClient, create_user_with_org
):
_, _, admin, _ = use_admin_account
admin, _, _, _ = await create_user_with_org(email="admin@localhost.com", password="adminpassword")
response = await client.post(
"https://localhost/api/v1/auth/login",
data={
@@ -68,19 +63,19 @@ class TestAuthentication(object):
}
}
@pytest.mark.asyncio
async def test_logging_out_destroys_tokens(
self, client: AsyncClient, use_user_account
self, client: AsyncClient, create_user_with_org
):
_, _, user, _ = use_user_account
user, _, _, _ = await create_user_with_org(email="superuser@localhost.com", password="superuser")
response = await client.post(
"https://localhost/api/v1/auth/login",
data={
"username": "user@localhost.com",
"password": "userpassword",
"username": "superuser@localhost.com",
"password": "superuser",
"grant_type": "password",
},
)
print(response.json())
assert response.status_code == 200
assert response.json() == {
"jwt": {
@@ -115,11 +110,10 @@ class TestAuthentication(object):
"detail": "Refresh token not found or something went wrong."
}
@pytest.mark.asyncio
async def test_create_new_tokens_upon_refresh(
self, client: AsyncClient, use_admin_account
self, client: AsyncClient, create_user_with_org
):
_, _, admin, _ = use_admin_account
admin, _, _, _ = await create_user_with_org(email="admin@localhost.com", password="adminpassword")
token = await client.post(
"https://localhost/api/v1/auth/login",
data={
@@ -165,36 +159,3 @@ class TestAuthentication(object):
}
}
@pytest.mark.asyncio
async def test_setup_new_account(self, client: AsyncClient):
# Ensure account is never available. Prevents account already being available.
check_if_account_exists: User | None = await User.filter(
Q(email="superuser@localhost.com")
).get_or_none()
if check_if_account_exists:
await check_if_account_exists.delete(force=True)
account = await client.post(
"https://localhost/api/v1/auth/register",
json={
"email": "superuser@localhost.com",
"username": "superuser",
"name": "awesome",
"surname": "superuser",
"password": "superuserpassword",
"validate_password": "superuserpassword",
},
)
assert account.status_code == 201
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "superuser@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "superuser",
"username": "superuser",
}
@@ -0,0 +1,547 @@
from unittest.mock import ANY
from httpx import AsyncClient
from modules.users.models import ACL, Membership
from tests.base_test import Test
class TestInvitationalRoutes(Test):
async def test_send_invitations(self, client: AsyncClient, create_user_with_org):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin12@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user1231@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user1231@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user1231@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user1231@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
async def test_cannot_see_others_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin99@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user18@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user1231@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user1231@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == []
async def test_removing_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin9999@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user9487@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user9487@localhost.com",
"sender": str(admin.id),
}
invite_id = invite.json()["id"]
removed_invite = await client.delete(
f"https://localhost/api/v1/invitations/{invite_id}",
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert removed_invite.status_code == 204
async def test_cannot_accept_own_invite(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin18569@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "non-existing-user@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "non-existing-user@localhost.com",
"sender": str(admin.id),
}
invite_id = invite.json()["id"]
try_accept_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert try_accept_invite.status_code == 403
assert try_accept_invite.json() == {
"detail": "The invitation doesn't exist or you don't have access to it."
}
async def test_accept_sent_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin191@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user8@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user8@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user8@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user8@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
invite_id = user_invites.json()[0]["id"]
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 204
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
# When an invite has been accepted, it should be removed.
assert user_invites.json() == []
async def test_decline_sent_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin11@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user98@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user98@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user98@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user98@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
invite_id = user_invites.json()[0]["id"]
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/decline/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 204
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == []
async def test_prevent_accepting_when_declined_sent_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin9612@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user11918@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user11918@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user11918@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user11918@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
invite_id = user_invites.json()[0]["id"]
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/decline/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 204
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == []
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 403
assert accept_invite.json() == {
"detail": "The invitation doesn't exist or you don't have access to it."
}
async def test_prevent_declining_when_accepted_sent_invitations(
self, client: AsyncClient, create_user_with_org
):
admin, org, _, admintokens = await create_user_with_org(
email="superadmin9712@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
_, _, _, usertokens = await create_user_with_org(email="user14918@localhost.com")
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "user14918@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 200
assert invite.json() == {
"accepted": False,
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"message": "Hi! We would like to invite you to our organization.",
"modified_at": ANY,
"org_id": str(org.id),
"receiver": "user14918@localhost.com",
"sender": str(admin.id),
}
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == [
{
"id": ANY,
"receiver": "user14918@localhost.com",
"sender": str(admin.id),
"org_id": str(org.id),
"message": "Hi! We would like to invite you to our organization.",
"accepted": False,
"disabled": False,
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
}
]
invite_id = user_invites.json()[0]["id"]
accept_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert accept_invite.status_code == 204
user_invites = await client.get(
"https://localhost/api/v1/invitations/",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert user_invites.status_code == 200
assert user_invites.json() == []
decline_invite = await client.get(
f"https://localhost/api/v1/invitations/accept/{invite_id}",
headers={"Authorization": f"Bearer {usertokens.access_token}"},
)
assert decline_invite.status_code == 403
assert decline_invite.json() == {
"detail": "The invitation doesn't exist or you don't have access to it."
}
async def test_prevent_adding_user_whom_already_belongs_to_your_organization(
self, client: AsyncClient, create_user_with_org
):
_, org, _, admintokens = await create_user_with_org(
email="us3r123@localhost.com",
username="awesomeadmin",
password="awesomeadmin",
is_admin=True,
)
user, _, _, _ = await create_user_with_org(email="us3r1234@localhost.com")
await Membership.create(
user=user,
organization=org,
acl=await ACL.create(READ=True)
)
invite = await client.post(
"https://localhost/api/v1/invitations/send",
json={
"org_id": str(org.id),
"receiver": "us3r1234@localhost.com",
"acl": None,
"message": "Hi! We would like to invite you to our organization.",
},
headers={"Authorization": f"Bearer {admintokens.access_token}"},
)
assert invite.status_code == 403
assert invite.json() == {
"detail": "The person you've invited is already part of the organization."
}
@@ -0,0 +1,284 @@
from httpx import AsyncClient
from modules.users.models import ACL, Membership
from modules.organizations.models import Organization
from unittest.mock import ANY
from tests.base_test import Test
class TestOrganizationRoute(Test):
async def test_get_organizations_from_api(
self, client: AsyncClient, create_user_with_org
):
_, _, _, tokens = await create_user_with_org()
organizations = await client.get(
"https://localhost/api/v1/organizations/",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert organizations.status_code == 200
assert organizations.json() == [
{
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"id": ANY,
"modified_at": ANY,
"name": "simple organization",
"type": "home",
"street_name": None,
"zip_code": None,
"state": None,
"city": None,
"country": None,
},
]
async def test_create_organization(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
organizations = await client.post(
"https://localhost/api/v1/organizations/",
json={
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert organizations.status_code == 200
assert organizations.json() == {
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
"id": ANY,
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
"disabled": False,
}
async def test_delete_organization(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
organizations = await client.post(
"https://localhost/api/v1/organizations/",
json={
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert organizations.status_code == 200
assert organizations.json() == {
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
"id": ANY,
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
"disabled": False,
}
org_id = organizations.json()["id"]
deleted_org = await client.delete(
f"https://localhost/api/v1/organizations/{org_id}",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert deleted_org.status_code == 204
async def test_cannot_delete_organization_you_are_not_a_part_of(
self, client: AsyncClient, create_user_with_org
):
_, _, _, tokens = await create_user_with_org()
organization: Organization = await Organization.create(
name="My Pretty Organization",
type="xl_org",
street_name="Alakaventie 5 A 188",
zip_code="00920",
state="uusimaa",
city="Helsinki",
country="Finland",
)
deleted_org = await client.delete(
f"https://localhost/api/v1/organizations/{organization.id}",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert deleted_org.status_code == 403
async def test_delete_membership_of_organization(
self, client: AsyncClient, create_user_with_org
):
user, _, _, tokens = await create_user_with_org()
organization: Organization = await Organization.create(
name="My Pretty Organization",
type="xl_org",
street_name="Alakaventie 5 A 188",
zip_code="00920",
state="uusimaa",
city="Helsinki",
country="Finland",
)
acl: ACL = await ACL.create(
READ=True, WRITE=True, REPORT=True, MANAGE=False, ADMIN=False
)
await Membership.create(user=user, organization=organization, acl=acl)
deleted_org = await client.delete(
f"https://localhost/api/v1/organizations/{organization.id}",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert deleted_org.status_code == 204
async def test_update_organization(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
organizations = await client.post(
"https://localhost/api/v1/organizations/",
json={
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert organizations.status_code == 200
assert organizations.json() == {
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
"id": ANY,
"name": "My new organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
"disabled": False,
}
org_id = organizations.json()["id"]
update_org = await client.put(
f"https://localhost/api/v1/organizations/{org_id}",
json={
"name": "My awesome organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert update_org.json() == {
"created_at": ANY,
"modified_at": ANY,
"disabled_at": None,
"id": ANY,
"name": "My awesome organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
"disabled": False,
}
async def test_cannot_update_organization_you_are_not_a_part_of(
self, client: AsyncClient, create_user_with_org
):
_, _, _, tokens = await create_user_with_org()
organization: Organization = await Organization.create(
name="My Pretty Organization",
type="xl_org",
street_name="Alakaventie 5 A 188",
zip_code="00920",
state="uusimaa",
city="Helsinki",
country="Finland",
)
update_org = await client.put(
f"https://localhost/api/v1/organizations/{organization.id}",
json={
"name": "My awesome organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert update_org.status_code == 403
assert update_org.json() == {
"detail": "It seems you are not part of the organization or are an admin of the said "
"organization.",
}
async def test_cannot_update_organization_you_are_not_an_admin_of(
self, client: AsyncClient, create_user_with_org
):
_, organization, _, tokens = await create_user_with_org()
update_org = await client.put(
f"https://localhost/api/v1/organizations/{organization.id}",
json={
"name": "My awesome organization",
"type": "xl_org",
"street_name": "Alakaventie 5 A 188",
"zip_code": "00920",
"state": "uusimaa",
"city": "Helsinki",
"country": "Finland",
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert update_org.status_code == 403
assert update_org.json() == {
"detail": "It seems you are not part of the organization or are an admin of the said "
"organization.",
}
@@ -0,0 +1,154 @@
from tests.base_test import Test
from tortoise.expressions import Q
from tests.base_test import Test
from httpx import AsyncClient
from unittest.mock import ANY
from modules.users.models import User
class TestAccounts(Test):
async def test_setup_new_account(self, client: AsyncClient):
# Ensure account is never available. Prevents account already being available.
check_if_account_exists: User | None = await User.filter(
Q(email="superuser@localhost.com")
).get_or_none()
if check_if_account_exists:
await check_if_account_exists.delete(force=True)
account = await client.post(
"https://localhost/api/v1/users/",
json={
"email": "superuser@localhost.com",
"username": "superuser",
"name": "awesome",
"surname": "superuser",
"password": "superuserpassword",
"validate_password": "superuserpassword",
},
)
assert account.status_code == 201
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "superuser@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "superuser",
"username": "superuser",
}
async def test_me_route(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
account = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 200
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "user@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "user",
"username": "user",
}
async def test_update_me_route(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org()
account = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 200
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "user@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "user",
"username": "user",
}
account = await client.put(
"https://localhost/api/v1/users/me",
json={
"email": None,
"name": None,
"surname": "bluey",
"old_password": None,
"password": None,
"validate_password": None,
},
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 204
account = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 200
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "user@localhost.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "bluey",
"username": "user",
}
async def test_remove_account(self, client: AsyncClient, create_user_with_org):
_, _, _, tokens = await create_user_with_org(email="sup3rus3r@gmail.com")
account = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert account.status_code == 200
assert account.json() == {
"created_at": ANY,
"disabled": False,
"disabled_at": None,
"email": "sup3rus3r@gmail.com",
"id": ANY,
"modified_at": ANY,
"name": "awesome",
"surname": "user",
"username": "user",
}
delete = await client.delete(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert delete.status_code == 204
old = await client.get(
"https://localhost/api/v1/users/me",
headers={"Authorization": f"Bearer {tokens.access_token}"},
)
assert old.status_code == 401
assert old.json() == {
"detail": "The requested token does not exist or you are not logged in.",
}