from typing import Any import re import gql from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation from gql.transport.aiohttp import AIOHTTPTransport from .utils import to_camelcase, to_snakecase import logging logger = logging.getLogger(__name__) class User: def __init__( self, raw_groups: list[dict[str, Any]], raw_attrs: list[dict[str, str]], ): self._attributes: dict[str, Any] = { item["name"]: item["value"] for item in raw_attrs } self.groups: dict[str, int] = { info["displayName"]: info["id"] for info in raw_groups } self.userId: str = self.__getattr__("userId")[0] self.email: str = self.__getattr__("mail")[0] def _attributes_camelcase(self) -> dict[str, str]: return {to_camelcase(k): v for k, v in self._attributes.items()} def __getattr__(self, key: str): return self._attributes_camelcase().get(key, "") class LLDAPUsers: def __init__(self, client: gql.Client): self._client = client def list_all(self) -> dict[str, User]: ds = DSLSchema(self._client.schema) query = dsl_gql( DSLQuery( ds.Query.users().select( ds.User.groups.select( ds.Group.id, ds.Group.displayName, ), ds.User.attributes.select( ds.AttributeValue.name, ds.AttributeValue.value, ds.AttributeValue.schema.select( ds.AttributeSchema.isHardcoded, ), ), ), ), ) result = self._client.execute(query) users: dict[str, User] = {} for user in result["users"]: u = User( raw_groups=user.get("groups", []), raw_attrs=user.get("attributes", []), ) users[u.userId] = u return users def create(self, userId: str, email: str): logger.debug(f"creating user with name '{userId}' and email '{email}'") ds = DSLSchema(self._client.schema) query = dsl_gql( DSLMutation( ds.Mutation.createUser.args( user={"id": userId, "email": email}, ).select(ds.User.displayName) ), ) self._client.execute(query) def get(self, userId: str): users = self.list_all() return users.get(userId) def update(self, userId: str, attrs: dict[str, str | list[str]]): insertAttributes: list[dict[str, str | list[str]]] = [] for k, v in attrs.items(): if isinstance(v, str): v = [v] insertAttributes.append({"name": to_snakecase(k), "value": v}) ds = DSLSchema(self._client.schema) query = dsl_gql( DSLMutation( ds.Mutation.updateUser.args( user={ "id": userId, "insertAttributes": insertAttributes, }, ).select(ds.Success.ok), ), ) self._client.execute(query) def delete(self, userId: str): logger.debug(f"deleting user with name '{userId}'") ds = DSLSchema(self._client.schema) query = dsl_gql( DSLMutation( ds.Mutation.deleteUser.args( userId=userId, ).select(ds.Success.ok), ), ) self._client.execute(query) # groups def add_group(self, userId: str, groupId: int): logger.debug(f"adding user '{userId}' to group '{groupId}'") ds = DSLSchema(self._client.schema) query = dsl_gql( DSLMutation( ds.Mutation.addUserToGroup.args( userId=userId, groupId=groupId, ).select(ds.Success.ok) ), ) self._client.execute(query) def remove_group(self, userId: str, groupId: int): logger.debug(f"removing user '{userId}' from group '{groupId}'") ds = DSLSchema(self._client.schema) query = dsl_gql( DSLMutation( ds.Mutation.removeUserFromGroup.args( userId=userId, groupId=groupId, ).select(ds.Success.ok) ), ) self._client.execute(query)