187 lines
8.4 KiB
Python
187 lines
8.4 KiB
Python
from pathlib import Path, PurePosixPath
|
|
import subprocess
|
|
import tempfile
|
|
from typing import List, Optional, Set
|
|
|
|
from app.core.errors import ApiError, map_command_error
|
|
from app.core.models import GroupSummary, UserSummary
|
|
from app.providers.base import SystemProvider
|
|
|
|
|
|
class CommandExecutor:
|
|
def __init__(self, sudo_path: str, timeout_seconds: int):
|
|
self.sudo_path = sudo_path
|
|
self.timeout_seconds = timeout_seconds
|
|
self.allowlist = {"useradd", "usermod", "userdel", "groupadd", "groupdel", "chpasswd", "id", "getent", "mkdir", "ln", "chown", "chmod", "unlink", "cat", "install"}
|
|
|
|
def run(self, args: List[str], use_sudo: bool = True, strip_output: bool = True, allowed_exit_codes: Optional[Set[int]] = None) -> str:
|
|
if not args:
|
|
raise ApiError(500, "invalid_command", "Empty command.")
|
|
command = args[0]
|
|
if command not in self.allowlist:
|
|
raise ApiError(500, "forbidden_command", f"Command not allowlisted: {command}")
|
|
full = [command] + args[1:]
|
|
if use_sudo:
|
|
full = [self.sudo_path, "-n"] + full
|
|
try:
|
|
result = subprocess.run(full, capture_output=True, text=True, timeout=self.timeout_seconds, check=False)
|
|
except subprocess.TimeoutExpired as exception:
|
|
raise ApiError(503, "system_timeout", "System command timed out.") from exception
|
|
|
|
if result.returncode != 0 and result.returncode not in (allowed_exit_codes or set()):
|
|
raise map_command_error(result.stderr, result.returncode, command)
|
|
if strip_output:
|
|
return result.stdout.strip()
|
|
|
|
return result.stdout
|
|
|
|
|
|
class CliSystemProvider(SystemProvider):
|
|
managed_start_marker = "# >>> BastionSSO environment >>>"
|
|
managed_end_marker = "# <<< BastionSSO environment <<<"
|
|
bash_profile_content = "if [ -f ~/.bashrc ]; then\n . ~/.bashrc\nfi\n"
|
|
|
|
def __init__(self, executor: CommandExecutor):
|
|
self.executor = executor
|
|
|
|
def create_user(self, username: str, password_hash: str, home_dir: Optional[str], linked_home_dir: Optional[str], shell: str, primary_group: Optional[str], groups: List[str]) -> None:
|
|
command = ["useradd", "-s", shell, "-p", password_hash]
|
|
if linked_home_dir:
|
|
command.append("-M")
|
|
else:
|
|
command.append("-m")
|
|
if home_dir:
|
|
command.extend(["-d", home_dir])
|
|
if primary_group:
|
|
command.extend(["-g", primary_group])
|
|
if groups:
|
|
command.extend(["-G", ",".join(groups)])
|
|
command.append(username)
|
|
self.executor.run(command)
|
|
if linked_home_dir and home_dir:
|
|
self.executor.run(["mkdir", "-p", linked_home_dir])
|
|
self.executor.run(["chown", "-R", username, linked_home_dir])
|
|
self.executor.run(["ln", "-s", linked_home_dir, home_dir])
|
|
self._write_user_bash_profile(username)
|
|
|
|
def delete_user(self, username: str) -> None:
|
|
home_dir = self.get_user(username).home_dir
|
|
self.executor.run(["userdel", username])
|
|
if Path(home_dir).is_symlink():
|
|
self.executor.run(["unlink", home_dir])
|
|
|
|
def change_user_password(self, username: str, password_hash: str) -> None:
|
|
self.executor.run(["usermod", "-p", password_hash, username])
|
|
|
|
def list_users(self) -> List[UserSummary]:
|
|
output = self.executor.run(["getent", "passwd"], use_sudo=False)
|
|
users: List[UserSummary] = []
|
|
for line in output.splitlines():
|
|
parts = line.split(":")
|
|
if len(parts) < 7:
|
|
continue
|
|
username, _, uid, gid, _, home_dir, shell = parts[:7]
|
|
users.append(UserSummary(username=username, uid=int(uid), gid=int(gid), home_dir=home_dir, shell=shell))
|
|
return users
|
|
|
|
def get_user(self, username: str) -> UserSummary:
|
|
output = self.executor.run(["getent", "passwd", username], use_sudo=False, allowed_exit_codes={2})
|
|
if not output:
|
|
raise ApiError(404, "not_found", f"User not found: {username}")
|
|
parts = output.split(":")
|
|
return UserSummary(username=parts[0], uid=int(parts[2]), gid=int(parts[3]), home_dir=parts[5], shell=parts[6])
|
|
|
|
def create_group(self, groupname: str) -> None:
|
|
self.executor.run(["groupadd", groupname])
|
|
|
|
def delete_group(self, groupname: str) -> None:
|
|
self.executor.run(["groupdel", groupname])
|
|
|
|
def list_groups(self) -> List[GroupSummary]:
|
|
output = self.executor.run(["getent", "group"], use_sudo=False)
|
|
groups: List[GroupSummary] = []
|
|
for line in output.splitlines():
|
|
parts = line.split(":")
|
|
if len(parts) < 4:
|
|
continue
|
|
name, _, gid, members = parts[:4]
|
|
member_list = [member for member in members.split(",") if member]
|
|
groups.append(GroupSummary(groupname=name, gid=int(gid), members=member_list))
|
|
return groups
|
|
|
|
def get_group(self, groupname: str) -> GroupSummary:
|
|
output = self.executor.run(["getent", "group", groupname], use_sudo=False, allowed_exit_codes={2})
|
|
if not output:
|
|
raise ApiError(404, "not_found", f"Group not found: {groupname}")
|
|
parts = output.split(":")
|
|
members = [member for member in parts[3].split(",") if member]
|
|
return GroupSummary(groupname=parts[0], gid=int(parts[2]), members=members)
|
|
|
|
def add_user_groups(self, username: str, groups: List[str], replace: bool) -> None:
|
|
mode_flag = "-G" if replace else "-aG"
|
|
self.executor.run(["usermod", mode_flag, ",".join(groups), username])
|
|
|
|
def remove_user_groups(self, username: str, groups: List[str]) -> None:
|
|
current = self.get_user_groups(username)
|
|
remaining = [group for group in current if group not in set(groups)]
|
|
self.executor.run(["usermod", "-G", ",".join(remaining), username])
|
|
|
|
def get_user_groups(self, username: str) -> List[str]:
|
|
output = self.executor.run(["id", "-nG", username], use_sudo=False)
|
|
return [group for group in output.split() if group]
|
|
|
|
def read_user_environment(self, username: str) -> str:
|
|
bashrc = PurePosixPath(self.get_user(username).home_dir) / ".bashrc"
|
|
try:
|
|
return self.executor.run(["cat", str(bashrc)], strip_output=False)
|
|
except ApiError as exception:
|
|
if exception.code == "not_found":
|
|
return ""
|
|
|
|
raise
|
|
|
|
def write_default_user_environment(self, username: str, content: str) -> None:
|
|
self._write_user_bashrc(username, content)
|
|
|
|
def write_managed_user_environment(self, username: str, content: str) -> None:
|
|
current = self.read_user_environment(username)
|
|
next_content = self._replace_managed_block(current, content)
|
|
self._write_user_bashrc(username, next_content)
|
|
|
|
def _write_user_bashrc(self, username: str, content: str) -> None:
|
|
self._write_user_file(username, ".bashrc", content)
|
|
|
|
def _write_user_bash_profile(self, username: str) -> None:
|
|
self._write_user_file(username, ".bash_profile", self.bash_profile_content)
|
|
|
|
def _write_user_file(self, username: str, filename: str, content: str) -> None:
|
|
user = self.get_user(username)
|
|
home_dir = PurePosixPath(user.home_dir)
|
|
self.executor.run(["mkdir", "-p", str(home_dir)])
|
|
target = home_dir / filename
|
|
temp_path = None
|
|
try:
|
|
with tempfile.NamedTemporaryFile("w", encoding="utf-8", delete=False) as temp_file:
|
|
temp_file.write(content)
|
|
temp_path = temp_file.name
|
|
|
|
self.executor.run(["install", "-m", "644", "-o", username, "-g", str(user.gid), temp_path, str(target)])
|
|
finally:
|
|
if temp_path:
|
|
Path(temp_path).unlink(missing_ok=True)
|
|
|
|
def _replace_managed_block(self, current: str, content: str) -> str:
|
|
managed_block = f"{self.managed_start_marker}\n{content.rstrip()}\n{self.managed_end_marker}"
|
|
start_index = current.find(self.managed_start_marker)
|
|
end_index = current.find(self.managed_end_marker)
|
|
if start_index >= 0 and end_index >= start_index:
|
|
end_index += len(self.managed_end_marker)
|
|
next_content = current[:start_index].rstrip() + "\n\n" + managed_block + "\n" + current[end_index:].lstrip()
|
|
|
|
return next_content.strip() + "\n"
|
|
|
|
if current.strip() == "":
|
|
return managed_block + "\n"
|
|
|
|
return current.rstrip() + "\n\n" + managed_block + "\n"
|