Source code for creds.utils

# -*- coding: utf-8 -*-
"""This module contains common helper functions."""

from __future__ import unicode_literals

import base64
import os
import platform
import random
import shlex
import string
import subprocess

from creds.constants import (CMD_SUDO, RANDOM_FILE_EXT_LENGTH, LINUX_CMD_GROUP_ADD, LINUX_CMD_GROUP_DEL,
                             LINUX_CMD_USERADD, LINUX_CMD_USERDEL, LINUX_CMD_USERMOD, FREEBSD_CMD_PW, LINUX_CMD_VISUDO)
from external.six import (PY2, PY3, text_type)


[docs]def sudo_check(): """Return the string 'sudo' if current user isn't root.""" sudo_cmd = '' if os.geteuid() != 0: sudo_cmd = CMD_SUDO return sudo_cmd
[docs]def get_platform(): """Return platform name""" return platform.system()
[docs]def get_missing_commands(_platform): """Check I can identify the necessary commands for managing users.""" missing = list() if _platform in ('Linux', 'OpenBSD'): if not LINUX_CMD_USERADD: missing.append('useradd') if not LINUX_CMD_USERMOD: missing.append('usermod') if not LINUX_CMD_USERDEL: missing.append('userdel') if not LINUX_CMD_GROUP_ADD: missing.append('groupadd') if not LINUX_CMD_GROUP_DEL: missing.append('groupdel') elif _platform == 'FreeBSD': # pragma: FreeBSD # FREEBSD COMMANDS if not FREEBSD_CMD_PW: missing.append('pw') if missing: print('\nMISSING = {0}'.format(missing)) return missing
[docs]def execute_command(command=None): """Execute a command and return the stdout and stderr.""" process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stdin = process.communicate() process.wait() return (stdout, stdin), process.returncode
[docs]def random_string(length=None): """Generate a random string of ASCII characters.""" return ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(length))
[docs]def base64encode(_input=None): """Return base64 encoded representation of a string.""" if PY2: # pragma: no cover return base64.b64encode(_input) elif PY3: # pragma: no cover if isinstance(_input, bytes): return base64.b64encode(_input).decode('UTF-8') elif isinstance(_input, str): return base64.b64encode(bytearray(_input, encoding='UTF-8')).decode('UTF-8')
[docs]def base64decode(_input=None): """Take a base64 encoded string and return the decoded string.""" missing_padding = 4 - len(_input) % 4 if missing_padding: _input += '=' * missing_padding if PY2: # pragma: no cover return base64.decodestring(_input) elif PY3: # pragma: no cover if isinstance(_input, bytes): return base64.b64decode(_input).decode('UTF-8') elif isinstance(_input, str): return base64.b64decode(bytearray(_input, encoding='UTF-8')).decode('UTF-8')
[docs]def read_sudoers(): """ Read the sudoers entry for the specified user. args: username (str): username. returns:`r str: sudoers entry for the specified user. """ sudoers_path = '/etc/sudoers' rnd_chars = random_string(length=RANDOM_FILE_EXT_LENGTH) tmp_sudoers_path = '/tmp/sudoers_{0}'.format(rnd_chars) sudoers_entries = list() copy_result = execute_command( shlex.split(str('{0} cp {1} {2}'.format(sudo_check(), sudoers_path, tmp_sudoers_path)))) result_message = copy_result[0][1].decode('UTF-8') if 'No such file or directory' not in result_message: execute_command(shlex.split(str('{0} chmod 755 {1}'.format(sudo_check(), tmp_sudoers_path)))) with open(tmp_sudoers_path) as tmp_sudoers_file: for line in tmp_sudoers_file: stripped = line.strip().replace(os.linesep, '') if stripped and not stripped.startswith('#'): sudoers_entries.append(stripped) execute_command(shlex.split(str('{0} rm {1}'.format(sudo_check(), tmp_sudoers_path)))) return sudoers_entries
[docs]def write_sudoers_entry(username=None, sudoers_entry=None): """Write sudoers entry. args: user (User): Instance of User containing sudoers entry. returns: str: sudoers entry for the specified user. """ sudoers_path = '/etc/sudoers' rnd_chars = random_string(length=RANDOM_FILE_EXT_LENGTH) tmp_sudoers_path = '/tmp/sudoers_{0}'.format(rnd_chars) execute_command( shlex.split(str('{0} cp {1} {2}'.format(sudo_check(), sudoers_path, tmp_sudoers_path)))) execute_command( shlex.split(str('{0} chmod 777 {1}'.format(sudo_check(), tmp_sudoers_path)))) with open(tmp_sudoers_path, mode=text_type('r')) as tmp_sudoers_file: sudoers_entries = tmp_sudoers_file.readlines() sudoers_output = list() for entry in sudoers_entries: if entry and not entry.startswith(username): sudoers_output.append(entry) if sudoers_entry: sudoers_output.append('{0} {1}'.format(username, sudoers_entry)) sudoers_output.append('\n') with open(tmp_sudoers_path, mode=text_type('w+')) as tmp_sudoers_file: tmp_sudoers_file.writelines(sudoers_output) sudoers_check_result = execute_command( shlex.split(str('{0} {1} -cf {2}'.format(sudo_check(), LINUX_CMD_VISUDO, tmp_sudoers_path)))) if sudoers_check_result[1] > 0: raise ValueError(sudoers_check_result[0][1]) execute_command( shlex.split(str('{0} cp {1} {2}'.format(sudo_check(), tmp_sudoers_path, sudoers_path)))) execute_command(shlex.split(str('{0} chown root:root {1}'.format(sudo_check(), sudoers_path)))) execute_command(shlex.split(str('{0} chmod 440 {1}'.format(sudo_check(), sudoers_path)))) execute_command(shlex.split(str('{0} rm {1}'.format(sudo_check(), tmp_sudoers_path))))
[docs]def remove_sudoers_entry(username=None): """Remove sudoers entry. args: user (User): Instance of User containing sudoers entry. returns: str: sudoers entry for the specified user. """ sudoers_path = '/etc/sudoers' rnd_chars = random_string(length=RANDOM_FILE_EXT_LENGTH) tmp_sudoers_path = '/tmp/sudoers_{0}'.format(rnd_chars) execute_command( shlex.split(str('{0} cp {1} {2}'.format(sudo_check(), sudoers_path, tmp_sudoers_path)))) execute_command( shlex.split(str('{0} chmod 777 {1}'.format(sudo_check(), tmp_sudoers_path)))) with open(tmp_sudoers_path, mode=text_type('r')) as tmp_sudoers_file: sudoers_entries = tmp_sudoers_file.readlines() sudoers_output = list() for entry in sudoers_entries: if not entry.startswith(username): sudoers_output.append(entry) with open(tmp_sudoers_path, mode=text_type('w+')) as tmp_sudoers_file: tmp_sudoers_file.writelines(sudoers_output) execute_command( shlex.split(str('{0} cp {1} {2}'.format(sudo_check(), tmp_sudoers_path, sudoers_path)))) execute_command(shlex.split(str('{0} chown root:root {1}'.format(sudo_check(), sudoers_path)))) execute_command(shlex.split(str('{0} chmod 440 {1}'.format(sudo_check(), sudoers_path)))) execute_command(shlex.split(str('{0} rm {1}'.format(sudo_check(), tmp_sudoers_path))))
[docs]def get_sudoers_entry(username=None, sudoers_entries=None): """ Find the sudoers entry in the sudoers file for the specified user. args: username (str): username. sudoers_entries (list): list of lines from the sudoers file. returns:`r str: sudoers entry for the specified user. """ for entry in sudoers_entries: if entry.startswith(username): return entry.replace(username, '').strip()