Files
dify/api/tests/unit_tests/controllers/openapi/auth/test_composition.py
T
Xiyuan Chen cad0942f4d fix(api): enforce workspace membership + role checks in auth pipeline (#36931)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
2026-06-03 07:31:47 +00:00

135 lines
4.5 KiB
Python

import uuid

from controllers.openapi.auth.composition import account_pipeline, auth_router, external_sso_pipeline
from controllers.openapi.auth.data import Edition, RequestContext
from controllers.openapi.auth.flow import When
from controllers.openapi.auth.pipeline import AuthPipeline, PipelineRoute, PipelineRouter
from controllers.openapi.auth.verify import (
    check_workspace_member,
    check_workspace_mismatch,
    check_workspace_role,
)
from libs.oauth_bearer import TokenType
from models.account import TenantAccountRole
def test_account_pipeline_is_auth_pipeline():
    """``account_pipeline`` is exposed as an ``AuthPipeline`` instance."""
    pipeline = account_pipeline
    assert isinstance(pipeline, AuthPipeline)
def test_external_sso_pipeline_is_auth_pipeline():
    """``external_sso_pipeline`` is exposed as an ``AuthPipeline`` instance."""
    pipeline = external_sso_pipeline
    assert isinstance(pipeline, AuthPipeline)
def test_auth_router_is_pipeline_router():
    """``auth_router`` is exposed as a ``PipelineRouter`` instance."""
    router = auth_router
    assert isinstance(router, PipelineRouter)
def test_account_pipeline_prepare_has_six_entries():
    """The account pipeline's ``_prepare`` list holds exactly six entries."""
    prepare_entries = account_pipeline._prepare
    assert len(prepare_entries) == 6
def test_account_auth_list_has_seven_entries():
    """The account pipeline's ``_auth`` list holds exactly seven entries."""
    auth_entries = account_pipeline._auth
    assert len(auth_entries) == 7
def test_external_sso_pipeline_prepare_has_four_entries():
    """The external-SSO pipeline's ``_prepare`` list holds exactly four entries."""
    prepare_entries = external_sso_pipeline._prepare
    assert len(prepare_entries) == 4
def test_external_sso_auth_list_has_four_entries():
    """The external-SSO pipeline's ``_auth`` list holds exactly four entries."""
    auth_entries = external_sso_pipeline._auth
    assert len(auth_entries) == 4
def test_account_pipeline_has_unconditional_load_account():
    """Exactly one account ``_prepare`` entry is not wrapped in ``When``.

    Only the count is checked; which step it is (presumably the account
    loader, going by the test name) is not asserted here.
    """
    unconditional_count = sum(1 for entry in account_pipeline._prepare if not isinstance(entry, When))
    assert unconditional_count == 1
def test_external_sso_pipeline_all_prepare_entries_are_when():
    """Every external-SSO ``_prepare`` entry is a ``When`` wrapper."""
    # Collect offenders so a failure shows which entries are unconditional.
    not_wrapped = [entry for entry in external_sso_pipeline._prepare if not isinstance(entry, When)]
    assert not not_wrapped
def test_account_pipeline_has_one_unconditional_auth_step():
    """Exactly one account ``_auth`` entry is not wrapped in ``When``."""
    unconditional_count = sum(1 for entry in account_pipeline._auth if not isinstance(entry, When))
    assert unconditional_count == 1
def test_external_sso_pipeline_has_one_unconditional_auth_step():
    """Exactly one external-SSO ``_auth`` entry is not wrapped in ``When``."""
    unconditional_count = sum(1 for entry in external_sso_pipeline._auth if not isinstance(entry, When))
    assert unconditional_count == 1
def test_router_routes_contain_both_token_types():
    """The router's ``_routes`` contains both OAuth token types as keys."""
    routes = auth_router._routes
    for token_type in (TokenType.OAUTH_ACCOUNT, TokenType.OAUTH_EXTERNAL_SSO):
        assert token_type in routes
def test_external_sso_route_has_ee_required_edition():
    """The external-SSO route is a ``PipelineRoute`` restricted to ``Edition.EE``.

    ``required_edition`` must equal a frozenset containing only
    ``Edition.EE``. ``Edition`` comes from the file-level import of
    ``controllers.openapi.auth.data`` rather than an import placed between
    the assertions.
    """
    route = auth_router._routes[TokenType.OAUTH_EXTERNAL_SSO]
    assert isinstance(route, PipelineRoute)
    assert route.required_edition == frozenset({Edition.EE})
def test_account_route_has_no_required_edition():
    """The account route is a ``PipelineRoute`` with ``required_edition`` of ``None``."""
    account_route = auth_router._routes[TokenType.OAUTH_ACCOUNT]
    assert isinstance(account_route, PipelineRoute)
    assert account_route.required_edition is None
def _selected_auth_steps(*, app_id: bool, workspace_membership: bool, allowed_roles):
    """Return the account ``_auth`` steps that apply to a synthetic request.

    Builds an ``OAUTH_ACCOUNT`` ``RequestContext`` with no scope, then walks
    ``account_pipeline._auth``: a ``When`` entry contributes its wrapped
    ``_step`` only if ``applies(ctx, None)`` is truthy, and any other entry
    is always included.

    Args:
        app_id: When true, ``path_params`` carries a random ``app_id`` UUID
            string; otherwise ``path_params`` is empty.
        workspace_membership: Forwarded to the context unchanged.
        allowed_roles: Forwarded to the context unchanged.

    Returns:
        The selected steps as a list, in pipeline order.
    """
    path_params = {"app_id": str(uuid.uuid4())} if app_id else {}
    ctx = RequestContext(
        token_type=TokenType.OAUTH_ACCOUNT,
        scope=None,
        path_params=path_params,
        workspace_membership=workspace_membership,
        allowed_roles=allowed_roles,
    )

    chosen = []
    for entry in account_pipeline._auth:
        if not isinstance(entry, When):
            # Unconditional steps always run.
            chosen.append(entry)
            continue
        if entry.applies(ctx, None):
            chosen.append(entry._step)
    return chosen
# Role set passed as ``allowed_roles`` by the role-check selection test.
_ALL_ROLES = frozenset(
    (
        TenantAccountRole.OWNER,
        TenantAccountRole.ADMIN,
        TenantAccountRole.NORMAL,
    )
)
def test_workspace_path_selects_membership_check():
    """Workspace membership without roles selects the member check only."""
    selected = _selected_auth_steps(
        app_id=False,
        workspace_membership=True,
        allowed_roles=None,
    )
    assert check_workspace_member in selected
    assert check_workspace_role not in selected
def test_app_path_selects_membership_check():
    """An ``app_id`` path without roles selects the member check only."""
    selected = _selected_auth_steps(
        app_id=True,
        workspace_membership=False,
        allowed_roles=None,
    )
    assert check_workspace_member in selected
    assert check_workspace_role not in selected
def test_roles_set_selects_both_membership_and_role_check():
    """Workspace membership plus ``allowed_roles`` selects member and role checks."""
    selected = _selected_auth_steps(
        app_id=False,
        workspace_membership=True,
        allowed_roles=_ALL_ROLES,
    )
    assert check_workspace_member in selected
    assert check_workspace_role in selected
def test_plain_path_selects_no_membership_or_role_step():
    """With no app, membership flag, or roles, neither check is selected."""
    selected = _selected_auth_steps(
        app_id=False,
        workspace_membership=False,
        allowed_roles=None,
    )
    assert check_workspace_member not in selected
    assert check_workspace_role not in selected
def test_app_path_selects_workspace_mismatch_check():
    """An ``app_id`` path selects the workspace-mismatch check."""
    selected = _selected_auth_steps(
        app_id=True,
        workspace_membership=False,
        allowed_roles=None,
    )
    assert check_workspace_mismatch in selected
def test_workspace_path_skips_workspace_mismatch_check():
    """A workspace-membership path without ``app_id`` skips the mismatch check."""
    selected = _selected_auth_steps(
        app_id=False,
        workspace_membership=True,
        allowed_roles=None,
    )
    assert check_workspace_mismatch not in selected