from pathlib import PurePosixPath from typing import List, Optional from app.core.errors import ApiError from app.core.models import GroupSummary, UserCreateRequest, UserSummary from app.providers.base import SystemProvider class UserGroupService: def __init__( self, provider: SystemProvider, home_base_dir: str, link_home_dir: Optional[str] = None, hidden_users: Optional[List[str]] = None, hidden_groups: Optional[List[str]] = None, whitelist_users: Optional[List[str]] = None, whitelist_groups: Optional[List[str]] = None, locked_users: Optional[List[str]] = None, user_uid_min: Optional[int] = None, user_uid_max: Optional[int] = None, group_gid_min: Optional[int] = None, group_gid_max: Optional[int] = None, ): self.provider = provider self.home_base_dir = PurePosixPath(home_base_dir) self.link_home_base_dir = PurePosixPath(link_home_dir) if link_home_dir else None self.hidden_users = set(hidden_users or []) self.hidden_groups = set(hidden_groups or []) self.whitelist_users = set(whitelist_users or []) self.whitelist_groups = set(whitelist_groups or []) self.locked_users = set(locked_users or []) self.user_uid_min = user_uid_min self.user_uid_max = user_uid_max self.group_gid_min = group_gid_min self.group_gid_max = group_gid_max def _ensure_user_visible(self, username: str) -> None: user = self.provider.get_user(username) if not self._is_user_visible(user): raise ApiError(404, "not_found", "user not found") def _ensure_groups_visible(self, groups: List[str]) -> None: for groupname in groups: self._ensure_group_visible(groupname) def _ensure_group_visible(self, groupname: str) -> None: group = self.provider.get_group(groupname) if not self._is_group_visible(group): raise ApiError(404, "not_found", "group not found") def _ensure_user_name_allowed(self, username: str) -> None: if username not in self.whitelist_users and username in self.hidden_users: raise ApiError(404, "not_found", "user not found") def _ensure_user_unlocked(self, username: str) -> None: if username in self.locked_users: raise ApiError(423, "user_locked", "user is locked and cannot be modified") def _ensure_group_name_allowed(self, groupname: str) -> None: if groupname not in self.whitelist_groups and groupname in self.hidden_groups: raise ApiError(404, "not_found", "group not found") def _is_uid_in_range(self, uid: int) -> bool: if self.user_uid_min is not None and uid < self.user_uid_min: return False if self.user_uid_max is not None and uid > self.user_uid_max: return False return True def _is_gid_in_range(self, gid: int) -> bool: if self.group_gid_min is not None and gid < self.group_gid_min: return False if self.group_gid_max is not None and gid > self.group_gid_max: return False return True def _is_user_visible(self, user: UserSummary) -> bool: if user.username in self.whitelist_users: return True if user.username in self.hidden_users: return False return self._is_uid_in_range(user.uid) def _is_group_visible(self, group: GroupSummary) -> bool: if group.groupname in self.whitelist_groups: return True if group.groupname in self.hidden_groups: return False return self._is_gid_in_range(group.gid) def _resolve_home_dir(self, home_dir: Optional[str], username: str) -> str: if home_dir is None: return str(self.home_base_dir / username) return self._validate_home_dir(home_dir, username) def _resolve_linked_home_dir(self, username: str) -> Optional[str]: if self.link_home_base_dir is None: return None return str(self.link_home_base_dir / username) def _validate_home_dir(self, home_dir: str, username: str) -> str: if home_dir is None: return str(self.home_base_dir / username) path = PurePosixPath(home_dir) base = self.home_base_dir if not str(path).startswith(str(base) + "/") and path != base / username: raise ApiError(400, "invalid_home_dir", "home_dir must be inside HOME_BASE_DIR.") return str(path) def create_user(self, payload: UserCreateRequest) -> None: self._ensure_user_name_allowed(payload.username) self._ensure_user_unlocked(payload.username) if payload.primary_group is not None: self._ensure_group_visible(payload.primary_group) self._ensure_groups_visible(payload.groups) home_dir = self._resolve_home_dir(payload.home_dir, payload.username) linked_home_dir = self._resolve_linked_home_dir(payload.username) self.provider.create_user( username=payload.username, password_hash=payload.password_hash, home_dir=home_dir, linked_home_dir=linked_home_dir, shell=payload.shell, primary_group=payload.primary_group, groups=payload.groups, ) def delete_user(self, username: str) -> None: self._ensure_user_visible(username) self._ensure_user_unlocked(username) self.provider.delete_user(username) def change_user_password(self, username: str, password_hash: str) -> None: self._ensure_user_visible(username) self._ensure_user_unlocked(username) self.provider.change_user_password(username, password_hash) def list_users(self) -> List[UserSummary]: return [user for user in self.provider.list_users() if self._is_user_visible(user)] def get_user(self, username: str) -> UserSummary: user = self.provider.get_user(username) if not self._is_user_visible(user): raise ApiError(404, "not_found", "user not found") return user def create_group(self, groupname: str) -> None: self._ensure_group_name_allowed(groupname) self.provider.create_group(groupname) def delete_group(self, groupname: str) -> None: self._ensure_group_visible(groupname) group = self.provider.get_group(groupname) if group.members: raise ApiError(422, "precondition_failed", "Group has members and cannot be deleted.") self.provider.delete_group(groupname) def list_groups(self) -> List[GroupSummary]: return [group for group in self.provider.list_groups() if self._is_group_visible(group)] def get_group(self, groupname: str) -> GroupSummary: group = self.provider.get_group(groupname) if not self._is_group_visible(group): raise ApiError(404, "not_found", "group not found") return group def add_user_groups(self, username: str, groups: List[str], replace: bool) -> None: self._ensure_user_visible(username) self._ensure_user_unlocked(username) self._ensure_groups_visible(groups) self.provider.add_user_groups(username, groups, replace) def remove_user_groups(self, username: str, groups: List[str]) -> None: self._ensure_user_visible(username) self._ensure_user_unlocked(username) self._ensure_groups_visible(groups) self.provider.remove_user_groups(username, groups) def get_user_groups(self, username: str) -> List[str]: self._ensure_user_visible(username) return [group for group in self.provider.get_user_groups(username) if self._is_group_visible(self.provider.get_group(group))]