Source code for creds.plan
# -*- coding: utf-8 -*-
"""Functions to generate a list of steps to transition from the current state to the desired state."""
from __future__ import (unicode_literals, print_function)
from creds import constants
from creds.ssh import write_authorized_keys
from creds.users import (generate_add_user_command, generate_modify_user_command,
generate_delete_user_command, compare_user, get_user_by_uid)
from creds.utils import execute_command, write_sudoers_entry, remove_sudoers_entry
from external.six import iteritems
[docs]def create_plan(existing_users=None, proposed_users=None, purge_undefined=None, protected_users=None,
allow_non_unique_id=None, manage_home=True, manage_keys=True):
"""Determine what changes are required.
args:
existing_users (Users): List of discovered users
proposed_users (Users): List of proposed users
purge_undefined (bool): Remove discovered users that have not been defined in proposed users list
protected_users (list): List of users' names that should not be evaluated as part of the plan creation process
allow_non_unique_id (bool): Allow more than one user to have the same uid
manage_home (bool): Create/remove users' home directories
manage_keys (bool): Add/update/remove users' keys (manage_home must also be true)
returns:
list: Differences between discovered and proposed users with a
list of operations that will achieve the desired state.
"""
plan = list()
proposed_usernames = list()
if not purge_undefined:
purge_undefined = constants.PURGE_UNDEFINED
if not protected_users:
protected_users = constants.PROTECTED_USERS
if not allow_non_unique_id:
allow_non_unique_id = constants.ALLOW_NON_UNIQUE_ID
# Create list of modifications to make based on proposed users compared to existing users
for proposed_user in proposed_users:
proposed_usernames.append(proposed_user.name)
user_matching_name = existing_users.describe_users(users_filter=dict(name=proposed_user.name))
user_matching_id = get_user_by_uid(uid=proposed_user.uid, users=existing_users)
# If user does not exist
if not allow_non_unique_id and user_matching_id and not user_matching_name:
plan.append(
dict(action='fail', error='uid_clash', proposed_user=proposed_user, state='existing', result=None))
elif not user_matching_name:
plan.append(
dict(action='add', proposed_user=proposed_user, state='missing', result=None, manage_home=manage_home,
manage_keys=manage_keys))
# If they do, then compare
else:
user_comparison = compare_user(passed_user=proposed_user, user_list=existing_users)
if user_comparison.get('result'):
plan.append(
dict(action='update', proposed_user=proposed_user, state='existing',
user_comparison=user_comparison, manage_home=manage_home, manage_keys=manage_keys))
# Application of the proposed user list will not result in deletion of users that need to be removed
# If 'PURGE_UNDEFINED' then look for existing users that are not defined in proposed usernames and mark for removal
if purge_undefined:
for existing_user in existing_users:
if existing_user.name not in proposed_usernames:
if existing_user.name not in protected_users:
plan.append(
dict(action='delete', username=existing_user.name, state='existing', manage_home=manage_home,
manage_keys=manage_keys))
return plan
[docs]def execute_plan(plan=None):
"""Create, Modify or Delete, depending on plan item."""
execution_result = list()
for task in plan:
action = task['action']
if action == 'delete':
command = generate_delete_user_command(username=task.get('username'), manage_home=task['manage_home'])
command_output = execute_command(command)
execution_result.append(dict(task=task, command_output=command_output))
remove_sudoers_entry(username=task.get('username'))
elif action == 'add':
command = generate_add_user_command(proposed_user=task.get('proposed_user'), manage_home=task['manage_home'])
command_output = execute_command(command)
if task['proposed_user'].public_keys and task['manage_home'] and task['manage_keys']:
write_authorized_keys(task['proposed_user'])
if task['proposed_user'].sudoers_entry:
write_sudoers_entry(username=task['proposed_user'].name,
sudoers_entry=task['proposed_user'].sudoers_entry)
execution_result.append(dict(task=task, command_output=command_output))
elif action == 'update':
result = task['user_comparison'].get('result')
# Don't modify user if only keys have changed
action_count = 0
for k, _ in iteritems(result):
if '_action' in k:
action_count += 1
command_output = None
if task['manage_home'] and task['manage_keys'] and action_count == 1 and 'public_keys_action' in result:
write_authorized_keys(task['proposed_user'])
elif action_count == 1 and 'sudoers_entry_action' in result:
write_sudoers_entry(username=task['proposed_user'].name,
sudoers_entry=task['user_comparison']['result']['replacement_sudoers_entry'])
else:
command = generate_modify_user_command(task=task)
command_output = execute_command(command)
if task['manage_home'] and task['manage_keys'] and result.get('public_keys_action'):
write_authorized_keys(task['proposed_user'])
if result.get('sudoers_entry_action'):
write_sudoers_entry(username=task['proposed_user'].name,
sudoers_entry=task['user_comparison']['result']['replacement_sudoers_entry'])
execution_result.append(dict(task=task, command_output=command_output))