# Source code for creds.ssh

# -*- coding: utf-8 -*-
"""A class to represent a users' keys and functions to manage them."""
from __future__ import (unicode_literals, print_function)

import os
import shlex

from creds.constants import RANDOM_FILE_EXT_LENGTH
from creds.utils import base64decode, base64encode
from creds.utils import execute_command, random_string, sudo_check
from external.six import text_type


class PublicKey(object):
    """Representation of a public key.

    A key can be supplied raw, base64 encoded, or both. When only one form
    is stored, the other is derived from it on access.
    """

    def __init__(self, raw=None, b64encoded=None):
        """Make a public key.

        args:
            raw (str): raw public key
            b64encoded (str): base64 encoded public key

        raises:
            AttributeError: if neither raw nor b64encoded is provided
                (both missing or both empty).
        """
        if not any((raw, b64encoded)):
            raise AttributeError('Key not provided')
        self._raw = raw
        self._b64encoded = b64encoded

    @property
    def b64encoded(self):
        """Return a base64 encoding of the key.

        returns:
            str: base64 encoding of the public key
        """
        if self._b64encoded:
            # Stored value wins; only surrounding CR/LF characters are removed.
            return text_type(self._b64encoded).strip("\r\n")
        else:
            return base64encode(self.raw)

    @property
    def raw(self):
        """Return raw key.

        returns:
            str: raw key
        """
        if self._raw:
            # Stored value wins; only surrounding CR/LF characters are removed.
            return text_type(self._raw).strip("\r\n")
        else:
            return text_type(base64decode(self._b64encoded)).strip("\r\n")

    def __str__(self):
        """Return the same text as __repr__."""
        return self.__repr__()

    def __repr__(self):
        """Return a constructor-style description showing both key forms."""
        # The closing parenthesis was missing from the original format string.
        return 'PublicKey(raw="{0}", b64encoded="{1}")'.format(self.raw, self.b64encoded)
# TODO: Keep temporary copy so we can check for race condition.
def read_authorized_keys(username=None):
    """Read public keys from specified user's authorized_keys file.

    The file is copied to a randomly suffixed temporary file under /tmp
    (commands are prefixed with whatever sudo_check() returns), read from
    there line by line, and the temporary copy is removed afterwards.

    args:
        username (str): username.

    returns:
        list: Authorised keys for the specified user, one PublicKey per line
            of the file. Empty when the copy reports
            'No such file or directory'.

    raises:
        OSError: if the copy output says sudo requires a tty.
    """
    authorized_keys_path = '{0}/.ssh/authorized_keys'.format(
        os.path.expanduser('~{0}'.format(username)))
    rnd_chars = random_string(length=RANDOM_FILE_EXT_LENGTH)
    tmp_authorized_keys_path = '/tmp/authorized_keys_{0}_{1}'.format(username, rnd_chars)
    authorized_keys = list()
    # Build argument lists instead of formatting paths into a string and
    # re-splitting it, so whitespace or quotes in a path cannot change the
    # argument vector. Only the sudo prefix itself is split.
    sudo_prefix = shlex.split(str(sudo_check()))
    copy_result = execute_command(
        sudo_prefix + ['cp', authorized_keys_path, tmp_authorized_keys_path])
    result_message = copy_result[0][1].decode('UTF-8')
    if 'you must have a tty to run sudo' in result_message:  # pragma: no cover
        raise OSError("/etc/sudoers is blocked sudo. Remove entry: 'Defaults requiretty'.")
    elif 'No such file or directory' not in result_message:
        try:
            # Make the copy readable by the current process before opening it.
            execute_command(sudo_prefix + ['chmod', '755', tmp_authorized_keys_path])
            with open(tmp_authorized_keys_path) as keys_file:
                for key in keys_file:
                    authorized_keys.append(PublicKey(raw=key))
        finally:
            # Always remove the temporary copy, even if reading failed.
            if os.path.exists(tmp_authorized_keys_path):
                execute_command(sudo_prefix + ['rm', tmp_authorized_keys_path])
    return authorized_keys
def write_authorized_keys(user=None):
    """Write public keys back to authorized_keys file.

    Create keys directory if it doesn't already exist. The keys are first
    written to a randomly suffixed temporary file under /tmp, copied into
    place, and then ownership and permissions are set on the directory and
    file (commands are prefixed with whatever sudo_check() returns).

    args:
        user (User): Instance of User containing keys.

    returns:
        None
    """
    authorized_keys_dir = '{0}/.ssh'.format(os.path.expanduser('~{0}'.format(user.name)))
    rnd_chars = random_string(length=RANDOM_FILE_EXT_LENGTH)
    authorized_keys_path = '{0}/authorized_keys'.format(authorized_keys_dir)
    tmp_authorized_keys_path = '/tmp/authorized_keys_{0}_{1}'.format(user.name, rnd_chars)
    # Build argument lists instead of formatting paths into a string and
    # re-splitting it, so whitespace or quotes in a path cannot change the
    # argument vector. Only the sudo prefix itself is split.
    sudo_prefix = shlex.split(str(sudo_check()))

    if not os.path.isdir(authorized_keys_dir):
        execute_command(sudo_prefix + ['mkdir', '-p', authorized_keys_dir])

    # One line per key, each terminated with a newline.
    authorized_keys = ['{0}\n'.format(key.raw) for key in user.public_keys]

    try:
        with open(tmp_authorized_keys_path, mode=text_type('w+')) as keys_file:
            keys_file.writelines(authorized_keys)
        execute_command(sudo_prefix + ['cp', tmp_authorized_keys_path, authorized_keys_path])
        execute_command(sudo_prefix + ['chown', '-R', user.name, authorized_keys_dir])
        execute_command(sudo_prefix + ['chmod', '700', authorized_keys_dir])
        execute_command(sudo_prefix + ['chmod', '600', authorized_keys_path])
    finally:
        # Always remove the temporary file, even if a step above failed.
        if os.path.exists(tmp_authorized_keys_path):
            execute_command(sudo_prefix + ['rm', tmp_authorized_keys_path])