"""HTTP-level tests for the user/group management API.

The application built by ``create_app()`` is exercised through FastAPI's
``TestClient``.  The real system backend is replaced by ``MockProvider``, an
in-memory ``SystemProvider`` that keeps users, groups, memberships and
environment files in plain dictionaries so the tests never touch the host.
"""

from pathlib import Path
import tempfile
from typing import Dict, List, Optional

from fastapi.testclient import TestClient

import app.container as container
from app.core.audit import AuditLogger
from app.core.config import Settings
from app.core.errors import ApiError
from app.factory import create_app
from app.core.models import GroupSummary, UserSummary
from app.providers.base import SystemProvider
from app.services.user_group_service import UserGroupService
from app.state import AppState

# Bearer token configured in ``build_client`` and sent by every request.
TEST_TOKEN = "test-token"
# Password hashes used as request payloads; the mock never inspects them.
PASSWORD_HASH = "$6$rounds=5000$abcdefghij"
NEW_PASSWORD_HASH = "$6$rounds=5000$newhashvalue"


class MockProvider(SystemProvider):
    """In-memory stand-in for the system provider used by the service.

    State is exposed as public dictionaries so tests can seed or inspect it
    directly:

    * ``users`` / ``groups`` -- summaries keyed by name.
    * ``user_group_map`` -- group names per username.
    * ``environments`` -- managed environment content per username.
    * ``default_environments`` -- default environment content per username.
    * ``environment_failures`` -- an ``ApiError`` to raise when the managed
      environment of that username is written (failure injection).
    """

    def __init__(self) -> None:
        self.users: Dict[str, UserSummary] = {}
        self.groups: Dict[str, GroupSummary] = {}
        self.user_group_map: Dict[str, List[str]] = {}
        self.environments: Dict[str, str] = {}
        self.default_environments: Dict[str, str] = {}
        self.environment_failures: Dict[str, ApiError] = {}

    # -- internal guards ---------------------------------------------------

    def _require_user(self, username: str) -> None:
        """Raise a 404 ``ApiError`` unless ``username`` is a known user."""
        if username not in self.users:
            raise ApiError(404, "not_found", "user not found")

    def _require_group(self, groupname: str) -> None:
        """Raise a 404 ``ApiError`` unless ``groupname`` is a known group."""
        if groupname not in self.groups:
            raise ApiError(404, "not_found", "group not found")

    # -- users -------------------------------------------------------------

    def create_user(
        self,
        username: str,
        password_hash: str,
        home_dir: Optional[str],
        linked_home_dir: Optional[str],
        shell: str,
        primary_group: Optional[str],
        groups: List[str],
    ) -> None:
        """Register a user with fixed uid/gid 1000.

        ``password_hash``, ``linked_home_dir`` and ``primary_group`` are
        accepted for interface compatibility but not stored.

        Raises:
            ApiError: 409 ``resource_conflict`` if the user already exists.
        """
        if username in self.users:
            raise ApiError(409, "resource_conflict", "user exists")
        self.users[username] = UserSummary(
            username=username,
            uid=1000,
            gid=1000,
            home_dir=home_dir or f"/home/{username}",
            shell=shell,
        )
        # Copy so later mutation of the caller's list cannot alter our state.
        self.user_group_map[username] = list(groups)

    def delete_user(self, username: str) -> None:
        """Remove a user and its group memberships; 404 if unknown."""
        self._require_user(username)
        del self.users[username]
        self.user_group_map.pop(username, None)

    def change_user_password(self, username: str, password_hash: str) -> None:
        """Validate that the user exists; the hash itself is not recorded."""
        self._require_user(username)

    def list_users(self) -> List[UserSummary]:
        """Return all users in insertion order."""
        return list(self.users.values())

    def get_user(self, username: str) -> UserSummary:
        """Return the summary for ``username``; 404 if unknown."""
        self._require_user(username)
        return self.users[username]

    # -- groups ------------------------------------------------------------

    def create_group(self, groupname: str) -> None:
        """Register an empty group with fixed gid 1000.

        Raises:
            ApiError: 409 ``resource_conflict`` if the group already exists.
        """
        if groupname in self.groups:
            raise ApiError(409, "resource_conflict", "group exists")
        self.groups[groupname] = GroupSummary(groupname=groupname, gid=1000, members=[])

    def delete_group(self, groupname: str) -> None:
        """Remove a group; 404 if unknown."""
        self._require_group(groupname)
        del self.groups[groupname]

    def list_groups(self) -> List[GroupSummary]:
        """Return all groups in insertion order."""
        return list(self.groups.values())

    def get_group(self, groupname: str) -> GroupSummary:
        """Return the summary for ``groupname``; 404 if unknown."""
        self._require_group(groupname)
        return self.groups[groupname]

    # -- memberships -------------------------------------------------------

    def add_user_groups(self, username: str, groups: List[str], replace: bool) -> None:
        """Replace the user's groups, or append ``groups`` without duplicates.

        When appending, existing order is kept and new names follow in the
        order given.  Raises a 404 ``ApiError`` if the user is unknown.
        """
        self._require_user(username)
        if replace:
            # Copy so the stored membership is independent of the argument.
            self.user_group_map[username] = list(groups)
            return
        current = self.user_group_map.get(username, [])
        # dict.fromkeys de-duplicates while preserving first-seen order.
        self.user_group_map[username] = list(dict.fromkeys(current + list(groups)))

    def remove_user_groups(self, username: str, groups: List[str]) -> None:
        """Drop every name in ``groups`` from the user's memberships.

        Names the user is not a member of are ignored.  Raises a 404
        ``ApiError`` if the user is unknown.
        """
        self._require_user(username)
        # Build the lookup set once rather than once per membership entry.
        to_remove = set(groups)
        self.user_group_map[username] = [
            name for name in self.user_group_map.get(username, []) if name not in to_remove
        ]

    def get_user_groups(self, username: str) -> List[str]:
        """Return a copy of the user's group names; 404 if unknown."""
        self._require_user(username)
        # Hand out a copy so callers cannot mutate the mock's state.
        return list(self.user_group_map.get(username, []))

    # -- environments ------------------------------------------------------

    def read_user_environment(self, username: str) -> str:
        """Return the managed environment content, or ``""`` if none is set."""
        self._require_user(username)
        return self.environments.get(username, "")

    def write_default_user_environment(self, username: str, content: str) -> None:
        """Store the default environment content; 404 if the user is unknown."""
        self._require_user(username)
        self.default_environments[username] = content

    def write_managed_user_environment(self, username: str, content: str) -> None:
        """Store the managed environment content.

        Raises:
            ApiError: 404 if the user is unknown, or whatever error was
                registered for this user in ``environment_failures``.
        """
        self._require_user(username)
        injected = self.environment_failures.get(username)
        if injected is not None:
            raise injected
        self.environments[username] = content


def build_client(
    link_home_dir: Optional[str] = None,
    hidden_users: Optional[List[str]] = None,
    hidden_groups: Optional[List[str]] = None,
    whitelist_users: Optional[List[str]] = None,
    whitelist_groups: Optional[List[str]] = None,
    locked_users: Optional[List[str]] = None,
    user_uid_min: Optional[int] = None,
    user_uid_max: Optional[int] = None,
    group_gid_min: Optional[int] = None,
    group_gid_max: Optional[int] = None,
) -> TestClient:
    """Build a ``TestClient`` backed by a fresh ``MockProvider``.

    All arguments are forwarded unchanged to ``UserGroupService``.  As a side
    effect the module-global ``container.app_state`` is replaced, which is how
    tests reach the provider afterwards.  The audit log is written to a fixed
    file name inside the system temp directory.
    """
    settings = Settings(TOKEN=TEST_TOKEN)
    service = UserGroupService(
        provider=MockProvider(),
        home_base_dir="/home",
        link_home_dir=link_home_dir,
        hidden_users=hidden_users,
        hidden_groups=hidden_groups,
        whitelist_users=whitelist_users,
        whitelist_groups=whitelist_groups,
        locked_users=locked_users,
        user_uid_min=user_uid_min,
        user_uid_max=user_uid_max,
        group_gid_min=group_gid_min,
        group_gid_max=group_gid_max,
    )
    audit_path = Path(tempfile.gettempdir()) / "bastion_sso_user_manage_api_test_audit.log"
    audit = AuditLogger(str(audit_path))
    container.app_state = AppState(settings=settings, service=service, audit=audit)
    return TestClient(create_app())


def _auth_headers() -> Dict[str, str]:
    """Return a fresh Authorization header dict for the configured token."""
    return {"Authorization": f"Bearer {TEST_TOKEN}"}


def _current_provider() -> MockProvider:
    """Return the provider installed by the most recent ``build_client`` call."""
    return container.app_state.service.provider


def _create_user(client: TestClient, username: str, headers: Dict[str, str], **extra: object) -> None:
    """Create a user with no groups via the API and fail fast if setup breaks."""
    payload = {"username": username, "password_hash": PASSWORD_HASH, "groups": []}
    payload.update(extra)
    response = client.post("/users", json=payload, headers=headers)
    assert response.status_code == 200


def _create_group(client: TestClient, groupname: str, headers: Dict[str, str]) -> None:
    """Create a group via the API and fail fast if setup breaks."""
    response = client.post("/groups", json={"groupname": groupname}, headers=headers)
    assert response.status_code == 200


def test_user_group_happy_path() -> None:
    """Create a group and a user, append the group, and read it back."""
    client = build_client()
    headers = _auth_headers()

    response = client.post("/groups", json={"groupname": "dev"}, headers=headers)
    assert response.status_code == 200

    response = client.post(
        "/users",
        json={"username": "alice", "password_hash": PASSWORD_HASH, "groups": ["dev"]},
        headers=headers,
    )
    assert response.status_code == 200

    response = client.post(
        "/users/alice/groups",
        json={"groups": ["dev"], "mode": "append"},
        headers=headers,
    )
    assert response.status_code == 200

    response = client.get("/users/alice/groups", headers=headers)
    assert response.status_code == 200
    assert "dev" in response.json()["groups"]


def test_health_returns_server_name_and_online_status() -> None:
    """``GET /health`` reports the server name and an online status."""
    client = build_client()

    response = client.get("/health", headers=_auth_headers())

    assert response.status_code == 200
    assert response.json() == {"server_name": "user-manage-api", "status": "online"}


def test_change_user_password() -> None:
    """Patching an existing user's password succeeds with a message."""
    client = build_client()
    headers = _auth_headers()
    _create_user(client, "alice", headers)

    response = client.patch(
        "/users/alice/password",
        json={"password_hash": NEW_PASSWORD_HASH},
        headers=headers,
    )

    assert response.status_code == 200
    assert response.json()["message"] == "User password updated."


def test_user_environment_endpoints() -> None:
    """Default env is written on create; managed env round-trips via PUT/GET."""
    client = build_client()
    headers = _auth_headers()
    _create_user(client, "alice", headers, default_environment_variables="export A=1")

    provider = _current_provider()
    assert provider.default_environments["alice"] == "export A=1"

    response = client.put("/users/alice/environment", json={"content": "export B=2"}, headers=headers)
    assert response.status_code == 200

    response = client.get("/users/alice/environment", headers=headers)
    assert response.status_code == 200
    assert response.json()["content"] == "export B=2"


def test_update_all_user_environments_updates_visible_users() -> None:
    """A bulk environment update reaches every created user."""
    client = build_client()
    headers = _auth_headers()
    _create_user(client, "alice", headers)
    _create_user(client, "bob", headers)

    response = client.put("/users/environment", json={"content": "export SHARED=1"}, headers=headers)

    assert response.status_code == 200
    assert response.json()["updated_count"] == 2
    provider = _current_provider()
    assert provider.environments["alice"] == "export SHARED=1"
    assert provider.environments["bob"] == "export SHARED=1"


def test_update_all_user_environments_returns_partial_failures() -> None:
    """A per-user write failure is reported without failing the whole request."""
    client = build_client()
    headers = _auth_headers()
    _create_user(client, "alice", headers)
    _create_user(client, "bob", headers)
    # Inject a failure for bob's managed-environment write only.
    _current_provider().environment_failures["bob"] = ApiError(
        503, "system_permission_denied", "permission denied"
    )

    response = client.put("/users/environment", json={"content": "export SHARED=1"}, headers=headers)

    assert response.status_code == 200
    body = response.json()
    assert body["updated_users"] == ["alice"]
    assert body["failed_count"] == 1
    assert body["failed_users"][0]["username"] == "bob"


def test_create_user_rejects_client_shell_and_home_dir() -> None:
    """Supplying ``shell``/``home_dir`` in the create payload yields a 400."""
    client = build_client()
    payload = {
        "username": "alice",
        "password_hash": PASSWORD_HASH,
        "shell": "/bin/sh",
        "home_dir": "/tmp/alice",
    }

    response = client.post("/users", json=payload, headers=_auth_headers())

    assert response.status_code == 400
    assert response.json()["code"] == "invalid_parameter"


def test_locked_user_can_be_read_but_not_modified() -> None:
    """A locked user is readable, but mutating requests return HTTP 423."""
    client = build_client(locked_users=["alice"])
    headers = _auth_headers()
    # Seed the provider directly so the locked user exists before any request.
    _current_provider().users["alice"] = UserSummary(
        username="alice", uid=1000, gid=1000, home_dir="/home/alice", shell="/bin/bash"
    )

    response = client.get("/users/alice", headers=headers)
    assert response.status_code == 200

    response = client.patch(
        "/users/alice/password",
        json={"password_hash": NEW_PASSWORD_HASH},
        headers=headers,
    )
    assert response.status_code == 423

    response = client.delete("/users/alice", headers=headers)
    assert response.status_code == 423

    response = client.post(
        "/users/alice/groups",
        json={"groups": ["dev"], "mode": "append"},
        headers=headers,
    )
    assert response.status_code == 423


def test_conflict_and_not_found_codes() -> None:
    """Duplicate creation returns 409; deleting an unknown user returns 404."""
    client = build_client()
    headers = _auth_headers()
    payload = {"username": "alice", "password_hash": PASSWORD_HASH, "groups": []}

    response = client.post("/users", json=payload, headers=headers)
    assert response.status_code == 200

    response = client.post("/users", json=payload, headers=headers)
    assert response.status_code == 409

    response = client.delete("/users/bob", headers=headers)
    assert response.status_code == 404


def test_hidden_users_and_groups_are_filtered_and_blocked() -> None:
    """Hidden users/groups are omitted from listings and answer 404 directly."""
    client = build_client(hidden_users=["root"], hidden_groups=["sudo"])
    headers = _auth_headers()
    _create_user(client, "alice", headers)
    _create_group(client, "dev", headers)

    # Seed hidden entries straight into the provider, bypassing the API.
    provider = _current_provider()
    provider.users["root"] = UserSummary(username="root", uid=0, gid=0, home_dir="/root", shell="/bin/bash")
    provider.groups["sudo"] = GroupSummary(groupname="sudo", gid=27, members=["root"])
    provider.user_group_map["alice"] = ["dev", "sudo"]

    response = client.get("/users", headers=headers)
    assert response.status_code == 200
    assert [user["username"] for user in response.json()] == ["alice"]

    response = client.get("/groups", headers=headers)
    assert response.status_code == 200
    assert [group["groupname"] for group in response.json()] == ["dev"]

    response = client.get("/users/root", headers=headers)
    assert response.status_code == 404

    response = client.delete("/groups/sudo", headers=headers)
    assert response.status_code == 404

    response = client.get("/users/alice/groups", headers=headers)
    assert response.status_code == 200
    assert response.json()["groups"] == ["dev"]


def test_whitelist_overrides_hidden_and_id_ranges() -> None:
    """Whitelisted names stay visible despite hidden lists and id ranges."""
    client = build_client(
        hidden_users=["root"],
        hidden_groups=["sudo"],
        whitelist_users=["root"],
        whitelist_groups=["sudo"],
        user_uid_min=1000,
        user_uid_max=2000,
        group_gid_min=1000,
        group_gid_max=2000,
    )
    headers = _auth_headers()

    provider = _current_provider()
    provider.users["root"] = UserSummary(username="root", uid=0, gid=0, home_dir="/root", shell="/bin/bash")
    provider.users["alice"] = UserSummary(
        username="alice", uid=1000, gid=1000, home_dir="/home/alice", shell="/bin/bash"
    )
    provider.users["system"] = UserSummary(
        username="system", uid=500, gid=500, home_dir="/home/system", shell="/bin/bash"
    )
    provider.groups["sudo"] = GroupSummary(groupname="sudo", gid=27, members=["root"])
    provider.groups["dev"] = GroupSummary(groupname="dev", gid=1000, members=[])
    provider.groups["system"] = GroupSummary(groupname="system", gid=500, members=[])

    response = client.get("/users", headers=headers)
    assert response.status_code == 200
    assert [user["username"] for user in response.json()] == ["root", "alice"]

    response = client.get("/groups", headers=headers)
    assert response.status_code == 200
    assert [group["groupname"] for group in response.json()] == ["sudo", "dev"]

    response = client.get("/users/root", headers=headers)
    assert response.status_code == 200

    response = client.get("/users/system", headers=headers)
    assert response.status_code == 404