from typing import List, Optional from app.core.errors import ApiError, map_command_error from app.core.models import UserCreateRequest import app.providers.cli_provider as cli_provider from app.providers.cli_provider import CliSystemProvider from app.providers.base import SystemProvider from app.services.user_group_service import UserGroupService class NoopProvider(SystemProvider): def __init__(self) -> None: self.created_home_dir: Optional[str] = None self.created_linked_home_dir: Optional[str] = None self.created_shell: Optional[str] = None self.default_environment_content: Optional[str] = None def create_user(self, username: str, password_hash: str, home_dir: Optional[str], linked_home_dir: Optional[str], shell: str, primary_group: Optional[str], groups: List[str]) -> None: self.created_home_dir = home_dir self.created_linked_home_dir = linked_home_dir self.created_shell = shell return None def delete_user(self, username: str) -> None: return None def change_user_password(self, username: str, password_hash: str) -> None: return None def list_users(self): return [] def get_user(self, username: str): raise ApiError(404, "not_found", "not found") def create_group(self, groupname: str) -> None: return None def delete_group(self, groupname: str) -> None: return None def list_groups(self): return [] def get_group(self, groupname: str): raise ApiError(404, "not_found", "not found") def add_user_groups(self, username: str, groups: List[str], replace: bool) -> None: return None def remove_user_groups(self, username: str, groups: List[str]) -> None: return None def get_user_groups(self, username: str): return [] def read_user_environment(self, username: str) -> str: return "" def write_default_user_environment(self, username: str, content: str) -> None: self.default_environment_content = content def write_managed_user_environment(self, username: str, content: str) -> None: return None def test_create_user_uses_configured_shell_and_home_dir() -> None: provider = NoopProvider() service = UserGroupService(provider=provider, home_base_dir="/srv/home", default_shell="/usr/sbin/nologin") payload = UserCreateRequest(username="alice", password_hash="$6$rounds=5000$abcdefghij", groups=[]) service.create_user(payload) assert provider.created_home_dir == "/srv/home/alice" assert provider.created_shell == "/usr/sbin/nologin" def test_link_home_dir_uses_home_base_symlink_path_and_external_storage() -> None: provider = NoopProvider() service = UserGroupService(provider=provider, home_base_dir="/home", link_home_dir="/data/home") payload = UserCreateRequest(username="alice", password_hash="$6$rounds=5000$abcdefghij", groups=[]) service.create_user(payload) assert provider.created_home_dir == "/home/alice" assert provider.created_linked_home_dir == "/data/home/alice" def test_create_user_writes_default_environment_without_managed_block() -> None: provider = NoopProvider() service = UserGroupService(provider=provider, home_base_dir="/srv/home") payload = UserCreateRequest(username="alice", password_hash="$6$rounds=5000$abcdefghij", groups=[], default_environment_variables="export A=1\n") service.create_user(payload) assert provider.default_environment_content == "export A=1\n" def test_command_error_mapping_conflict() -> None: error = map_command_error("user already exists", 9) assert error.status_code == 409 class RecordingExecutor: def __init__(self, home_dir: str) -> None: self.home_dir = home_dir self.commands: List[List[str]] = [] self.files: dict[str, str] = {} def run(self, args: List[str], use_sudo: bool = True, strip_output: bool = True, allowed_exit_codes=None) -> str: self.commands.append(args) if args == ["getent", "passwd", "alice"]: return f"alice:x:1000:1000::%s:/bin/bash" % self.home_dir if args == ["cat", f"{self.home_dir}/.bashrc"]: if "bashrc" not in self.files: raise ApiError(404, "not_found", "No such file or directory") return self.files["bashrc"] if args[:1] == ["install"]: target = args[-1] source = args[-2] with open(source, encoding="utf-8") as source_file: self.files[target] = source_file.read() return "" class MissingUserExecutor: def run(self, args: List[str], use_sudo: bool = True, strip_output: bool = True, allowed_exit_codes=None) -> str: if args == ["getent", "passwd", "missing"] and allowed_exit_codes == {2}: return "" raise AssertionError(f"unexpected command: {args}") class FakePath: def __init__(self, path: str) -> None: self.path = path def is_symlink(self) -> bool: return self.path == "/home/alice" def test_delete_user_unlinks_home_when_it_is_symlink(monkeypatch) -> None: monkeypatch.setattr(cli_provider, "Path", FakePath) executor = RecordingExecutor("/home/alice") provider = CliSystemProvider(executor=executor) provider.delete_user("alice") assert ["userdel", "alice"] in executor.commands assert ["unlink", "/home/alice"] in executor.commands def test_get_user_treats_getent_exit_code_two_as_not_found() -> None: provider = CliSystemProvider(executor=MissingUserExecutor()) try: provider.get_user("missing") except ApiError as exception: assert exception.status_code == 404 assert exception.code == "not_found" else: raise AssertionError("expected ApiError") def test_delete_user_does_not_unlink_home_when_it_is_directory(monkeypatch) -> None: monkeypatch.setattr(cli_provider, "Path", FakePath) executor = RecordingExecutor("/home/old-alice") provider = CliSystemProvider(executor=executor) provider.delete_user("alice") assert ["userdel", "alice"] in executor.commands assert ["unlink", "/home/old-alice"] not in executor.commands def test_managed_environment_block_replaces_existing_block() -> None: provider = CliSystemProvider(executor=RecordingExecutor("/home/alice")) current = "before\n\n# >>> BastionSSO environment >>>\nold=1\n# <<< BastionSSO environment <<<\n\nafter\n" result = provider._replace_managed_block(current, "new=2") assert "before" in result assert "after" in result assert "old=1" not in result assert "new=2" in result assert "# >>> BastionSSO environment >>>" in result assert "# <<< BastionSSO environment <<<" in result def test_read_user_environment_returns_empty_when_bashrc_missing() -> None: provider = CliSystemProvider(executor=RecordingExecutor("/home/alice")) assert provider.read_user_environment("alice") == "" def test_write_user_environment_uses_install_for_bashrc_only() -> None: executor = RecordingExecutor("/home/alice") provider = CliSystemProvider(executor=executor) provider.write_default_user_environment("alice", "export A=1\n") assert ["mkdir", "-p", "/home/alice"] in executor.commands install_command = next(command for command in executor.commands if command[:7] == ["install", "-m", "644", "-o", "alice", "-g", "1000"]) assert install_command[-1] == "/home/alice/.bashrc" assert not any(command[:3] == ["chown", "-R", "alice"] for command in executor.commands) def test_create_user_writes_bash_profile_that_loads_bashrc() -> None: executor = RecordingExecutor("/home/alice") provider = CliSystemProvider(executor=executor) provider.create_user( username="alice", password_hash="$6$rounds=5000$abcdefghij", home_dir="/home/alice", linked_home_dir=None, shell="/bin/bash", primary_group=None, groups=[], ) assert ["useradd", "-s", "/bin/bash", "-p", "$6$rounds=5000$abcdefghij", "-m", "-d", "/home/alice", "alice"] in executor.commands install_command = next(command for command in executor.commands if command[:7] == ["install", "-m", "644", "-o", "alice", "-g", "1000"]) assert install_command[-1] == "/home/alice/.bash_profile" assert executor.files["/home/alice/.bash_profile"] == "if [ -f ~/.bashrc ]; then\n . ~/.bashrc\nfi\n" def test_all_user_environments_collects_partial_failures() -> None: class PartialFailureProvider(NoopProvider): def list_users(self): from app.core.models import UserSummary return [ UserSummary(username="alice", uid=1000, gid=1000, home_dir="/home/alice", shell="/bin/bash"), UserSummary(username="bob", uid=1001, gid=1001, home_dir="/home/bob", shell="/bin/bash"), ] def get_user(self, username: str): from app.core.models import UserSummary return UserSummary(username=username, uid=1000, gid=1000, home_dir=f"/home/{username}", shell="/bin/bash") def write_managed_user_environment(self, username: str, content: str) -> None: if username == "bob": raise ApiError(503, "system_permission_denied", "permission denied") service = UserGroupService(provider=PartialFailureProvider(), home_base_dir="/home") result = service.set_all_user_environments("export A=1") assert result.updated_users == ["alice"] assert result.failed_count == 1 assert result.failed_users[0].username == "bob" def test_command_error_mapping_includes_exit_code_when_stderr_empty() -> None: error = map_command_error("", 7) assert error.message == "system command failed with exit code 7."