You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
394 lines
13 KiB
394 lines
13 KiB
from log import logger |
|
import time |
|
import ldap |
|
import ldap.modlist as modlist |
|
import secrets |
|
import base64 |
|
|
|
from flask import abort |
|
|
|
HTTP_NOTFOUND = 404 |
|
|
|
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) |
|
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, secrets.LDAP_CERTFILE) |
|
|
|
def init_ldap(): |
|
ldap_conn = ldap.initialize(secrets.LDAP_URL) |
|
ldap_conn.set_option(ldap.OPT_REFERRALS, 0) |
|
ldap_conn.set_option(ldap.OPT_PROTOCOL_VERSION, 3) |
|
ldap_conn.set_option(ldap.OPT_X_TLS,ldap.OPT_X_TLS_DEMAND) |
|
ldap_conn.set_option(ldap.OPT_X_TLS_DEMAND, True) |
|
ldap_conn.set_option(ldap.OPT_DEBUG_LEVEL, 255) |
|
|
|
return ldap_conn |
|
|
|
def convert(data): |
|
if isinstance(data, dict): |
|
return {convert(key): convert(value) for key, value in data.items()} |
|
elif isinstance(data, (list, tuple)): |
|
if len(data) == 1: |
|
return convert(data[0]) |
|
else: |
|
return [convert(element) for element in data] |
|
elif isinstance(data, (bytes, bytearray)): |
|
try: |
|
return data.decode() |
|
except UnicodeDecodeError: |
|
return data.hex() |
|
else: |
|
return data |
|
|
|
def find_user(query): |
|
''' |
|
Search for a user by sAMAccountname or email |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
logger.info('Looking up user ' + query) |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
criteria = '(&(objectClass=user)(|(mail={})(sAMAccountName={})(userPrincipalName={}*))(!(objectClass=computer)))'.format(query, query, query) |
|
results = ldap_conn.search_s(secrets.BASE_MEMBERS, ldap.SCOPE_SUBTREE, criteria, ['displayName','sAMAccountName','email']) |
|
|
|
logger.info(' Results: ' + str(results)) |
|
|
|
if len(results) != 1: |
|
abort(HTTP_NOTFOUND) |
|
|
|
return results[0][0] |
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def find_dn(dn): |
|
''' |
|
Search for a user by dn |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
logger.info('Finding user for dn: ' + dn) |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
criteria = '(&(objectClass=user)(!(objectClass=computer)))' |
|
results = ldap_conn.search_s(dn, ldap.SCOPE_SUBTREE, criteria, ['sAMAccountName']) |
|
|
|
logger.info(' Results: ' + str(results)) |
|
|
|
return results[0][1]['sAMAccountName'][0].decode() |
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def create_user(first, last, username, email, password): |
|
''' |
|
Create a User; required data is first, last, email, username, password |
|
Note: this creates a disabled user, then sets a password, then enables the user |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
logger.info('Creating user: ' + username) |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
dn = 'CN={} {},{}'.format(first, last, secrets.BASE_MEMBERS) |
|
full_name = '{} {}'.format(first, last) |
|
|
|
ldif = [ |
|
('objectClass', [b'top', b'person', b'organizationalPerson', b'user']), |
|
('cn', [full_name.encode()]), |
|
('userPrincipalName', [username.encode()]), |
|
('sAMAccountName', [username.encode()[:20]]), |
|
('givenName', [first.encode()]), |
|
('sn', [last.encode()]), |
|
('DisplayName', [full_name.encode()]), |
|
('userAccountControl', [b'514']), |
|
('mail', [email.encode()]), |
|
('company', [b'Spaceport']), |
|
] |
|
|
|
result = ldap_conn.add_s(dn, ldif) |
|
|
|
logger.info(' Result: ' + str(result)) |
|
|
|
# set password |
|
pass_quotes = '"{}"'.format(password) |
|
pass_uni = pass_quotes.encode('utf-16-le') |
|
change_des = [(ldap.MOD_REPLACE, 'unicodePwd', [pass_uni])] |
|
result = ldap_conn.modify_s(dn, change_des) |
|
|
|
logger.info(' Result: ' + str(result)) |
|
|
|
# 512 will set user account to enabled |
|
mod_acct = [(ldap.MOD_REPLACE, 'userAccountControl', b'512')] |
|
result = ldap_conn.modify_s(dn, mod_acct) |
|
|
|
logger.info(' Result: ' + str(result)) |
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def set_password(username, password): |
|
ldap_conn = init_ldap() |
|
try: |
|
logger.info('Setting password for: ' + username) |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
user_dn = find_user(username) |
|
|
|
logger.info(' Dn found: ' + user_dn) |
|
|
|
# set password |
|
pass_quotes = '"{}"'.format(password) |
|
pass_uni = pass_quotes.encode('utf-16-le') |
|
change_des = [(ldap.MOD_REPLACE, 'unicodePwd', [pass_uni])] |
|
result = ldap_conn.modify_s(user_dn, change_des) |
|
|
|
logger.info(' Set password result: ' + str(result)) |
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def find_group(groupname): |
|
''' |
|
Search for a group by name or sAMAccountname |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
logger.info('Looking up group ' + groupname) |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
criteria = '(&(objectClass=group)(sAMAccountName={}))'.format(groupname) |
|
results = ldap_conn.search_s(secrets.BASE_GROUPS, ldap.SCOPE_SUBTREE, criteria, ['name','groupType'] ) |
|
|
|
logger.info(' Results: ' + str(results)) |
|
|
|
if len(results) != 1: |
|
abort(HTTP_NOTFOUND) |
|
|
|
return results[0][0] |
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def create_group(groupname, description): |
|
''' |
|
Create a Group; required data is sAMAccountName, Description |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
dn = 'CN={},{}'.format(groupname, secrets.BASE_GROUPS) |
|
|
|
ldif = [ |
|
('objectClass', [b'top', b'group']), |
|
('cn', [groupname.encode()]), |
|
('DisplayName', [groupname.encode()]), |
|
('description', [description.encode()]), |
|
('sAMAccountName', [groupname.encode()]) |
|
] |
|
|
|
rcode = ldap_conn.add_s(dn, ldif) |
|
return rcode |
|
|
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def add_to_group(groupname, username): |
|
''' |
|
Add a user to a Group; required data is GroupName, Username |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
logger.info('Adding ' + username + ' to group: ' + groupname) |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
user_dn = find_user(username) |
|
group_dn = find_group(groupname) |
|
|
|
if not is_member(groupname, username): |
|
mod_acct = [(ldap.MOD_ADD, 'member', user_dn.encode())] |
|
ldap_conn.modify_s(group_dn, mod_acct) |
|
logger.info(' Added.') |
|
return True |
|
else: |
|
logger.info(' Already a member, skipping.') |
|
return False |
|
|
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def remove_from_group(groupname, username): |
|
''' |
|
Remove a user from a Group; required data is GroupName, Username |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
user_dn = find_user(username) |
|
group_dn = find_group(groupname) |
|
|
|
if is_member(groupname, username): |
|
mod_acct = [(ldap.MOD_DELETE, 'member', user_dn.encode())] |
|
ldap_conn.modify_s(group_dn, mod_acct) |
|
return True |
|
else: |
|
logger.info('Not a member, skipping') |
|
return False |
|
|
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def list_group(groupname): |
|
''' |
|
List users in a Group; required data is GroupName |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
group_dn = find_group(groupname) |
|
|
|
criteria = '(&(objectClass=group)(sAMAccountName={}))'.format(groupname) |
|
results = ldap_conn.search_s(secrets.BASE_GROUPS, ldap.SCOPE_SUBTREE, criteria, ['member']) |
|
members_tmp = results[0][1] |
|
members = members_tmp.get('member', []) |
|
return [find_dn(dn.decode()) for dn in members] |
|
|
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def delete_user(username): |
|
''' |
|
Delete user; required data is sAMAccountName or userPrincipleName |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
logger.info('Deleting user: ' + username) |
|
|
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
user_dn = find_user(username) |
|
result = ldap_conn.delete_s(user_dn) |
|
|
|
logger.info(' Result: ' + str(result)) |
|
return result |
|
|
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def is_member(groupname, username): |
|
''' |
|
Checks to see if a user is a member of a group |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
logger.info('Checking if ' + username + ' is in group: ' + groupname) |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
group_dn = find_group(groupname) |
|
user_dn = find_user(username).encode() |
|
memflag = False |
|
criteria = '(&(objectClass=group)(sAMAccountName={}))'.format(groupname) |
|
results = ldap_conn.search_s(secrets.BASE_GROUPS, ldap.SCOPE_SUBTREE, criteria, ['member'] ) |
|
members_tmp = results[0][1] |
|
members = members_tmp.get('member', []) |
|
result = user_dn in members |
|
|
|
logger.info(' Result: ' + str(result)) |
|
return result |
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def dump_users(): |
|
''' |
|
Dump all AD users |
|
''' |
|
ldap_conn = init_ldap() |
|
try: |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
criteria = '(&(objectClass=user)(objectGUID=*))' |
|
attributes = ['cn', 'sAMAccountName', 'userPrincipalName', 'mail', 'displayName', 'givenName', 'name', 'sn', 'logonCount', 'objectGUID'] |
|
results = ldap_conn.search_s(secrets.BASE_MEMBERS, ldap.SCOPE_SUBTREE, criteria, attributes) |
|
results = convert(results) |
|
|
|
output = {} |
|
for r in results: |
|
tmp = r[1] |
|
tmp['dn'] = r[0] |
|
output[r[1]['sAMAccountName']] = tmp |
|
|
|
import json |
|
return json.dumps(output, indent=4) |
|
finally: |
|
ldap_conn.unbind() |
|
|
|
def set_account_enabled(username, is_enabled): |
|
ldap_conn = init_ldap() |
|
try: |
|
logger.info('Setting account enabled for: ' + username) |
|
ldap_conn.simple_bind_s(secrets.LDAP_USERNAME, secrets.LDAP_PASSWORD) |
|
criteria = '(&(objectClass=user)(sAMAccountName={})(!(objectClass=computer)))'.format(username) |
|
results = ldap_conn.search_s( |
|
secrets.BASE_MEMBERS, |
|
ldap.SCOPE_SUBTREE, |
|
criteria, [ |
|
'displayName', |
|
'sAMAccountName', |
|
'email', |
|
'userAccountControl', |
|
], |
|
) |
|
|
|
if len(results) != 1: |
|
abort(HTTP_NOTFOUND) |
|
|
|
try: |
|
dn = results[0][0] |
|
prev_control = results[0][1]['userAccountControl'][0] |
|
except KeyError: |
|
abort(HTTP_NOTFOUND) |
|
|
|
prev_control = int(prev_control.decode()) |
|
|
|
if is_enabled: |
|
logger.info('Enabling account') |
|
new_control = prev_control & ~0x2 |
|
else: |
|
logger.info('Disabling account') |
|
new_control = prev_control | 0x2 |
|
|
|
logger.info(' Dn found: %s', dn) |
|
logger.info(' Current control: %s', prev_control) |
|
logger.info(' New control: %s', new_control) |
|
|
|
new_control = str(new_control).encode() |
|
|
|
mod_acct = [(ldap.MOD_REPLACE, 'userAccountControl', new_control)] |
|
result = ldap_conn.modify_s(dn, mod_acct) |
|
logger.info(' Result: ' + str(result)) |
|
return result |
|
finally: |
|
ldap_conn.unbind() |
|
|
|
|
|
# =========================================================================== |
|
|
|
#guid = '\\b4\\51\\1adce6709c449bd21a812c423e82' |
|
#guid = ''.join(['\\%s' % guid[i:i+2] for i in range(0, len(guid), 2)]) |
|
#print(guid) |
|
#criteria = '(&(objectClass=user)(objectGUID={}))'.format(guid) |
|
|
|
if __name__ == '__main__': |
|
pass |
|
print("=-=-=-=-=-=-=-=-=-=") |
|
#print(create_user('test', 'test', 'test.test', 'test@example.com', 'protospace*&^g87g6')) |
|
#print("----------") |
|
#print(find_user('elon.tusk')) |
|
#print("----------") |
|
#print(delete_user('elon.tusk')) |
|
#print("============================================================") |
|
#print(create_group("newgroup", "new group")) |
|
#print(" ============== ") |
|
#print(list_group("Laser Users")) |
|
#print(" ============== ") |
|
#print(is_member('newgroup','tanner.collin')) |
|
#print(" ============== ") |
|
#print(add_to_group('newgroup','tanner.collin')) |
|
#print(" ============== ") |
|
#print(list_group("newgroup")) |
|
#print(" ============== ") |
|
#print(remove_from_group('newgroup','tanner.collin')) |
|
#print(" ============== ") |
|
#print(list_group('Trotec Users')) |
|
#print(dump_users()) |
|
|
|
#print(set_account_enabled('tanner.collin', True)) |
|
|
|
#users = list_group('Laser Users') |
|
#import json |
|
#print(json.dumps(users, indent=4))
|
|
|