Fix last issues regarding testing, fix generating user and admin accounts
This commit is contained in:
@@ -2,7 +2,6 @@ from pydantic_settings import BaseSettings, SettingsConfigDict # type: ignore
|
|||||||
from passlib.context import CryptContext # type: ignore
|
from passlib.context import CryptContext # type: ignore
|
||||||
import pytz
|
import pytz
|
||||||
|
|
||||||
|
|
||||||
class Settings(BaseSettings):
|
class Settings(BaseSettings):
|
||||||
PROJECT_NAME: str = "StoneEdge Asset Management System"
|
PROJECT_NAME: str = "StoneEdge Asset Management System"
|
||||||
PROJECT_VERSION: str = "0.0.1"
|
PROJECT_VERSION: str = "0.0.1"
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from tortoise import Tortoise
|
|
||||||
from config import settings
|
from config import settings
|
||||||
from database import end_connections_to_db, migrate_db
|
from database import end_connections_to_db, migrate_db
|
||||||
from responses import msgspec_jsonresponse
|
from responses import msgspec_jsonresponse
|
||||||
|
|||||||
@@ -23,12 +23,16 @@ crypt = settings.CRYPT
|
|||||||
@router.post("/")
|
@router.post("/")
|
||||||
async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
async def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
||||||
user: User | None = await User.filter(email=form.username).first()
|
user: User | None = await User.filter(email=form.username).first()
|
||||||
|
|
||||||
if user is None:
|
if user is None:
|
||||||
raise HTTPException(status_code=401, detail=error)
|
raise HTTPException(status_code=401, detail=error)
|
||||||
|
|
||||||
if user.check_against_password(form.password) is False:
|
if user.check_against_password(form.password) is False:
|
||||||
raise HTTPException(status_code=401, detail=error)
|
raise HTTPException(status_code=401, detail=error)
|
||||||
|
|
||||||
|
if user.disabled is True:
|
||||||
|
raise HTTPException(status_code=401, detail=error)
|
||||||
|
|
||||||
auth_token = create_token(
|
auth_token = create_token(
|
||||||
user_id=user.id, offset=timedelta(settings.ACCESS_TOKEN_EXPIRE_MIN)
|
user_id=user.id, offset=timedelta(settings.ACCESS_TOKEN_EXPIRE_MIN)
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from datetime import timedelta
|
|||||||
import uuid, time
|
import uuid, time
|
||||||
from config import settings
|
from config import settings
|
||||||
from joserfc import jwt # type: ignore
|
from joserfc import jwt # type: ignore
|
||||||
|
from joserfc.jwk import OctKey # type: ignore
|
||||||
|
|
||||||
crypt = settings.CRYPT
|
crypt = settings.CRYPT
|
||||||
|
|
||||||
@@ -22,7 +23,7 @@ def create_token(user_id: uuid, offset: timedelta) -> str:
|
|||||||
"iat": curr_time,
|
"iat": curr_time,
|
||||||
"exp": int(time.time() + offset.total_seconds()),
|
"exp": int(time.time() + offset.total_seconds()),
|
||||||
},
|
},
|
||||||
settings.SECRET_KEY,
|
OctKey.import_key(settings.SECRET_KEY),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ class EnumField(fields.CharField):
|
|||||||
raise ConfigurationError("{} is not a subclass of Enum!".format(enum_type))
|
raise ConfigurationError("{} is not a subclass of Enum!".format(enum_type))
|
||||||
self._enum_type = enum_type
|
self._enum_type = enum_type
|
||||||
|
|
||||||
def to_db_value(self, value: Enum, instance) -> str:
|
def to_db_value(self, value: Enum, _) -> str:
|
||||||
return value.value
|
return value.value
|
||||||
|
|
||||||
def to_python_value(self, value: str) -> Enum:
|
def to_python_value(self, value: str) -> Enum:
|
||||||
|
|||||||
+62
-38
@@ -1,48 +1,72 @@
|
|||||||
import uuid
|
from modules.organizations.models import Organization, OrganizationType
|
||||||
from modules.organizations.models import Organization
|
|
||||||
from modules.users.models import ACL, Membership, User
|
from modules.users.models import ACL, Membership, User
|
||||||
import pytest # type: ignore
|
import pytest # type: ignore
|
||||||
from config import settings
|
from config import settings
|
||||||
|
|
||||||
crypt = settings.CRYPT
|
crypt = settings.CRYPT
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
async def use_user_account():
|
async def use_user_account():
|
||||||
org = await Organization.create(name="User's Organization", type="home")
|
org, _ = await Organization.get_or_create(
|
||||||
acl = await ACL.create(
|
id="6ad4c94e-0522-4912-8d16-02d451f4c92d",
|
||||||
READ=True, WRITE=True, REPORT=True, MANAGE=False, ADMIN=False
|
name="User's Organization",
|
||||||
)
|
type=OrganizationType.HOME,
|
||||||
user = await User.create(
|
)
|
||||||
email="user@localhost.com",
|
acl, _ = await ACL.get_or_create(
|
||||||
username="user",
|
id="a4e927a3-36e5-4761-badb-0a44ade6616f",
|
||||||
name="awesome",
|
READ=True,
|
||||||
surname="user",
|
WRITE=True,
|
||||||
password=crypt.hash("userpassword"),
|
REPORT=True,
|
||||||
)
|
MANAGE=False,
|
||||||
membership = await Membership.create(
|
ADMIN=False,
|
||||||
organization=org,
|
)
|
||||||
user=user,
|
user, _ = await User.get_or_create(
|
||||||
acl=acl,
|
id="24235427-9662-4ba3-a9c5-00000000000b",
|
||||||
)
|
email="user@localhost.com",
|
||||||
return org, acl, user, membership
|
username="user",
|
||||||
|
name="awesome",
|
||||||
|
surname="user",
|
||||||
|
password=crypt.hash("userpassword"),
|
||||||
|
)
|
||||||
|
membership, _ = await Membership.get_or_create(
|
||||||
|
id="833b9511-b2da-4760-8fa4-1a5c7059911e",
|
||||||
|
organization=org,
|
||||||
|
user=user,
|
||||||
|
acl=acl,
|
||||||
|
)
|
||||||
|
return org, acl, user, membership
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
async def use_admin_account():
|
async def use_admin_account():
|
||||||
org = await Organization.create(name="Admin's Organization", type="home")
|
org, _ = await Organization.get_or_create(
|
||||||
acl = await ACL.create(
|
id="de001f44-1bb8-4667-9f9d-2d62d6ad7270",
|
||||||
READ=True, WRITE=True, REPORT=True, MANAGE=True, ADMIN=True
|
name="Admin's Organization",
|
||||||
)
|
type=OrganizationType.EXTRA_LARGE_ORGANIZATION,
|
||||||
user = await User.create(
|
)
|
||||||
email="admin@localhost.com",
|
acl, _ = await ACL.get_or_create(
|
||||||
username="admin",
|
id="83c1bfe6-c2ed-4ba1-be03-0e5c1960ec31",
|
||||||
name="awesome",
|
READ=True,
|
||||||
surname="admin",
|
WRITE=True,
|
||||||
password=crypt.hash("adminpassword"),
|
REPORT=True,
|
||||||
)
|
MANAGE=True,
|
||||||
membership = await Membership.create(
|
ADMIN=True,
|
||||||
organization=org,
|
)
|
||||||
user=user,
|
user, _ = await User.get_or_create(
|
||||||
acl=acl,
|
defaults={
|
||||||
)
|
"id": "24235427-9662-4ba3-a9c5-00000000000a",
|
||||||
return org, acl, user, membership
|
"email": "admin@localhost.com",
|
||||||
|
"username": "admin",
|
||||||
|
"name": "awesome",
|
||||||
|
"surname": "admin",
|
||||||
|
"password": crypt.hash("adminpassword"),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
membership, _ = await Membership.get_or_create(
|
||||||
|
id="393473ee-c218-4bcf-82cd-cb676c4d8a33",
|
||||||
|
organization=org,
|
||||||
|
user=user,
|
||||||
|
acl=acl,
|
||||||
|
)
|
||||||
|
return org, acl, user, membership
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ class TestAuthentication(object):
|
|||||||
async def test_authentication_with_existing_user_and_wrong_password(
|
async def test_authentication_with_existing_user_and_wrong_password(
|
||||||
self, client: AsyncClient, use_admin_account
|
self, client: AsyncClient, use_admin_account
|
||||||
):
|
):
|
||||||
|
_, _, _, _ = use_admin_account
|
||||||
response = await client.post(
|
response = await client.post(
|
||||||
"http://localhost/api/v1/auth/",
|
"http://localhost/api/v1/auth/",
|
||||||
data={
|
data={
|
||||||
@@ -41,7 +42,7 @@ class TestAuthentication(object):
|
|||||||
async def test_authentication_with_existing_user_and_password(
|
async def test_authentication_with_existing_user_and_password(
|
||||||
self, client: AsyncClient, use_admin_account
|
self, client: AsyncClient, use_admin_account
|
||||||
):
|
):
|
||||||
_, _, user, _ = use_admin_account
|
_, _, admin, _ = use_admin_account
|
||||||
response = await client.post(
|
response = await client.post(
|
||||||
"http://localhost/api/v1/auth/",
|
"http://localhost/api/v1/auth/",
|
||||||
data={
|
data={
|
||||||
@@ -54,7 +55,7 @@ class TestAuthentication(object):
|
|||||||
assert response.json() == {
|
assert response.json() == {
|
||||||
"jwt": {
|
"jwt": {
|
||||||
"created_at": ANY,
|
"created_at": ANY,
|
||||||
"user_id": str(user.id),
|
"user_id": str(admin.id),
|
||||||
"id": ANY,
|
"id": ANY,
|
||||||
"modified_at": ANY,
|
"modified_at": ANY,
|
||||||
"disabled_at": None,
|
"disabled_at": None,
|
||||||
|
|||||||
Reference in New Issue
Block a user