Compare commits
2 commits
629f8f02d7
...
06f5d3a5fb
Author | SHA1 | Date | |
---|---|---|---|
|
06f5d3a5fb | ||
|
5749b1cf66 |
7 changed files with 759 additions and 1 deletions
|
@ -11,7 +11,7 @@
|
|||
|
||||
./gerd/services/fricloud-website.nix
|
||||
./gerd/services/member-website
|
||||
./gerd/services/lldap.nix
|
||||
./gerd/services/lldap
|
||||
./gerd/services/authelia
|
||||
./gerd/services/forgejo
|
||||
./gerd/services/teeworlds.nix
|
||||
|
|
171
machines/gerd/services/lldap/bootstrap/attributes.py
Normal file
171
machines/gerd/services/lldap/bootstrap/attributes.py
Normal file
|
@ -0,0 +1,171 @@
|
|||
from typing import Any
|
||||
|
||||
import gql
|
||||
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
|
||||
from gql.transport.aiohttp import AIOHTTPTransport
|
||||
|
||||
from pprint import pprint
|
||||
import re
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def to_camelcase(s):
|
||||
s = re.sub(r"(_|-)+", " ", s).title().replace(" ", "").replace("*", "")
|
||||
return "".join([s[0].lower(), s[1:]])
|
||||
|
||||
|
||||
def to_snakecase(s):
|
||||
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
|
||||
|
||||
|
||||
class AttributeTypes:
|
||||
STRING = "STRING"
|
||||
INTEGER = "INTEGER"
|
||||
JPEG_PHOTO = "JPEG_PHOTO"
|
||||
DATE_TIME = "DATE_TIME"
|
||||
|
||||
|
||||
class AttributeSchema:
|
||||
def __init__(self, raw_vals: dict[str, Any]):
|
||||
self.name: str = raw_vals["name"]
|
||||
self.attributeType: str = raw_vals["attributeType"]
|
||||
self.isList: bool = bool(raw_vals["isList"])
|
||||
self.isVisible: bool = bool(raw_vals["isVisible"])
|
||||
self.isEditable: bool = bool(raw_vals["isEditable"])
|
||||
self.isReadonly: bool = bool(raw_vals["isReadonly"])
|
||||
self.isHardcoded: bool = bool(raw_vals["isHardcoded"])
|
||||
|
||||
def __repr__(self):
|
||||
return f"<AttributeSchema name={self.name} attributeType={self.attributeType} isList={self.isList} isVisible={self.isVisible} isEditable={self.isEditable} isReadonly={self.isReadonly} isHardcoded={self.isHardcoded} />"
|
||||
|
||||
|
||||
class AttributeValue:
|
||||
def __init__(self, raw_attribute: dict[str, Any]):
|
||||
self.name: str = raw_attribute["name"]
|
||||
|
||||
tmpValue = raw_attribute.get("value", [])
|
||||
if isinstance(tmpValue, str):
|
||||
tmpValue = [tmpValue]
|
||||
|
||||
self.value: list[str] = tmpValue
|
||||
|
||||
def __repr__(self):
|
||||
return f"<AttributeValue name={self.name} value={self.value} />"
|
||||
|
||||
|
||||
class LLDAPAttributes:
|
||||
def __init__(
|
||||
self,
|
||||
client: gql.Client,
|
||||
using_user_attributes: bool = False,
|
||||
using_group_attributes: bool = False,
|
||||
):
|
||||
self._client = client
|
||||
|
||||
self._using_user_attributes = using_user_attributes
|
||||
self._using_group_attributes = using_group_attributes
|
||||
|
||||
if self._using_user_attributes and self._using_group_attributes:
|
||||
raise Exception("can not both use user attributes and group attributes")
|
||||
|
||||
if not self._using_user_attributes and not self._using_group_attributes:
|
||||
raise Exception("neither user attributes and group attributes specified")
|
||||
|
||||
def list_all(self) -> dict[str, AttributeSchema]:
|
||||
ds = DSLSchema(self._client.schema)
|
||||
|
||||
if self._using_user_attributes:
|
||||
querySchema = ds.Schema.userSchema
|
||||
querySchemaStr = "userSchema"
|
||||
else:
|
||||
querySchema = ds.Schema.groupSchema
|
||||
querySchemaStr = "groupSchema"
|
||||
|
||||
query = dsl_gql(
|
||||
DSLQuery(
|
||||
ds.Query.schema().select(
|
||||
querySchema.select(
|
||||
ds.AttributeList.attributes.select(
|
||||
ds.AttributeSchema.name,
|
||||
ds.AttributeSchema.attributeType,
|
||||
ds.AttributeSchema.isList,
|
||||
ds.AttributeSchema.isVisible,
|
||||
ds.AttributeSchema.isEditable,
|
||||
ds.AttributeSchema.isReadonly,
|
||||
ds.AttributeSchema.isHardcoded,
|
||||
)
|
||||
)
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
result = self._client.execute(query)
|
||||
|
||||
attrs: dict[str, AttributeSchema] = {}
|
||||
for raw_attribute in result["schema"][querySchemaStr]["attributes"]:
|
||||
attr = AttributeSchema(raw_attribute)
|
||||
attrs[attr.name] = attr
|
||||
|
||||
return attrs
|
||||
|
||||
def create(
|
||||
self,
|
||||
name: str,
|
||||
attributeType: str,
|
||||
isList: bool,
|
||||
isVisible: bool,
|
||||
isEditable: bool,
|
||||
):
|
||||
logger.debug(
|
||||
f"adding attribute, name:'{name}' attributeType:'{attributeType}' isList:'{isList}' isVisible:'{isVisible}' isEditable:'{isEditable}'"
|
||||
)
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
|
||||
if self._using_user_attributes:
|
||||
mutationSchema = ds.Mutation.addUserAttribute
|
||||
else:
|
||||
mutationSchema = ds.Mutation.addGroupAttribute
|
||||
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
mutationSchema.args(
|
||||
name=name,
|
||||
attributeType=attributeType,
|
||||
isList=isList,
|
||||
isVisible=isVisible,
|
||||
isEditable=isEditable,
|
||||
).select(ds.Success.ok)
|
||||
),
|
||||
)
|
||||
self._client.execute(query)
|
||||
|
||||
def get(self, name: str) -> AttributeSchema | None:
|
||||
attrs = self.list_all()
|
||||
return attrs.get(name)
|
||||
|
||||
def update(self, name: str):
|
||||
raise Exception("unable to update attribute")
|
||||
|
||||
def delete(self, name: str):
|
||||
logger.debug(f"deleting attribute '{name}'")
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
|
||||
if self._using_user_attributes:
|
||||
mutationSchema = ds.Mutation.deleteUserAttribute
|
||||
else:
|
||||
mutationSchema = ds.Mutation.deleteGroupAttribute
|
||||
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
mutationSchema.args(
|
||||
name=name,
|
||||
).select(ds.Success.ok)
|
||||
),
|
||||
)
|
||||
self._client.execute(query)
|
136
machines/gerd/services/lldap/bootstrap/groups.py
Normal file
136
machines/gerd/services/lldap/bootstrap/groups.py
Normal file
|
@ -0,0 +1,136 @@
|
|||
from typing import Any
|
||||
import re
|
||||
|
||||
import gql
|
||||
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
|
||||
from gql.transport.aiohttp import AIOHTTPTransport
|
||||
|
||||
from .utils import to_camelcase, to_snakecase
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Group:
|
||||
def __init__(
|
||||
self,
|
||||
raw_attrs: list[dict[str, str]],
|
||||
):
|
||||
self._attributes: dict[str, Any] = {
|
||||
item["name"]: item["value"] for item in raw_attrs
|
||||
}
|
||||
|
||||
self.groupId: int = int(self.__getattr__("groupId")[0])
|
||||
self.name: str = self.__getattr__("displayName")[0]
|
||||
|
||||
def _attributes_camelcase(self) -> dict[str, str]:
|
||||
return {to_camelcase(k): v for k, v in self._attributes.items()}
|
||||
|
||||
def __getattr__(self, key: str):
|
||||
return self._attributes_camelcase().get(key, "")
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Group groupId={self.groupId} name={self.name} />"
|
||||
|
||||
|
||||
class LLDAPGroups:
|
||||
def __init__(self, client: gql.Client):
|
||||
self._client = client
|
||||
|
||||
def list_all(self) -> dict[str, Group]:
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLQuery(
|
||||
ds.Query.groups().select(
|
||||
ds.Group.attributes.select(
|
||||
ds.AttributeValue.name,
|
||||
ds.AttributeValue.value,
|
||||
ds.AttributeValue.schema.select(
|
||||
ds.AttributeSchema.isHardcoded,
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
result = self._client.execute(query)
|
||||
|
||||
groups: dict[str, Group] = {}
|
||||
for group in result["groups"]:
|
||||
g = Group(
|
||||
raw_attrs=group.get("attributes", []),
|
||||
)
|
||||
groups[g.name] = g
|
||||
|
||||
return groups
|
||||
|
||||
def create(self, groupName: str):
|
||||
logger.debug(f"creating group with name '{groupName}'")
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
ds.Mutation.createGroup.args(
|
||||
name=groupName,
|
||||
).select(ds.Group.displayName)
|
||||
),
|
||||
)
|
||||
self._client.execute(query)
|
||||
|
||||
def get_by_name(self, groupName: str) -> Group | None:
|
||||
groups = self.list_all()
|
||||
return groups.get(groupName)
|
||||
|
||||
def get_by_id(self, groupId: int) -> Group | None:
|
||||
groups = self.list_all()
|
||||
for group in groups.values():
|
||||
if group.groupId == groupId:
|
||||
return group
|
||||
|
||||
return None
|
||||
|
||||
def name_to_id(self, groupName: str) -> int:
|
||||
group = self.get_by_name(groupName)
|
||||
if not group:
|
||||
raise Exception(f"no group with the name {groupName}")
|
||||
|
||||
return group.groupId
|
||||
|
||||
def update(self, groupId: int, attrs: dict[str, str | list[str]]):
|
||||
insertAttributes: list[dict[str, str | list[str]]] = []
|
||||
for k, v in attrs.items():
|
||||
if isinstance(v, str):
|
||||
v = [v]
|
||||
|
||||
insertAttributes.append({"name": to_snakecase(k), "value": v})
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
ds.Mutation.updateGroup.args(
|
||||
group={
|
||||
"id": groupId,
|
||||
"insertAttributes": insertAttributes,
|
||||
},
|
||||
).select(ds.Success.ok),
|
||||
),
|
||||
)
|
||||
self._client.execute(query)
|
||||
|
||||
def delete(self, groupId: int):
|
||||
logger.debug(f"deleting group with id '{groupId}'")
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
ds.Mutation.deleteGroup.args(
|
||||
groupId=groupId,
|
||||
).select(ds.Success.ok),
|
||||
),
|
||||
)
|
||||
self._client.execute(query)
|
||||
|
||||
def test(self):
|
||||
self.list_all()
|
||||
# self.update("testusername", {"displayName": "Test User Name"})
|
287
machines/gerd/services/lldap/bootstrap/main.py
Normal file
287
machines/gerd/services/lldap/bootstrap/main.py
Normal file
|
@ -0,0 +1,287 @@
|
|||
#!/usr/bin/env nix-shell
|
||||
#!nix-shell --pure --keep LLDAP_TOKEN -i python3 -p "python3.withPackages (ps: with ps; [ requests gql aiohttp])"
|
||||
|
||||
from typing import Any
|
||||
import subprocess
|
||||
import secrets
|
||||
import json
|
||||
import os
|
||||
|
||||
import gql
|
||||
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
|
||||
from gql.transport.aiohttp import AIOHTTPTransport
|
||||
from pprint import pprint
|
||||
|
||||
|
||||
from .users import LLDAPUsers
|
||||
from .groups import LLDAPGroups
|
||||
from .attributes import LLDAPAttributes
|
||||
|
||||
import logging
|
||||
|
||||
logging.basicConfig()
|
||||
logging.getLogger("lldapbootstrap").setLevel(logging.DEBUG)
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class LLDAP:
|
||||
def __init__(self, server_url: str, auth_token: str):
|
||||
self._server_url: str = server_url
|
||||
self._server_auth_token: str = auth_token
|
||||
|
||||
self._client: gql.Client = self._init_gql_client()
|
||||
|
||||
self._users = LLDAPUsers(self._client)
|
||||
self._groups = LLDAPGroups(self._client)
|
||||
self._attrsUser = LLDAPAttributes(self._client, using_user_attributes=True)
|
||||
self._attrsGroup = LLDAPAttributes(self._client, using_group_attributes=True)
|
||||
|
||||
def _init_gql_client(self) -> gql.Client:
|
||||
# Select your transport with a defined url endpoint
|
||||
transport = AIOHTTPTransport(
|
||||
url=f"{self._server_url}/api/graphql",
|
||||
headers={"Authorization": f"Bearer {self._server_auth_token}"},
|
||||
)
|
||||
|
||||
# Create a GraphQL client using the defined transport
|
||||
client = gql.Client(transport=transport, fetch_schema_from_transport=True)
|
||||
|
||||
# force fetch schema
|
||||
query = gql.gql(
|
||||
"""
|
||||
query {
|
||||
users {
|
||||
displayName
|
||||
}
|
||||
}
|
||||
"""
|
||||
)
|
||||
result = client.execute(query)
|
||||
|
||||
return client
|
||||
|
||||
def _run_ensure_attrs_user(self, attrsClass, neededAttrsUser: list[dict[str, Any]]):
|
||||
dictNeededAttrUser = {v["name"]: v for v in neededAttrsUser}
|
||||
remoteAttrs = attrsClass.list_all()
|
||||
|
||||
# add needed attributes
|
||||
for neededAttr in neededAttrsUser:
|
||||
neededAttrName = neededAttr["name"]
|
||||
|
||||
if neededAttrName in remoteAttrs:
|
||||
cattr = remoteAttrs[neededAttrName]
|
||||
if (
|
||||
neededAttr["attributeType"] != cattr.attributeType
|
||||
or neededAttr["isEditable"] != cattr.isEditable
|
||||
or neededAttr["isList"] != cattr.isList
|
||||
or neededAttr["isVisible"] != cattr.isVisible
|
||||
):
|
||||
logger.debug(
|
||||
f"attribute '{neededAttrName}' out of sync, deleting and adding again"
|
||||
)
|
||||
attrsClass.delete(neededAttrName)
|
||||
else:
|
||||
continue
|
||||
|
||||
attrsClass.create(
|
||||
neededAttrName,
|
||||
attributeType=neededAttr["attributeType"],
|
||||
isEditable=neededAttr["isEditable"],
|
||||
isList=neededAttr["isList"],
|
||||
isVisible=neededAttr["isVisible"],
|
||||
)
|
||||
|
||||
# remove unneeded attributes
|
||||
for remoteAttrName, remoteAttr in remoteAttrs.items():
|
||||
# skip hardcoded ones
|
||||
if remoteAttr.isHardcoded:
|
||||
continue
|
||||
|
||||
if remoteAttrName not in dictNeededAttrUser:
|
||||
attrsClass.delete(remoteAttrName)
|
||||
|
||||
def _run_ensure_groups(self, neededGroups: list[dict[str, Any]]):
|
||||
tmpNeededGroups = {v["display_name"]: v for v in neededGroups}
|
||||
remoteGroups = self._groups.list_all()
|
||||
|
||||
for neededGroup in neededGroups:
|
||||
neededGroupDisplay_Name = neededGroup["display_name"]
|
||||
|
||||
if neededGroupDisplay_Name not in remoteGroups:
|
||||
self._groups.create(neededGroupDisplay_Name)
|
||||
|
||||
# refresh groups
|
||||
remoteGroups = self._groups.list_all()
|
||||
|
||||
remoteGroup = remoteGroups[neededGroupDisplay_Name]
|
||||
|
||||
# we cannot update the display name, and we never would anyways
|
||||
del neededGroup["display_name"]
|
||||
|
||||
self._groups.update(remoteGroup.groupId, neededGroup)
|
||||
|
||||
# delete unused groups
|
||||
for remoteGroupName, remoteGroup in remoteGroups.items():
|
||||
# skip all lldap_ groups
|
||||
if remoteGroupName.startswith("lldap_"):
|
||||
continue
|
||||
|
||||
if remoteGroupName not in tmpNeededGroups:
|
||||
self._groups.delete(remoteGroup.groupId)
|
||||
|
||||
def _run_ensure_users(
|
||||
self,
|
||||
neededUsers: list[dict[str, Any]],
|
||||
softDelete: bool = True,
|
||||
):
|
||||
tmpNeededUsers = {v["user_id"]: v for v in neededUsers}
|
||||
remoteUsers = self._users.list_all()
|
||||
|
||||
for neededUser in neededUsers:
|
||||
# get required info from dict, and DELETE from dict
|
||||
# while we're at it. This means that we can safely use
|
||||
# `neededUser` for updating later
|
||||
neededUserId = neededUser.pop("user_id")
|
||||
neededUserGroups = neededUser.pop("groups", [])
|
||||
neededUserPassword: str | None = neededUser.pop("password", None)
|
||||
|
||||
# create user if needed
|
||||
if neededUserId not in remoteUsers:
|
||||
self._users.create(
|
||||
neededUserId,
|
||||
neededUser.get("mail", "no-email-specified"),
|
||||
)
|
||||
|
||||
# refresh users
|
||||
remoteUsers = self._users.list_all()
|
||||
|
||||
# update user
|
||||
self._users.update(neededUserId, neededUser)
|
||||
|
||||
# set correct groups
|
||||
remoteUser = remoteUsers[neededUserId]
|
||||
|
||||
# print warning about groups attribute
|
||||
if neededUserGroups:
|
||||
logger.info(
|
||||
f"using attribute 'groups' for userId '{neededUserId}', for setting groups, NOT SETTING AS ATTRIBUTE!!"
|
||||
)
|
||||
|
||||
# add to correct groups
|
||||
for groupName in neededUserGroups:
|
||||
if groupName not in remoteUser.groups:
|
||||
self._users.add_group(
|
||||
neededUserId,
|
||||
self._groups.name_to_id(groupName),
|
||||
)
|
||||
|
||||
# remove from unused groups
|
||||
for groupName, group in remoteUser.groups.items():
|
||||
if groupName not in neededUserGroups:
|
||||
self._users.remove_group(
|
||||
neededUserId,
|
||||
self._groups.name_to_id(groupName),
|
||||
)
|
||||
|
||||
if neededUserPassword:
|
||||
logger.info(
|
||||
f"using attribute 'password' for userId '{neededUserId}', for setting password, NOT SETTING AS ATTRIBUTE!!"
|
||||
)
|
||||
|
||||
if neededUserPassword.startswith("file:"):
|
||||
passwordFile = neededUserPassword[len("file:") :]
|
||||
logger.debug(
|
||||
f"reading password from file from file '{passwordFile}' for user '{neededUserId}'"
|
||||
)
|
||||
|
||||
self._user_set_password(
|
||||
neededUserId,
|
||||
open(passwordFile, "r").read().strip(),
|
||||
)
|
||||
elif neededUserPassword.startswith("env:"):
|
||||
cleanedPasswordEnv = neededUserPassword.strip()[len("env:") :]
|
||||
logger.debug(
|
||||
f"reading password from envvar '{cleanedPasswordEnv}' for user '{neededUserId}'"
|
||||
)
|
||||
|
||||
password = os.getenv(cleanedPasswordEnv)
|
||||
if not password:
|
||||
raise Exception(
|
||||
f"could not find env '{cleanedPasswordEnv}' for getting password"
|
||||
)
|
||||
self._user_set_password(
|
||||
neededUserId,
|
||||
password.strip(),
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
f"using the raw value of password, as the password for user '{neededUserId}'"
|
||||
)
|
||||
self._user_set_password(
|
||||
neededUserId,
|
||||
neededUserPassword.strip(),
|
||||
)
|
||||
|
||||
# delete unused users
|
||||
for remoteUserName, remoteUser in remoteUsers.items():
|
||||
if remoteUserName not in tmpNeededUsers:
|
||||
if softDelete:
|
||||
self._user_disable(remoteUserName)
|
||||
else:
|
||||
self._users.delete(remoteUser.userId)
|
||||
|
||||
def _user_disable(self, userId: str, disabled_group_name: str = "disabled"):
|
||||
user = self._users.get(userId)
|
||||
if not user:
|
||||
return
|
||||
|
||||
# remove all groups
|
||||
for groupName, groupId in user.groups.items():
|
||||
if groupName == disabled_group_name:
|
||||
continue
|
||||
|
||||
self._users.remove_group(userId, groupId)
|
||||
|
||||
# if disabled group is in the users groups, then return
|
||||
if disabled_group_name in user.groups:
|
||||
return
|
||||
|
||||
# ensure group exists
|
||||
groups = self._groups.list_all()
|
||||
if disabled_group_name not in groups:
|
||||
self._groups.create(disabled_group_name)
|
||||
|
||||
# add disabled group
|
||||
self._users.add_group(userId, self._groups.name_to_id(disabled_group_name))
|
||||
|
||||
# set password to a long string
|
||||
self._user_set_password(userId, secrets.token_urlsafe(128))
|
||||
|
||||
def _user_set_password(self, userId: str, password: str):
|
||||
subprocess.check_output(
|
||||
[
|
||||
"lldap_set_password",
|
||||
f"--base-url={self._server_url}",
|
||||
f"--token={self._server_auth_token}",
|
||||
f"--username={userId}",
|
||||
f"--password={password}",
|
||||
]
|
||||
)
|
||||
|
||||
def run(self):
|
||||
data = json.load(open("test2.json", "r"))
|
||||
|
||||
self._run_ensure_attrs_user(self._attrsUser, data["user_attributes"])
|
||||
self._run_ensure_attrs_user(self._attrsGroup, data["group_attributes"])
|
||||
self._run_ensure_groups(data["groups"])
|
||||
self._run_ensure_users(data["users"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
auth_token = os.getenv("LLDAP_TOKEN")
|
||||
if not auth_token:
|
||||
raise Exception("No LLDAP_TOKEN provided. please set")
|
||||
|
||||
x = LLDAP("https://ldap.fricloud.dk", auth_token)
|
||||
x.run()
|
154
machines/gerd/services/lldap/bootstrap/users.py
Normal file
154
machines/gerd/services/lldap/bootstrap/users.py
Normal file
|
@ -0,0 +1,154 @@
|
|||
from typing import Any
|
||||
import re
|
||||
|
||||
import gql
|
||||
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
|
||||
from gql.transport.aiohttp import AIOHTTPTransport
|
||||
|
||||
from .utils import to_camelcase, to_snakecase
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class User:
|
||||
def __init__(
|
||||
self,
|
||||
raw_groups: list[dict[str, Any]],
|
||||
raw_attrs: list[dict[str, str]],
|
||||
):
|
||||
self._attributes: dict[str, Any] = {
|
||||
item["name"]: item["value"] for item in raw_attrs
|
||||
}
|
||||
|
||||
self.groups: dict[str, int] = {
|
||||
info["displayName"]: info["id"] for info in raw_groups
|
||||
}
|
||||
self.userId: str = self.__getattr__("userId")[0]
|
||||
self.email: str = self.__getattr__("mail")[0]
|
||||
|
||||
def _attributes_camelcase(self) -> dict[str, str]:
|
||||
return {to_camelcase(k): v for k, v in self._attributes.items()}
|
||||
|
||||
def __getattr__(self, key: str):
|
||||
return self._attributes_camelcase().get(key, "")
|
||||
|
||||
|
||||
class LLDAPUsers:
|
||||
def __init__(self, client: gql.Client):
|
||||
self._client = client
|
||||
|
||||
def list_all(self) -> dict[str, User]:
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLQuery(
|
||||
ds.Query.users().select(
|
||||
ds.User.groups.select(
|
||||
ds.Group.id,
|
||||
ds.Group.displayName,
|
||||
),
|
||||
ds.User.attributes.select(
|
||||
ds.AttributeValue.name,
|
||||
ds.AttributeValue.value,
|
||||
ds.AttributeValue.schema.select(
|
||||
ds.AttributeSchema.isHardcoded,
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
result = self._client.execute(query)
|
||||
|
||||
users: dict[str, User] = {}
|
||||
for user in result["users"]:
|
||||
u = User(
|
||||
raw_groups=user.get("groups", []),
|
||||
raw_attrs=user.get("attributes", []),
|
||||
)
|
||||
users[u.userId] = u
|
||||
|
||||
return users
|
||||
|
||||
def create(self, userId: str, email: str):
|
||||
logger.debug(f"creating user with name '{userId}' and email '{email}'")
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
ds.Mutation.createUser.args(
|
||||
user={"id": userId, "email": email},
|
||||
).select(ds.User.displayName)
|
||||
),
|
||||
)
|
||||
self._client.execute(query)
|
||||
|
||||
def get(self, userId: str):
|
||||
users = self.list_all()
|
||||
return users.get(userId)
|
||||
|
||||
def update(self, userId: str, attrs: dict[str, str | list[str]]):
|
||||
insertAttributes: list[dict[str, str | list[str]]] = []
|
||||
for k, v in attrs.items():
|
||||
if isinstance(v, str):
|
||||
v = [v]
|
||||
|
||||
insertAttributes.append({"name": to_snakecase(k), "value": v})
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
ds.Mutation.updateUser.args(
|
||||
user={
|
||||
"id": userId,
|
||||
"insertAttributes": insertAttributes,
|
||||
},
|
||||
).select(ds.Success.ok),
|
||||
),
|
||||
)
|
||||
self._client.execute(query)
|
||||
|
||||
def delete(self, userId: str):
|
||||
logger.debug(f"deleting user with name '{userId}'")
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
ds.Mutation.deleteUser.args(
|
||||
userId=userId,
|
||||
).select(ds.Success.ok),
|
||||
),
|
||||
)
|
||||
self._client.execute(query)
|
||||
|
||||
# groups
|
||||
def add_group(self, userId: str, groupId: int):
|
||||
logger.debug(f"adding user '{userId}' to group '{groupId}'")
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
ds.Mutation.addUserToGroup.args(
|
||||
userId=userId,
|
||||
groupId=groupId,
|
||||
).select(ds.Success.ok)
|
||||
),
|
||||
)
|
||||
|
||||
self._client.execute(query)
|
||||
|
||||
def remove_group(self, userId: str, groupId: int):
|
||||
logger.debug(f"removing user '{userId}' from group '{groupId}'")
|
||||
|
||||
ds = DSLSchema(self._client.schema)
|
||||
query = dsl_gql(
|
||||
DSLMutation(
|
||||
ds.Mutation.removeUserFromGroup.args(
|
||||
userId=userId,
|
||||
groupId=groupId,
|
||||
).select(ds.Success.ok)
|
||||
),
|
||||
)
|
||||
|
||||
self._client.execute(query)
|
10
machines/gerd/services/lldap/bootstrap/utils.py
Normal file
10
machines/gerd/services/lldap/bootstrap/utils.py
Normal file
|
@ -0,0 +1,10 @@
|
|||
import re
|
||||
|
||||
|
||||
def to_camelcase(s):
|
||||
s = re.sub(r"(_|-)+", " ", s).title().replace(" ", "").replace("*", "")
|
||||
return "".join([s[0].lower(), s[1:]])
|
||||
|
||||
|
||||
def to_snakecase(s):
|
||||
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
|
Loading…
Reference in a new issue