Skip to content

Permissions

sentinel_auth.permissions

Async HTTP client for checking permissions against the identity service.

PermissionClient(base_url, service_name, service_key=None)

Client for the identity service's permission API.

Source code in sdk/src/sentinel_auth/permissions.py
def __init__(self, base_url: str, service_name: str, service_key: str | None = None):
    self.base_url = base_url.rstrip("/")
    self.service_name = service_name
    self.service_key = service_key
    self._client = httpx.AsyncClient(base_url=self.base_url, timeout=5.0)

check(token, checks) async

Batch check permissions. Pass the user's JWT as the token.

Source code in sdk/src/sentinel_auth/permissions.py
async def check(
    self,
    token: str,
    checks: list[PermissionCheck],
) -> list[PermissionResult]:
    """Batch check permissions. Pass the user's JWT as the token."""
    response = await self._client.post(
        "/permissions/check",
        headers=self._headers(token),
        json={
            "checks": [
                {
                    "service_name": c.service_name,
                    "resource_type": c.resource_type,
                    "resource_id": str(c.resource_id),
                    "action": c.action,
                }
                for c in checks
            ]
        },
    )
    response.raise_for_status()
    data = response.json()
    return [
        PermissionResult(
            service_name=r["service_name"],
            resource_type=r["resource_type"],
            resource_id=uuid.UUID(r["resource_id"]),
            action=r["action"],
            allowed=r["allowed"],
        )
        for r in data["results"]
    ]

can(token, resource_type, resource_id, action) async

Convenience: check a single permission.

Source code in sdk/src/sentinel_auth/permissions.py
async def can(
    self,
    token: str,
    resource_type: str,
    resource_id: uuid.UUID,
    action: str,
) -> bool:
    """Convenience: check a single permission."""
    results = await self.check(
        token,
        [PermissionCheck(self.service_name, resource_type, resource_id, action)],
    )
    return results[0].allowed if results else False

register_resource(resource_type, resource_id, workspace_id, owner_id, visibility='workspace') async

Register a new resource (service-key only, no user JWT needed).

Source code in sdk/src/sentinel_auth/permissions.py
async def register_resource(
    self,
    resource_type: str,
    resource_id: uuid.UUID,
    workspace_id: uuid.UUID,
    owner_id: uuid.UUID,
    visibility: str = "workspace",
) -> dict:
    """Register a new resource (service-key only, no user JWT needed)."""
    response = await self._client.post(
        "/permissions/register",
        headers=self._headers(),
        json={
            "service_name": self.service_name,
            "resource_type": resource_type,
            "resource_id": str(resource_id),
            "workspace_id": str(workspace_id),
            "owner_id": str(owner_id),
            "visibility": visibility,
        },
    )
    response.raise_for_status()
    return response.json()

share(token, resource_type, resource_id, grantee_type, grantee_id, permission='view') async

Share a resource with a user or group.

Looks up the permission record by resource coordinates, then shares.

Source code in sdk/src/sentinel_auth/permissions.py
async def share(
    self,
    token: str,
    resource_type: str,
    resource_id: uuid.UUID,
    grantee_type: str,
    grantee_id: uuid.UUID,
    permission: str = "view",
) -> dict:
    """Share a resource with a user or group.

    Looks up the permission record by resource coordinates, then shares.
    """
    # Resolve resource coordinates → permission_id
    lookup = await self._client.get(
        f"/permissions/resource/{self.service_name}/{resource_type}/{resource_id}",
        headers=self._headers(),
    )
    lookup.raise_for_status()
    permission_id = lookup.json()["id"]

    # Share
    response = await self._client.post(
        f"/permissions/{permission_id}/share",
        headers=self._headers(token),
        json={
            "grantee_type": grantee_type,
            "grantee_id": str(grantee_id),
            "permission": permission,
        },
    )
    response.raise_for_status()
    return response.json()

accessible(token, resource_type, action, workspace_id, limit=None) async

Lookup accessible resource IDs for the current user.

Returns (resource_ids, has_full_access). When has_full_access is True and no limit was set, resource_ids is empty — the caller should skip filtering entirely.

Source code in sdk/src/sentinel_auth/permissions.py
async def accessible(
    self,
    token: str,
    resource_type: str,
    action: str,
    workspace_id: uuid.UUID,
    limit: int | None = None,
) -> tuple[list[uuid.UUID], bool]:
    """Lookup accessible resource IDs for the current user.

    Returns (resource_ids, has_full_access). When has_full_access is True
    and no limit was set, resource_ids is empty — the caller should skip
    filtering entirely.
    """
    payload: dict = {
        "service_name": self.service_name,
        "resource_type": resource_type,
        "action": action,
        "workspace_id": str(workspace_id),
    }
    if limit is not None:
        payload["limit"] = limit
    response = await self._client.post(
        "/permissions/accessible",
        headers=self._headers(token),
        json=payload,
    )
    response.raise_for_status()
    data = response.json()
    return (
        [uuid.UUID(rid) for rid in data["resource_ids"]],
        data["has_full_access"],
    )