Compare commits

..

2 commits

Author SHA1 Message Date
eyjhb
06f5d3a5fb
gerd.lldap: added initial bootstrap 2024-12-22 21:57:54 +01:00
eyjhb
5749b1cf66
gerd.lldap: moved files around 2024-12-22 21:56:46 +01:00
7 changed files with 759 additions and 1 deletions

View file

@ -11,7 +11,7 @@
./gerd/services/fricloud-website.nix ./gerd/services/fricloud-website.nix
./gerd/services/member-website ./gerd/services/member-website
./gerd/services/lldap.nix ./gerd/services/lldap
./gerd/services/authelia ./gerd/services/authelia
./gerd/services/forgejo ./gerd/services/forgejo
./gerd/services/teeworlds.nix ./gerd/services/teeworlds.nix

View file

@ -0,0 +1,171 @@
from typing import Any
import gql
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
from gql.transport.aiohttp import AIOHTTPTransport
from pprint import pprint
import re
import logging
logger = logging.getLogger(__name__)
def to_camelcase(s):
s = re.sub(r"(_|-)+", " ", s).title().replace(" ", "").replace("*", "")
return "".join([s[0].lower(), s[1:]])
def to_snakecase(s):
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
class AttributeTypes:
STRING = "STRING"
INTEGER = "INTEGER"
JPEG_PHOTO = "JPEG_PHOTO"
DATE_TIME = "DATE_TIME"
class AttributeSchema:
def __init__(self, raw_vals: dict[str, Any]):
self.name: str = raw_vals["name"]
self.attributeType: str = raw_vals["attributeType"]
self.isList: bool = bool(raw_vals["isList"])
self.isVisible: bool = bool(raw_vals["isVisible"])
self.isEditable: bool = bool(raw_vals["isEditable"])
self.isReadonly: bool = bool(raw_vals["isReadonly"])
self.isHardcoded: bool = bool(raw_vals["isHardcoded"])
def __repr__(self):
return f"<AttributeSchema name={self.name} attributeType={self.attributeType} isList={self.isList} isVisible={self.isVisible} isEditable={self.isEditable} isReadonly={self.isReadonly} isHardcoded={self.isHardcoded} />"
class AttributeValue:
def __init__(self, raw_attribute: dict[str, Any]):
self.name: str = raw_attribute["name"]
tmpValue = raw_attribute.get("value", [])
if isinstance(tmpValue, str):
tmpValue = [tmpValue]
self.value: list[str] = tmpValue
def __repr__(self):
return f"<AttributeValue name={self.name} value={self.value} />"
class LLDAPAttributes:
def __init__(
self,
client: gql.Client,
using_user_attributes: bool = False,
using_group_attributes: bool = False,
):
self._client = client
self._using_user_attributes = using_user_attributes
self._using_group_attributes = using_group_attributes
if self._using_user_attributes and self._using_group_attributes:
raise Exception("can not both use user attributes and group attributes")
if not self._using_user_attributes and not self._using_group_attributes:
raise Exception("neither user attributes and group attributes specified")
def list_all(self) -> dict[str, AttributeSchema]:
ds = DSLSchema(self._client.schema)
if self._using_user_attributes:
querySchema = ds.Schema.userSchema
querySchemaStr = "userSchema"
else:
querySchema = ds.Schema.groupSchema
querySchemaStr = "groupSchema"
query = dsl_gql(
DSLQuery(
ds.Query.schema().select(
querySchema.select(
ds.AttributeList.attributes.select(
ds.AttributeSchema.name,
ds.AttributeSchema.attributeType,
ds.AttributeSchema.isList,
ds.AttributeSchema.isVisible,
ds.AttributeSchema.isEditable,
ds.AttributeSchema.isReadonly,
ds.AttributeSchema.isHardcoded,
)
)
),
),
)
result = self._client.execute(query)
attrs: dict[str, AttributeSchema] = {}
for raw_attribute in result["schema"][querySchemaStr]["attributes"]:
attr = AttributeSchema(raw_attribute)
attrs[attr.name] = attr
return attrs
def create(
self,
name: str,
attributeType: str,
isList: bool,
isVisible: bool,
isEditable: bool,
):
logger.debug(
f"adding attribute, name:'{name}' attributeType:'{attributeType}' isList:'{isList}' isVisible:'{isVisible}' isEditable:'{isEditable}'"
)
ds = DSLSchema(self._client.schema)
if self._using_user_attributes:
mutationSchema = ds.Mutation.addUserAttribute
else:
mutationSchema = ds.Mutation.addGroupAttribute
query = dsl_gql(
DSLMutation(
mutationSchema.args(
name=name,
attributeType=attributeType,
isList=isList,
isVisible=isVisible,
isEditable=isEditable,
).select(ds.Success.ok)
),
)
self._client.execute(query)
def get(self, name: str) -> AttributeSchema | None:
attrs = self.list_all()
return attrs.get(name)
def update(self, name: str):
raise Exception("unable to update attribute")
def delete(self, name: str):
logger.debug(f"deleting attribute '{name}'")
ds = DSLSchema(self._client.schema)
if self._using_user_attributes:
mutationSchema = ds.Mutation.deleteUserAttribute
else:
mutationSchema = ds.Mutation.deleteGroupAttribute
query = dsl_gql(
DSLMutation(
mutationSchema.args(
name=name,
).select(ds.Success.ok)
),
)
self._client.execute(query)

View file

@ -0,0 +1,136 @@
from typing import Any
import re
import gql
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
from gql.transport.aiohttp import AIOHTTPTransport
from .utils import to_camelcase, to_snakecase
import logging
logger = logging.getLogger(__name__)
class Group:
def __init__(
self,
raw_attrs: list[dict[str, str]],
):
self._attributes: dict[str, Any] = {
item["name"]: item["value"] for item in raw_attrs
}
self.groupId: int = int(self.__getattr__("groupId")[0])
self.name: str = self.__getattr__("displayName")[0]
def _attributes_camelcase(self) -> dict[str, str]:
return {to_camelcase(k): v for k, v in self._attributes.items()}
def __getattr__(self, key: str):
return self._attributes_camelcase().get(key, "")
def __repr__(self):
return f"<Group groupId={self.groupId} name={self.name} />"
class LLDAPGroups:
def __init__(self, client: gql.Client):
self._client = client
def list_all(self) -> dict[str, Group]:
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLQuery(
ds.Query.groups().select(
ds.Group.attributes.select(
ds.AttributeValue.name,
ds.AttributeValue.value,
ds.AttributeValue.schema.select(
ds.AttributeSchema.isHardcoded,
),
),
),
),
)
result = self._client.execute(query)
groups: dict[str, Group] = {}
for group in result["groups"]:
g = Group(
raw_attrs=group.get("attributes", []),
)
groups[g.name] = g
return groups
def create(self, groupName: str):
logger.debug(f"creating group with name '{groupName}'")
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLMutation(
ds.Mutation.createGroup.args(
name=groupName,
).select(ds.Group.displayName)
),
)
self._client.execute(query)
def get_by_name(self, groupName: str) -> Group | None:
groups = self.list_all()
return groups.get(groupName)
def get_by_id(self, groupId: int) -> Group | None:
groups = self.list_all()
for group in groups.values():
if group.groupId == groupId:
return group
return None
def name_to_id(self, groupName: str) -> int:
group = self.get_by_name(groupName)
if not group:
raise Exception(f"no group with the name {groupName}")
return group.groupId
def update(self, groupId: int, attrs: dict[str, str | list[str]]):
insertAttributes: list[dict[str, str | list[str]]] = []
for k, v in attrs.items():
if isinstance(v, str):
v = [v]
insertAttributes.append({"name": to_snakecase(k), "value": v})
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLMutation(
ds.Mutation.updateGroup.args(
group={
"id": groupId,
"insertAttributes": insertAttributes,
},
).select(ds.Success.ok),
),
)
self._client.execute(query)
def delete(self, groupId: int):
logger.debug(f"deleting group with id '{groupId}'")
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLMutation(
ds.Mutation.deleteGroup.args(
groupId=groupId,
).select(ds.Success.ok),
),
)
self._client.execute(query)
def test(self):
self.list_all()
# self.update("testusername", {"displayName": "Test User Name"})

View file

@ -0,0 +1,287 @@
#!/usr/bin/env nix-shell
#!nix-shell --pure --keep LLDAP_TOKEN -i python3 -p "python3.withPackages (ps: with ps; [ requests gql aiohttp])"
from typing import Any
import subprocess
import secrets
import json
import os
import gql
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
from gql.transport.aiohttp import AIOHTTPTransport
from pprint import pprint
from .users import LLDAPUsers
from .groups import LLDAPGroups
from .attributes import LLDAPAttributes
import logging
logging.basicConfig()
logging.getLogger("lldapbootstrap").setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class LLDAP:
def __init__(self, server_url: str, auth_token: str):
self._server_url: str = server_url
self._server_auth_token: str = auth_token
self._client: gql.Client = self._init_gql_client()
self._users = LLDAPUsers(self._client)
self._groups = LLDAPGroups(self._client)
self._attrsUser = LLDAPAttributes(self._client, using_user_attributes=True)
self._attrsGroup = LLDAPAttributes(self._client, using_group_attributes=True)
def _init_gql_client(self) -> gql.Client:
# Select your transport with a defined url endpoint
transport = AIOHTTPTransport(
url=f"{self._server_url}/api/graphql",
headers={"Authorization": f"Bearer {self._server_auth_token}"},
)
# Create a GraphQL client using the defined transport
client = gql.Client(transport=transport, fetch_schema_from_transport=True)
# force fetch schema
query = gql.gql(
"""
query {
users {
displayName
}
}
"""
)
result = client.execute(query)
return client
def _run_ensure_attrs_user(self, attrsClass, neededAttrsUser: list[dict[str, Any]]):
dictNeededAttrUser = {v["name"]: v for v in neededAttrsUser}
remoteAttrs = attrsClass.list_all()
# add needed attributes
for neededAttr in neededAttrsUser:
neededAttrName = neededAttr["name"]
if neededAttrName in remoteAttrs:
cattr = remoteAttrs[neededAttrName]
if (
neededAttr["attributeType"] != cattr.attributeType
or neededAttr["isEditable"] != cattr.isEditable
or neededAttr["isList"] != cattr.isList
or neededAttr["isVisible"] != cattr.isVisible
):
logger.debug(
f"attribute '{neededAttrName}' out of sync, deleting and adding again"
)
attrsClass.delete(neededAttrName)
else:
continue
attrsClass.create(
neededAttrName,
attributeType=neededAttr["attributeType"],
isEditable=neededAttr["isEditable"],
isList=neededAttr["isList"],
isVisible=neededAttr["isVisible"],
)
# remove unneeded attributes
for remoteAttrName, remoteAttr in remoteAttrs.items():
# skip hardcoded ones
if remoteAttr.isHardcoded:
continue
if remoteAttrName not in dictNeededAttrUser:
attrsClass.delete(remoteAttrName)
def _run_ensure_groups(self, neededGroups: list[dict[str, Any]]):
tmpNeededGroups = {v["display_name"]: v for v in neededGroups}
remoteGroups = self._groups.list_all()
for neededGroup in neededGroups:
neededGroupDisplay_Name = neededGroup["display_name"]
if neededGroupDisplay_Name not in remoteGroups:
self._groups.create(neededGroupDisplay_Name)
# refresh groups
remoteGroups = self._groups.list_all()
remoteGroup = remoteGroups[neededGroupDisplay_Name]
# we cannot update the display name, and we never would anyways
del neededGroup["display_name"]
self._groups.update(remoteGroup.groupId, neededGroup)
# delete unused groups
for remoteGroupName, remoteGroup in remoteGroups.items():
# skip all lldap_ groups
if remoteGroupName.startswith("lldap_"):
continue
if remoteGroupName not in tmpNeededGroups:
self._groups.delete(remoteGroup.groupId)
def _run_ensure_users(
self,
neededUsers: list[dict[str, Any]],
softDelete: bool = True,
):
tmpNeededUsers = {v["user_id"]: v for v in neededUsers}
remoteUsers = self._users.list_all()
for neededUser in neededUsers:
# get required info from dict, and DELETE from dict
# while we're at it. This means that we can safely use
# `neededUser` for updating later
neededUserId = neededUser.pop("user_id")
neededUserGroups = neededUser.pop("groups", [])
neededUserPassword: str | None = neededUser.pop("password", None)
# create user if needed
if neededUserId not in remoteUsers:
self._users.create(
neededUserId,
neededUser.get("mail", "no-email-specified"),
)
# refresh users
remoteUsers = self._users.list_all()
# update user
self._users.update(neededUserId, neededUser)
# set correct groups
remoteUser = remoteUsers[neededUserId]
# print warning about groups attribute
if neededUserGroups:
logger.info(
f"using attribute 'groups' for userId '{neededUserId}', for setting groups, NOT SETTING AS ATTRIBUTE!!"
)
# add to correct groups
for groupName in neededUserGroups:
if groupName not in remoteUser.groups:
self._users.add_group(
neededUserId,
self._groups.name_to_id(groupName),
)
# remove from unused groups
for groupName, group in remoteUser.groups.items():
if groupName not in neededUserGroups:
self._users.remove_group(
neededUserId,
self._groups.name_to_id(groupName),
)
if neededUserPassword:
logger.info(
f"using attribute 'password' for userId '{neededUserId}', for setting password, NOT SETTING AS ATTRIBUTE!!"
)
if neededUserPassword.startswith("file:"):
passwordFile = neededUserPassword[len("file:") :]
logger.debug(
f"reading password from file from file '{passwordFile}' for user '{neededUserId}'"
)
self._user_set_password(
neededUserId,
open(passwordFile, "r").read().strip(),
)
elif neededUserPassword.startswith("env:"):
cleanedPasswordEnv = neededUserPassword.strip()[len("env:") :]
logger.debug(
f"reading password from envvar '{cleanedPasswordEnv}' for user '{neededUserId}'"
)
password = os.getenv(cleanedPasswordEnv)
if not password:
raise Exception(
f"could not find env '{cleanedPasswordEnv}' for getting password"
)
self._user_set_password(
neededUserId,
password.strip(),
)
else:
logger.debug(
f"using the raw value of password, as the password for user '{neededUserId}'"
)
self._user_set_password(
neededUserId,
neededUserPassword.strip(),
)
# delete unused users
for remoteUserName, remoteUser in remoteUsers.items():
if remoteUserName not in tmpNeededUsers:
if softDelete:
self._user_disable(remoteUserName)
else:
self._users.delete(remoteUser.userId)
def _user_disable(self, userId: str, disabled_group_name: str = "disabled"):
user = self._users.get(userId)
if not user:
return
# remove all groups
for groupName, groupId in user.groups.items():
if groupName == disabled_group_name:
continue
self._users.remove_group(userId, groupId)
# if disabled group is in the users groups, then return
if disabled_group_name in user.groups:
return
# ensure group exists
groups = self._groups.list_all()
if disabled_group_name not in groups:
self._groups.create(disabled_group_name)
# add disabled group
self._users.add_group(userId, self._groups.name_to_id(disabled_group_name))
# set password to a long string
self._user_set_password(userId, secrets.token_urlsafe(128))
def _user_set_password(self, userId: str, password: str):
subprocess.check_output(
[
"lldap_set_password",
f"--base-url={self._server_url}",
f"--token={self._server_auth_token}",
f"--username={userId}",
f"--password={password}",
]
)
def run(self):
data = json.load(open("test2.json", "r"))
self._run_ensure_attrs_user(self._attrsUser, data["user_attributes"])
self._run_ensure_attrs_user(self._attrsGroup, data["group_attributes"])
self._run_ensure_groups(data["groups"])
self._run_ensure_users(data["users"])
if __name__ == "__main__":
auth_token = os.getenv("LLDAP_TOKEN")
if not auth_token:
raise Exception("No LLDAP_TOKEN provided. please set")
x = LLDAP("https://ldap.fricloud.dk", auth_token)
x.run()

View file

@ -0,0 +1,154 @@
from typing import Any
import re
import gql
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
from gql.transport.aiohttp import AIOHTTPTransport
from .utils import to_camelcase, to_snakecase
import logging
logger = logging.getLogger(__name__)
class User:
def __init__(
self,
raw_groups: list[dict[str, Any]],
raw_attrs: list[dict[str, str]],
):
self._attributes: dict[str, Any] = {
item["name"]: item["value"] for item in raw_attrs
}
self.groups: dict[str, int] = {
info["displayName"]: info["id"] for info in raw_groups
}
self.userId: str = self.__getattr__("userId")[0]
self.email: str = self.__getattr__("mail")[0]
def _attributes_camelcase(self) -> dict[str, str]:
return {to_camelcase(k): v for k, v in self._attributes.items()}
def __getattr__(self, key: str):
return self._attributes_camelcase().get(key, "")
class LLDAPUsers:
def __init__(self, client: gql.Client):
self._client = client
def list_all(self) -> dict[str, User]:
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLQuery(
ds.Query.users().select(
ds.User.groups.select(
ds.Group.id,
ds.Group.displayName,
),
ds.User.attributes.select(
ds.AttributeValue.name,
ds.AttributeValue.value,
ds.AttributeValue.schema.select(
ds.AttributeSchema.isHardcoded,
),
),
),
),
)
result = self._client.execute(query)
users: dict[str, User] = {}
for user in result["users"]:
u = User(
raw_groups=user.get("groups", []),
raw_attrs=user.get("attributes", []),
)
users[u.userId] = u
return users
def create(self, userId: str, email: str):
logger.debug(f"creating user with name '{userId}' and email '{email}'")
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLMutation(
ds.Mutation.createUser.args(
user={"id": userId, "email": email},
).select(ds.User.displayName)
),
)
self._client.execute(query)
def get(self, userId: str):
users = self.list_all()
return users.get(userId)
def update(self, userId: str, attrs: dict[str, str | list[str]]):
insertAttributes: list[dict[str, str | list[str]]] = []
for k, v in attrs.items():
if isinstance(v, str):
v = [v]
insertAttributes.append({"name": to_snakecase(k), "value": v})
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLMutation(
ds.Mutation.updateUser.args(
user={
"id": userId,
"insertAttributes": insertAttributes,
},
).select(ds.Success.ok),
),
)
self._client.execute(query)
def delete(self, userId: str):
logger.debug(f"deleting user with name '{userId}'")
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLMutation(
ds.Mutation.deleteUser.args(
userId=userId,
).select(ds.Success.ok),
),
)
self._client.execute(query)
# groups
def add_group(self, userId: str, groupId: int):
logger.debug(f"adding user '{userId}' to group '{groupId}'")
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLMutation(
ds.Mutation.addUserToGroup.args(
userId=userId,
groupId=groupId,
).select(ds.Success.ok)
),
)
self._client.execute(query)
def remove_group(self, userId: str, groupId: int):
logger.debug(f"removing user '{userId}' from group '{groupId}'")
ds = DSLSchema(self._client.schema)
query = dsl_gql(
DSLMutation(
ds.Mutation.removeUserFromGroup.args(
userId=userId,
groupId=groupId,
).select(ds.Success.ok)
),
)
self._client.execute(query)

View file

@ -0,0 +1,10 @@
import re
def to_camelcase(s):
s = re.sub(r"(_|-)+", " ", s).title().replace(" ", "").replace("*", "")
return "".join([s[0].lower(), s[1:]])
def to_snakecase(s):
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()