# BastionSSO/user_manage_api/tests/test_service_unit.py
#
# 223 lines
# 8.5 KiB
# Python

from typing import List, Optional
from app.core.errors import ApiError, map_command_error
from app.core.models import UserCreateRequest
import app.providers.cli_provider as cli_provider
from app.providers.cli_provider import CliSystemProvider
from app.providers.base import SystemProvider
from app.services.user_group_service import UserGroupService
class NoopProvider(SystemProvider):
    """Do-nothing ``SystemProvider`` used as a test double.

    Mutating calls succeed without doing anything. ``create_user`` and
    ``write_default_user_environment`` remember selected arguments on the
    instance so tests can inspect them. ``get_user`` and ``get_group`` always
    raise a 404 ``ApiError``; the list-style queries return empty lists.
    """

    def __init__(self) -> None:
        # Arguments captured from the most recent create_user call.
        self.created_home_dir: Optional[str] = None
        self.created_linked_home_dir: Optional[str] = None
        self.created_shell: Optional[str] = None
        # Content captured from the most recent write_default_user_environment call.
        self.default_environment_content: Optional[str] = None

    # ----- users ---------------------------------------------------------

    def create_user(
        self,
        username: str,
        password_hash: str,
        home_dir: Optional[str],
        linked_home_dir: Optional[str],
        shell: str,
        primary_group: Optional[str],
        groups: List[str],
    ) -> None:
        """Record the home directory, linked home directory and shell; ignore the rest."""
        self.created_shell = shell
        self.created_linked_home_dir = linked_home_dir
        self.created_home_dir = home_dir

    def delete_user(self, username: str) -> None:
        """Accept the call and do nothing."""

    def change_user_password(self, username: str, password_hash: str) -> None:
        """Accept the call and do nothing."""

    def list_users(self):
        """Return an empty list."""
        return []

    def get_user(self, username: str):
        """Always raise a 404 ``ApiError``."""
        raise ApiError(404, "not_found", "not found")

    # ----- groups --------------------------------------------------------

    def create_group(self, groupname: str) -> None:
        """Accept the call and do nothing."""

    def delete_group(self, groupname: str) -> None:
        """Accept the call and do nothing."""

    def list_groups(self):
        """Return an empty list."""
        return []

    def get_group(self, groupname: str):
        """Always raise a 404 ``ApiError``."""
        raise ApiError(404, "not_found", "not found")

    # ----- memberships ---------------------------------------------------

    def add_user_groups(self, username: str, groups: List[str], replace: bool) -> None:
        """Accept the call and do nothing."""

    def remove_user_groups(self, username: str, groups: List[str]) -> None:
        """Accept the call and do nothing."""

    def get_user_groups(self, username: str):
        """Return an empty list."""
        return []

    # ----- environment files ---------------------------------------------

    def read_user_environment(self, username: str) -> str:
        """Return an empty string."""
        return ""

    def write_default_user_environment(self, username: str, content: str) -> None:
        """Record ``content`` so tests can assert on what was written."""
        self.default_environment_content = content

    def write_managed_user_environment(self, username: str, content: str) -> None:
        """Accept the call and do nothing."""
def test_create_user_uses_configured_shell_and_home_dir() -> None:
    """The provider receives ``<home_base_dir>/<username>`` and the configured default shell."""
    recorder = NoopProvider()
    service = UserGroupService(
        provider=recorder,
        home_base_dir="/srv/home",
        default_shell="/usr/sbin/nologin",
    )
    request = UserCreateRequest(
        username="alice",
        password_hash="$6$rounds=5000$abcdefghij",
        groups=[],
    )

    service.create_user(request)

    expected_home = "/srv/home/alice"
    expected_shell = "/usr/sbin/nologin"
    assert recorder.created_home_dir == expected_home
    assert recorder.created_shell == expected_shell
def test_link_home_dir_uses_home_base_symlink_path_and_external_storage() -> None:
    """With ``link_home_dir`` set, the provider receives both the home path and the linked path."""
    recorder = NoopProvider()
    service = UserGroupService(
        provider=recorder,
        home_base_dir="/home",
        link_home_dir="/data/home",
    )
    request = UserCreateRequest(
        username="alice",
        password_hash="$6$rounds=5000$abcdefghij",
        groups=[],
    )

    service.create_user(request)

    # Home path lives under home_base_dir; linked path lives under link_home_dir.
    assert recorder.created_home_dir == "/home/alice"
    assert recorder.created_linked_home_dir == "/data/home/alice"
def test_create_user_writes_default_environment_without_managed_block() -> None:
    """The default environment text from the request reaches the provider unchanged."""
    environment_text = "export A=1\n"
    recorder = NoopProvider()
    service = UserGroupService(provider=recorder, home_base_dir="/srv/home")
    request = UserCreateRequest(
        username="alice",
        password_hash="$6$rounds=5000$abcdefghij",
        groups=[],
        default_environment_variables=environment_text,
    )

    service.create_user(request)

    assert recorder.default_environment_content == environment_text
def test_command_error_mapping_conflict() -> None:
    """A "user already exists" message with exit code 9 maps to HTTP status 409."""
    expected_status = 409
    mapped = map_command_error("user already exists", 9)
    assert mapped.status_code == expected_status
class RecordingExecutor:
    """Fake command executor that records every command and returns canned output.

    Attributes:
        home_dir: Home directory reported for the user ``alice`` and used to
            locate the ``.bashrc`` path this fake recognises.
        commands: Every ``args`` list passed to :meth:`run`, in call order.
        files: Canned file contents; only the ``"bashrc"`` key is consulted.
    """

    def __init__(self, home_dir: str) -> None:
        self.home_dir = home_dir
        self.commands: List[List[str]] = []
        self.files: dict[str, str] = {}

    def run(self, args: List[str], use_sudo: bool = True, strip_output: bool = True, allowed_exit_codes=None) -> str:
        """Record ``args`` and return the canned response for it.

        ``use_sudo``, ``strip_output`` and ``allowed_exit_codes`` are accepted
        but ignored.

        Returns:
            A passwd-style line for ``getent passwd alice``, the stored
            ``"bashrc"`` content for ``cat <home_dir>/.bashrc``, and an empty
            string for any other command.

        Raises:
            ApiError: 404 when ``.bashrc`` is read but no ``"bashrc"`` entry
                has been stored in ``files``.
        """
        self.commands.append(args)
        if args == ["getent", "passwd", "alice"]:
            # One interpolation style only: the original mixed an f-prefix
            # (with no placeholders) and %-formatting on the same literal.
            return f"alice:x:1000:1000::{self.home_dir}:/bin/bash"
        if args == ["cat", f"{self.home_dir}/.bashrc"]:
            if "bashrc" not in self.files:
                raise ApiError(404, "not_found", "No such file or directory")
            return self.files["bashrc"]
        return ""
class MissingUserExecutor:
    """Fake executor that only answers the ``getent`` lookup for the user ``missing``."""

    def run(self, args: List[str], use_sudo: bool = True, strip_output: bool = True, allowed_exit_codes=None) -> str:
        """Return empty output for the expected lookup; reject everything else.

        The only accepted call is ``getent passwd missing`` with
        ``allowed_exit_codes`` equal to ``{2}``. Any other call raises
        ``AssertionError``.
        """
        is_expected_lookup = args == ["getent", "passwd", "missing"]
        if not (is_expected_lookup and allowed_exit_codes == {2}):
            raise AssertionError(f"unexpected command: {args}")
        return ""
class FakePath:
    """Minimal path stand-in that reports exactly one path as a symlink."""

    # The only path for which is_symlink() answers True.
    _SYMLINKED_HOME = "/home/alice"

    def __init__(self, path: str) -> None:
        self.path = path

    def is_symlink(self) -> bool:
        """Return True only when the wrapped path is ``/home/alice``."""
        return self._SYMLINKED_HOME == self.path
def test_delete_user_unlinks_home_when_it_is_symlink(monkeypatch) -> None:
    """Deleting a user whose home is a symlink runs both ``userdel`` and ``unlink``."""
    # FakePath reports /home/alice as a symlink.
    monkeypatch.setattr(cli_provider, "Path", FakePath)
    recorder = RecordingExecutor("/home/alice")

    CliSystemProvider(executor=recorder).delete_user("alice")

    for expected_command in (["userdel", "alice"], ["unlink", "/home/alice"]):
        assert expected_command in recorder.commands
def test_get_user_treats_getent_exit_code_two_as_not_found() -> None:
    """Looking up a user that ``getent`` does not return must raise a 404 ``ApiError``."""
    provider = CliSystemProvider(executor=MissingUserExecutor())

    raised = None
    try:
        provider.get_user("missing")
    except ApiError as exception:
        raised = exception

    if raised is None:
        raise AssertionError("expected ApiError")
    assert raised.status_code == 404
    assert raised.code == "not_found"
def test_delete_user_does_not_unlink_home_when_it_is_directory(monkeypatch) -> None:
    """Deleting a user whose home is not a symlink runs ``userdel`` but never ``unlink``."""
    # FakePath reports only /home/alice as a symlink, so /home/old-alice is not one.
    monkeypatch.setattr(cli_provider, "Path", FakePath)
    recorder = RecordingExecutor("/home/old-alice")

    CliSystemProvider(executor=recorder).delete_user("alice")

    recorded = recorder.commands
    assert ["userdel", "alice"] in recorded
    assert ["unlink", "/home/old-alice"] not in recorded
def test_managed_environment_block_replaces_existing_block() -> None:
    """Replacing the managed block swaps its content and keeps the surrounding text and markers."""
    provider = CliSystemProvider(executor=RecordingExecutor("/home/alice"))
    current = "before\n\n# >>> BastionSSO environment >>>\nold=1\n# <<< BastionSSO environment <<<\n\nafter\n"

    result = provider._replace_managed_block(current, "new=2")

    # Text outside the managed block survives.
    for kept in ("before", "after"):
        assert kept in result
    # The old managed content is gone.
    assert "old=1" not in result
    # New content and both markers are present.
    for present in (
        "new=2",
        "# >>> BastionSSO environment >>>",
        "# <<< BastionSSO environment <<<",
    ):
        assert present in result
def test_read_user_environment_returns_empty_when_bashrc_missing() -> None:
    """Reading the environment yields an empty string when the executor has no ``.bashrc`` content."""
    # A fresh RecordingExecutor stores no "bashrc" entry, so its canned
    # response for reading .bashrc is a 404 ApiError.
    recorder = RecordingExecutor("/home/alice")
    provider = CliSystemProvider(executor=recorder)

    environment = provider.read_user_environment("alice")

    assert environment == ""
def test_write_user_environment_uses_install_for_bashrc_only() -> None:
    """Writing the default environment installs ``.bashrc`` only, without a recursive chown.

    Expects a ``mkdir -p`` of the home directory, an ``install`` command whose
    first seven arguments set mode 644, owner ``alice`` and group ``1000`` and
    whose last argument is the ``.bashrc`` path, and no ``chown -R alice``.
    """
    executor = RecordingExecutor("/home/alice")
    provider = CliSystemProvider(executor=executor)

    provider.write_default_user_environment("alice", "export A=1\n")

    assert ["mkdir", "-p", "/home/alice"] in executor.commands

    install_prefix = ["install", "-m", "644", "-o", "alice", "-g", "1000"]
    # A default keeps a missing command from surfacing as a bare
    # StopIteration; the assertion below reports what was recorded instead.
    install_command = next(
        (
            command
            for command in executor.commands
            if command[: len(install_prefix)] == install_prefix
        ),
        None,
    )
    assert install_command is not None, (
        f"no install command recorded; got: {executor.commands}"
    )
    assert install_command[-1] == "/home/alice/.bashrc"

    assert not any(command[:3] == ["chown", "-R", "alice"] for command in executor.commands)
def test_all_user_environments_collects_partial_failures() -> None:
    """A failure for one user is reported in the result while the other user is still updated."""
    from app.core.models import UserSummary

    class PartialFailureProvider(NoopProvider):
        """Provider with two users whose managed-environment write fails for ``bob``."""

        def list_users(self):
            alice = UserSummary(username="alice", uid=1000, gid=1000, home_dir="/home/alice", shell="/bin/bash")
            bob = UserSummary(username="bob", uid=1001, gid=1001, home_dir="/home/bob", shell="/bin/bash")
            return [alice, bob]

        def get_user(self, username: str):
            return UserSummary(
                username=username,
                uid=1000,
                gid=1000,
                home_dir=f"/home/{username}",
                shell="/bin/bash",
            )

        def write_managed_user_environment(self, username: str, content: str) -> None:
            if username != "bob":
                return
            raise ApiError(503, "system_permission_denied", "permission denied")

    service = UserGroupService(provider=PartialFailureProvider(), home_base_dir="/home")

    outcome = service.set_all_user_environments("export A=1")

    assert outcome.updated_users == ["alice"]
    assert outcome.failed_count == 1
    first_failure = outcome.failed_users[0]
    assert first_failure.username == "bob"
def test_command_error_mapping_includes_exit_code_when_stderr_empty() -> None:
    """With an empty error message, the mapped error's message names the exit code."""
    exit_code = 7
    mapped = map_command_error("", exit_code)
    expected_message = "system command failed with exit code 7."
    assert mapped.message == expected_message