server-configs/machines/gerd/services/lldap/bootstrap/main.py
2024-12-22 21:57:54 +01:00

287 lines
10 KiB
Python

#!/usr/bin/env nix-shell
#!nix-shell --pure --keep LLDAP_TOKEN -i python3 -p "python3.withPackages (ps: with ps; [ requests gql aiohttp])"
from typing import Any
import subprocess
import secrets
import json
import os
import gql
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
from gql.transport.aiohttp import AIOHTTPTransport
from pprint import pprint
from .users import LLDAPUsers
from .groups import LLDAPGroups
from .attributes import LLDAPAttributes
import logging
logging.basicConfig()
logging.getLogger("lldapbootstrap").setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class LLDAP:
def __init__(self, server_url: str, auth_token: str):
self._server_url: str = server_url
self._server_auth_token: str = auth_token
self._client: gql.Client = self._init_gql_client()
self._users = LLDAPUsers(self._client)
self._groups = LLDAPGroups(self._client)
self._attrsUser = LLDAPAttributes(self._client, using_user_attributes=True)
self._attrsGroup = LLDAPAttributes(self._client, using_group_attributes=True)
def _init_gql_client(self) -> gql.Client:
# Select your transport with a defined url endpoint
transport = AIOHTTPTransport(
url=f"{self._server_url}/api/graphql",
headers={"Authorization": f"Bearer {self._server_auth_token}"},
)
# Create a GraphQL client using the defined transport
client = gql.Client(transport=transport, fetch_schema_from_transport=True)
# force fetch schema
query = gql.gql(
"""
query {
users {
displayName
}
}
"""
)
result = client.execute(query)
return client
def _run_ensure_attrs_user(self, attrsClass, neededAttrsUser: list[dict[str, Any]]):
dictNeededAttrUser = {v["name"]: v for v in neededAttrsUser}
remoteAttrs = attrsClass.list_all()
# add needed attributes
for neededAttr in neededAttrsUser:
neededAttrName = neededAttr["name"]
if neededAttrName in remoteAttrs:
cattr = remoteAttrs[neededAttrName]
if (
neededAttr["attributeType"] != cattr.attributeType
or neededAttr["isEditable"] != cattr.isEditable
or neededAttr["isList"] != cattr.isList
or neededAttr["isVisible"] != cattr.isVisible
):
logger.debug(
f"attribute '{neededAttrName}' out of sync, deleting and adding again"
)
attrsClass.delete(neededAttrName)
else:
continue
attrsClass.create(
neededAttrName,
attributeType=neededAttr["attributeType"],
isEditable=neededAttr["isEditable"],
isList=neededAttr["isList"],
isVisible=neededAttr["isVisible"],
)
# remove unneeded attributes
for remoteAttrName, remoteAttr in remoteAttrs.items():
# skip hardcoded ones
if remoteAttr.isHardcoded:
continue
if remoteAttrName not in dictNeededAttrUser:
attrsClass.delete(remoteAttrName)
def _run_ensure_groups(self, neededGroups: list[dict[str, Any]]):
tmpNeededGroups = {v["display_name"]: v for v in neededGroups}
remoteGroups = self._groups.list_all()
for neededGroup in neededGroups:
neededGroupDisplay_Name = neededGroup["display_name"]
if neededGroupDisplay_Name not in remoteGroups:
self._groups.create(neededGroupDisplay_Name)
# refresh groups
remoteGroups = self._groups.list_all()
remoteGroup = remoteGroups[neededGroupDisplay_Name]
# we cannot update the display name, and we never would anyways
del neededGroup["display_name"]
self._groups.update(remoteGroup.groupId, neededGroup)
# delete unused groups
for remoteGroupName, remoteGroup in remoteGroups.items():
# skip all lldap_ groups
if remoteGroupName.startswith("lldap_"):
continue
if remoteGroupName not in tmpNeededGroups:
self._groups.delete(remoteGroup.groupId)
def _run_ensure_users(
self,
neededUsers: list[dict[str, Any]],
softDelete: bool = True,
):
tmpNeededUsers = {v["user_id"]: v for v in neededUsers}
remoteUsers = self._users.list_all()
for neededUser in neededUsers:
# get required info from dict, and DELETE from dict
# while we're at it. This means that we can safely use
# `neededUser` for updating later
neededUserId = neededUser.pop("user_id")
neededUserGroups = neededUser.pop("groups", [])
neededUserPassword: str | None = neededUser.pop("password", None)
# create user if needed
if neededUserId not in remoteUsers:
self._users.create(
neededUserId,
neededUser.get("mail", "no-email-specified"),
)
# refresh users
remoteUsers = self._users.list_all()
# update user
self._users.update(neededUserId, neededUser)
# set correct groups
remoteUser = remoteUsers[neededUserId]
# print warning about groups attribute
if neededUserGroups:
logger.info(
f"using attribute 'groups' for userId '{neededUserId}', for setting groups, NOT SETTING AS ATTRIBUTE!!"
)
# add to correct groups
for groupName in neededUserGroups:
if groupName not in remoteUser.groups:
self._users.add_group(
neededUserId,
self._groups.name_to_id(groupName),
)
# remove from unused groups
for groupName, group in remoteUser.groups.items():
if groupName not in neededUserGroups:
self._users.remove_group(
neededUserId,
self._groups.name_to_id(groupName),
)
if neededUserPassword:
logger.info(
f"using attribute 'password' for userId '{neededUserId}', for setting password, NOT SETTING AS ATTRIBUTE!!"
)
if neededUserPassword.startswith("file:"):
passwordFile = neededUserPassword[len("file:") :]
logger.debug(
f"reading password from file from file '{passwordFile}' for user '{neededUserId}'"
)
self._user_set_password(
neededUserId,
open(passwordFile, "r").read().strip(),
)
elif neededUserPassword.startswith("env:"):
cleanedPasswordEnv = neededUserPassword.strip()[len("env:") :]
logger.debug(
f"reading password from envvar '{cleanedPasswordEnv}' for user '{neededUserId}'"
)
password = os.getenv(cleanedPasswordEnv)
if not password:
raise Exception(
f"could not find env '{cleanedPasswordEnv}' for getting password"
)
self._user_set_password(
neededUserId,
password.strip(),
)
else:
logger.debug(
f"using the raw value of password, as the password for user '{neededUserId}'"
)
self._user_set_password(
neededUserId,
neededUserPassword.strip(),
)
# delete unused users
for remoteUserName, remoteUser in remoteUsers.items():
if remoteUserName not in tmpNeededUsers:
if softDelete:
self._user_disable(remoteUserName)
else:
self._users.delete(remoteUser.userId)
def _user_disable(self, userId: str, disabled_group_name: str = "disabled"):
user = self._users.get(userId)
if not user:
return
# remove all groups
for groupName, groupId in user.groups.items():
if groupName == disabled_group_name:
continue
self._users.remove_group(userId, groupId)
# if disabled group is in the users groups, then return
if disabled_group_name in user.groups:
return
# ensure group exists
groups = self._groups.list_all()
if disabled_group_name not in groups:
self._groups.create(disabled_group_name)
# add disabled group
self._users.add_group(userId, self._groups.name_to_id(disabled_group_name))
# set password to a long string
self._user_set_password(userId, secrets.token_urlsafe(128))
def _user_set_password(self, userId: str, password: str):
subprocess.check_output(
[
"lldap_set_password",
f"--base-url={self._server_url}",
f"--token={self._server_auth_token}",
f"--username={userId}",
f"--password={password}",
]
)
def run(self):
data = json.load(open("test2.json", "r"))
self._run_ensure_attrs_user(self._attrsUser, data["user_attributes"])
self._run_ensure_attrs_user(self._attrsGroup, data["group_attributes"])
self._run_ensure_groups(data["groups"])
self._run_ensure_users(data["users"])
if __name__ == "__main__":
auth_token = os.getenv("LLDAP_TOKEN")
if not auth_token:
raise Exception("No LLDAP_TOKEN provided. please set")
x = LLDAP("https://ldap.fricloud.dk", auth_token)
x.run()