server-configs/machines/gerd/services/lldap/bootstrap/users.py

155 lines
4.5 KiB
Python
Raw Permalink Normal View History

2024-12-22 20:57:54 +00:00
from typing import Any
import re
import gql
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
from gql.transport.aiohttp import AIOHTTPTransport
from .utils import to_camelcase, to_snakecase
import logging
logger = logging.getLogger(__name__)
class User:
    """In-memory view of one LLDAP user built from a GraphQL result.

    Attribute values are stored under the names found in the raw data and
    are readable as instance attributes under the names produced by
    ``to_camelcase``.  Reading an attribute that is not present yields ``""``.
    """

    def __init__(
        self,
        raw_groups: list[dict[str, Any]],
        raw_attrs: list[dict[str, str]],
    ):
        """Build the user from raw ``groups`` and ``attributes`` entries.

        Args:
            raw_groups: Entries providing ``displayName`` and ``id`` keys.
            raw_attrs: Entries providing ``name`` and ``value`` keys.

        Raises:
            KeyError: If an entry lacks one of the keys listed above.
            IndexError: If the ``userId`` or ``mail`` attribute is missing
                or empty.
        """
        self._attributes: dict[str, Any] = {
            item["name"]: item["value"] for item in raw_attrs
        }
        # Maps group display name -> group id.
        self.groups: dict[str, int] = {
            info["displayName"]: info["id"] for info in raw_groups
        }
        # Build the camelCase view once for both lookups below.
        # NOTE(review): indexing with [0] presumes each value is a sequence
        # (e.g. a list of strings) - confirm against the server schema.
        camelcased = self._attributes_camelcase()
        self.userId: str = camelcased.get("userId", "")[0]
        self.email: str = camelcased.get("mail", "")[0]

    def _attributes_camelcase(self) -> dict[str, str]:
        """Return a copy of the stored attributes keyed by camelCase name."""
        return {to_camelcase(k): v for k, v in self._attributes.items()}

    def __getattr__(self, key: str):
        """Resolve ``key`` against the camelCase attribute map.

        Python calls this only after normal attribute lookup has failed.
        Unknown ordinary names resolve to ``""``.

        Raises:
            AttributeError: For dunder names and for ``_attributes`` itself.
        """
        # Protocol probes (``__deepcopy__``, ``__setstate__``, ...) must see a
        # real "missing attribute" instead of "".  ``_attributes`` must never
        # be resolved here: when it is not set yet (instance created without
        # ``__init__``), doing so would recurse without end.
        if key == "_attributes" or (key.startswith("__") and key.endswith("__")):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {key!r}"
            )
        return self._attributes_camelcase().get(key, "")
class LLDAPUsers:
    """User management operations executed through a ``gql.Client``.

    Each call builds its GraphQL document from the schema held by the
    wrapped client and runs it with ``client.execute``.
    """

    def __init__(self, client: gql.Client):
        """Store the client used for every query and mutation.

        Args:
            client: Client whose ``schema`` is used to build the queries.
                NOTE(review): assumes the schema is already available on the
                client - confirm where the client is constructed.
        """
        self._client = client

    def _schema(self) -> DSLSchema:
        """Return a DSL wrapper around the client's schema."""
        return DSLSchema(self._client.schema)

    def _run_mutation(self, field: Any) -> None:
        """Execute a single DSL mutation field and discard the result."""
        self._client.execute(dsl_gql(DSLMutation(field)))

    def list_all(self) -> dict[str, User]:
        """Fetch every user and return them keyed by ``userId``."""
        ds = self._schema()
        query = dsl_gql(
            DSLQuery(
                ds.Query.users().select(
                    ds.User.groups.select(
                        ds.Group.id,
                        ds.Group.displayName,
                    ),
                    ds.User.attributes.select(
                        ds.AttributeValue.name,
                        ds.AttributeValue.value,
                        ds.AttributeValue.schema.select(
                            ds.AttributeSchema.isHardcoded,
                        ),
                    ),
                ),
            ),
        )
        result = self._client.execute(query)

        users: dict[str, User] = {}
        for raw_user in result["users"]:
            user = User(
                raw_groups=raw_user.get("groups", []),
                raw_attrs=raw_user.get("attributes", []),
            )
            users[user.userId] = user
        return users

    def create(self, userId: str, email: str) -> None:
        """Create a user with the given id and e-mail address."""
        logger.debug(f"creating user with name '{userId}' and email '{email}'")
        ds = self._schema()
        self._run_mutation(
            ds.Mutation.createUser.args(
                user={"id": userId, "email": email},
            ).select(ds.User.displayName)
        )

    def get(self, userId: str) -> User | None:
        """Return the user with ``userId``, or ``None`` if there is none.

        The lookup is done by listing all users and picking the match.
        """
        return self.list_all().get(userId)

    def update(self, userId: str, attrs: dict[str, str | list[str]]) -> None:
        """Insert the given attribute values on an existing user.

        Args:
            userId: Id of the user to update.
            attrs: Attribute name -> value(s).  Names are converted with
                ``to_snakecase``; a bare string is sent as a one-item list.
        """
        logger.debug(f"updating attributes of user '{userId}'")
        insert_attributes: list[dict[str, Any]] = [
            {
                "name": to_snakecase(name),
                "value": [value] if isinstance(value, str) else value,
            }
            for name, value in attrs.items()
        ]
        ds = self._schema()
        self._run_mutation(
            ds.Mutation.updateUser.args(
                user={
                    "id": userId,
                    "insertAttributes": insert_attributes,
                },
            ).select(ds.Success.ok)
        )

    def delete(self, userId: str) -> None:
        """Delete the user with the given id."""
        logger.debug(f"deleting user with name '{userId}'")
        ds = self._schema()
        self._run_mutation(
            ds.Mutation.deleteUser.args(
                userId=userId,
            ).select(ds.Success.ok)
        )

    # groups

    def add_group(self, userId: str, groupId: int) -> None:
        """Add the user to the group identified by ``groupId``."""
        logger.debug(f"adding user '{userId}' to group '{groupId}'")
        ds = self._schema()
        self._run_mutation(
            ds.Mutation.addUserToGroup.args(
                userId=userId,
                groupId=groupId,
            ).select(ds.Success.ok)
        )

    def remove_group(self, userId: str, groupId: int) -> None:
        """Remove the user from the group identified by ``groupId``."""
        logger.debug(f"removing user '{userId}' from group '{groupId}'")
        ds = self._schema()
        self._run_mutation(
            ds.Mutation.removeUserFromGroup.args(
                userId=userId,
                groupId=groupId,
            ).select(ds.Success.ok)
        )