172 lines
5.2 KiB
Python
172 lines
5.2 KiB
Python
|
from typing import Any
|
||
|
|
||
|
import gql
|
||
|
from gql.dsl import DSLQuery, DSLSchema, dsl_gql, DSLMutation
|
||
|
from gql.transport.aiohttp import AIOHTTPTransport
|
||
|
|
||
|
from pprint import pprint
|
||
|
import re
|
||
|
|
||
|
import logging
|
||
|
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
def to_camelcase(s):
|
||
|
s = re.sub(r"(_|-)+", " ", s).title().replace(" ", "").replace("*", "")
|
||
|
return "".join([s[0].lower(), s[1:]])
|
||
|
|
||
|
|
||
|
def to_snakecase(s):
|
||
|
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
|
||
|
|
||
|
|
||
|
class AttributeTypes:
|
||
|
STRING = "STRING"
|
||
|
INTEGER = "INTEGER"
|
||
|
JPEG_PHOTO = "JPEG_PHOTO"
|
||
|
DATE_TIME = "DATE_TIME"
|
||
|
|
||
|
|
||
|
class AttributeSchema:
|
||
|
def __init__(self, raw_vals: dict[str, Any]):
|
||
|
self.name: str = raw_vals["name"]
|
||
|
self.attributeType: str = raw_vals["attributeType"]
|
||
|
self.isList: bool = bool(raw_vals["isList"])
|
||
|
self.isVisible: bool = bool(raw_vals["isVisible"])
|
||
|
self.isEditable: bool = bool(raw_vals["isEditable"])
|
||
|
self.isReadonly: bool = bool(raw_vals["isReadonly"])
|
||
|
self.isHardcoded: bool = bool(raw_vals["isHardcoded"])
|
||
|
|
||
|
def __repr__(self):
|
||
|
return f"<AttributeSchema name={self.name} attributeType={self.attributeType} isList={self.isList} isVisible={self.isVisible} isEditable={self.isEditable} isReadonly={self.isReadonly} isHardcoded={self.isHardcoded} />"
|
||
|
|
||
|
|
||
|
class AttributeValue:
|
||
|
def __init__(self, raw_attribute: dict[str, Any]):
|
||
|
self.name: str = raw_attribute["name"]
|
||
|
|
||
|
tmpValue = raw_attribute.get("value", [])
|
||
|
if isinstance(tmpValue, str):
|
||
|
tmpValue = [tmpValue]
|
||
|
|
||
|
self.value: list[str] = tmpValue
|
||
|
|
||
|
def __repr__(self):
|
||
|
return f"<AttributeValue name={self.name} value={self.value} />"
|
||
|
|
||
|
|
||
|
class LLDAPAttributes:
|
||
|
def __init__(
|
||
|
self,
|
||
|
client: gql.Client,
|
||
|
using_user_attributes: bool = False,
|
||
|
using_group_attributes: bool = False,
|
||
|
):
|
||
|
self._client = client
|
||
|
|
||
|
self._using_user_attributes = using_user_attributes
|
||
|
self._using_group_attributes = using_group_attributes
|
||
|
|
||
|
if self._using_user_attributes and self._using_group_attributes:
|
||
|
raise Exception("can not both use user attributes and group attributes")
|
||
|
|
||
|
if not self._using_user_attributes and not self._using_group_attributes:
|
||
|
raise Exception("neither user attributes and group attributes specified")
|
||
|
|
||
|
def list_all(self) -> dict[str, AttributeSchema]:
|
||
|
ds = DSLSchema(self._client.schema)
|
||
|
|
||
|
if self._using_user_attributes:
|
||
|
querySchema = ds.Schema.userSchema
|
||
|
querySchemaStr = "userSchema"
|
||
|
else:
|
||
|
querySchema = ds.Schema.groupSchema
|
||
|
querySchemaStr = "groupSchema"
|
||
|
|
||
|
query = dsl_gql(
|
||
|
DSLQuery(
|
||
|
ds.Query.schema().select(
|
||
|
querySchema.select(
|
||
|
ds.AttributeList.attributes.select(
|
||
|
ds.AttributeSchema.name,
|
||
|
ds.AttributeSchema.attributeType,
|
||
|
ds.AttributeSchema.isList,
|
||
|
ds.AttributeSchema.isVisible,
|
||
|
ds.AttributeSchema.isEditable,
|
||
|
ds.AttributeSchema.isReadonly,
|
||
|
ds.AttributeSchema.isHardcoded,
|
||
|
)
|
||
|
)
|
||
|
),
|
||
|
),
|
||
|
)
|
||
|
|
||
|
result = self._client.execute(query)
|
||
|
|
||
|
attrs: dict[str, AttributeSchema] = {}
|
||
|
for raw_attribute in result["schema"][querySchemaStr]["attributes"]:
|
||
|
attr = AttributeSchema(raw_attribute)
|
||
|
attrs[attr.name] = attr
|
||
|
|
||
|
return attrs
|
||
|
|
||
|
def create(
|
||
|
self,
|
||
|
name: str,
|
||
|
attributeType: str,
|
||
|
isList: bool,
|
||
|
isVisible: bool,
|
||
|
isEditable: bool,
|
||
|
):
|
||
|
logger.debug(
|
||
|
f"adding attribute, name:'{name}' attributeType:'{attributeType}' isList:'{isList}' isVisible:'{isVisible}' isEditable:'{isEditable}'"
|
||
|
)
|
||
|
|
||
|
ds = DSLSchema(self._client.schema)
|
||
|
|
||
|
if self._using_user_attributes:
|
||
|
mutationSchema = ds.Mutation.addUserAttribute
|
||
|
else:
|
||
|
mutationSchema = ds.Mutation.addGroupAttribute
|
||
|
|
||
|
query = dsl_gql(
|
||
|
DSLMutation(
|
||
|
mutationSchema.args(
|
||
|
name=name,
|
||
|
attributeType=attributeType,
|
||
|
isList=isList,
|
||
|
isVisible=isVisible,
|
||
|
isEditable=isEditable,
|
||
|
).select(ds.Success.ok)
|
||
|
),
|
||
|
)
|
||
|
self._client.execute(query)
|
||
|
|
||
|
def get(self, name: str) -> AttributeSchema | None:
|
||
|
attrs = self.list_all()
|
||
|
return attrs.get(name)
|
||
|
|
||
|
def update(self, name: str):
|
||
|
raise Exception("unable to update attribute")
|
||
|
|
||
|
def delete(self, name: str):
|
||
|
logger.debug(f"deleting attribute '{name}'")
|
||
|
|
||
|
ds = DSLSchema(self._client.schema)
|
||
|
|
||
|
if self._using_user_attributes:
|
||
|
mutationSchema = ds.Mutation.deleteUserAttribute
|
||
|
else:
|
||
|
mutationSchema = ds.Mutation.deleteGroupAttribute
|
||
|
|
||
|
query = dsl_gql(
|
||
|
DSLMutation(
|
||
|
mutationSchema.args(
|
||
|
name=name,
|
||
|
).select(ds.Success.ok)
|
||
|
),
|
||
|
)
|
||
|
self._client.execute(query)
|