from typing import Any import gql from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation from gql.transport.aiohttp import AIOHTTPTransport from .utils import to_camelcase, to_snakecase import logging logger = logging.getLogger(__name__) class AttributeTypes: STRING = "STRING" INTEGER = "INTEGER" JPEG_PHOTO = "JPEG_PHOTO" DATE_TIME = "DATE_TIME" class AttributeSchema: def __init__(self, raw_vals: dict[str, Any]): self.name: str = raw_vals["name"] self.attributeType: str = raw_vals["attributeType"] self.isList: bool = bool(raw_vals["isList"]) self.isVisible: bool = bool(raw_vals["isVisible"]) self.isEditable: bool = bool(raw_vals["isEditable"]) self.isReadonly: bool = bool(raw_vals["isReadonly"]) self.isHardcoded: bool = bool(raw_vals["isHardcoded"]) def __repr__(self): return f"" class AttributeValue: def __init__(self, raw_attribute: dict[str, Any]): self.name: str = raw_attribute["name"] tmpValue = raw_attribute.get("value", []) if isinstance(tmpValue, str): tmpValue = [tmpValue] self.value: list[str] = tmpValue def __repr__(self): return f"" class LLDAPAttributes: def __init__( self, client: gql.Client, using_user_attributes: bool = False, using_group_attributes: bool = False, ): self._client = client self._using_user_attributes = using_user_attributes self._using_group_attributes = using_group_attributes if self._using_user_attributes and self._using_group_attributes: raise Exception("can not both use user attributes and group attributes") if not self._using_user_attributes and not self._using_group_attributes: raise Exception("neither user attributes and group attributes specified") def list_all(self) -> dict[str, AttributeSchema]: ds = DSLSchema(self._client.schema) if self._using_user_attributes: querySchema = ds.Schema.userSchema querySchemaStr = "userSchema" else: querySchema = ds.Schema.groupSchema querySchemaStr = "groupSchema" query = dsl_gql( DSLQuery( ds.Query.schema().select( querySchema.select( ds.AttributeList.attributes.select( ds.AttributeSchema.name, ds.AttributeSchema.attributeType, ds.AttributeSchema.isList, ds.AttributeSchema.isVisible, ds.AttributeSchema.isEditable, ds.AttributeSchema.isReadonly, ds.AttributeSchema.isHardcoded, ) ) ), ), ) result = self._client.execute(query) attrs: dict[str, AttributeSchema] = {} for raw_attribute in result["schema"][querySchemaStr]["attributes"]: attr = AttributeSchema(raw_attribute) attrs[attr.name] = attr return attrs def create( self, name: str, attributeType: str, isList: bool, isVisible: bool, isEditable: bool, ): logger.debug( f"adding attribute, name:'{name}' attributeType:'{attributeType}' isList:'{isList}' isVisible:'{isVisible}' isEditable:'{isEditable}'" ) ds = DSLSchema(self._client.schema) if self._using_user_attributes: mutationSchema = ds.Mutation.addUserAttribute else: mutationSchema = ds.Mutation.addGroupAttribute query = dsl_gql( DSLMutation( mutationSchema.args( name=name, attributeType=attributeType, isList=isList, isVisible=isVisible, isEditable=isEditable, ).select(ds.Success.ok) ), ) self._client.execute(query) def get(self, name: str) -> AttributeSchema | None: attrs = self.list_all() return attrs.get(name) def update(self, name: str): raise Exception("unable to update attribute") def delete(self, name: str): logger.debug(f"deleting attribute '{name}'") ds = DSLSchema(self._client.schema) if self._using_user_attributes: mutationSchema = ds.Mutation.deleteUserAttribute else: mutationSchema = ds.Mutation.deleteGroupAttribute query = dsl_gql( DSLMutation( mutationSchema.args( name=name, ).select(ds.Success.ok) ), ) self._client.execute(query)