Skip to content

Shield Engine API

policyshield.shield.engine

ShieldEngine — synchronous orchestrator for PolicyShield.

ShieldEngine

Bases: BaseShieldEngine

Synchronous orchestrator that coordinates all PolicyShield components.

Handles pre-call checks (matching + PII), verdict building, session updates, and trace recording. Inherits all shared logic from :class:BaseShieldEngine.

Source code in policyshield/shield/engine.py
class ShieldEngine(BaseShieldEngine):
    """Synchronous orchestrator that coordinates all PolicyShield components.

    Handles pre-call checks (matching + PII), verdict building,
    session updates, and trace recording.  Inherits all shared logic
    from :class:`BaseShieldEngine`.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Issue #157: Persistent thread pool to avoid per-call creation overhead
        self._pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)

    def shutdown(self) -> None:
        """Shut down the persistent thread pool."""
        self._pool.shutdown(wait=False)

    def check(
        self,
        tool_name: str,
        args: dict | None = None,
        session_id: str = "default",
        sender: str | None = None,
        context: dict | None = None,
    ) -> ShieldResult:
        """Pre-call check: match rules, detect PII, build verdict.

        Args:
            tool_name: Name of the tool being called.
            args: Arguments to the tool call.
            session_id: Session identifier.
            sender: Identity of the caller.
            context: Optional context dict for context-based conditions.

        Returns:
            ShieldResult with the verdict and details.
        """
        if self._mode == ShieldMode.DISABLED:
            return self._verdict_builder.allow()

        start = time.monotonic()
        args = args or {}

        # OTel: start span
        span_ctx = None
        if self._otel:
            span_ctx = self._otel.on_check_start(tool_name, session_id, args)

        try:
            # Issue #33: Apply timeout to sync check (matches async path)
            if self._engine_timeout and self._engine_timeout > 0:
                future = self._pool.submit(
                    self._do_check_sync,
                    tool_name,
                    args,
                    session_id,
                    sender,
                    context,
                )
                result = future.result(timeout=self._engine_timeout)
            else:
                result = self._do_check_sync(tool_name, args, session_id, sender, context)
        except concurrent.futures.TimeoutError:
            # NOTE: Issue #214 — Pre-check hooks may have already executed,
            # but post-check hooks won't run. This is inherent to the timeout
            # design; fixing it would require 2-phase commit for plugins.
            if self._fail_open:
                logger.warning("Shield check timed out after %ss (fail-open)", self._engine_timeout)
                result = self._verdict_builder.allow()
            else:
                raise PolicyShieldError(f"Shield check timed out after {self._engine_timeout}s")
        except Exception as e:
            if self._fail_open:
                logger.warning("Shield error (fail-open): %s", e)
                result = self._verdict_builder.allow()
            else:
                raise PolicyShieldError(f"Shield check failed: {e}") from e

        latency_ms = (time.monotonic() - start) * 1000

        # OTel: end span
        if self._otel:
            self._otel.on_check_end(span_ctx, result, latency_ms)

        return self._apply_post_check(result, session_id, tool_name, latency_ms, args)

    def post_check(
        self,
        tool_name: str,
        result: Any,
        session_id: str = "default",
    ) -> PostCheckResult:
        """Post-call check on tool output (for PII in results).

        Args:
            tool_name: Name of the tool.
            result: The tool's return value.
            session_id: Session identifier.

        Returns:
            PostCheckResult with PII matches and optional redacted output.
        """
        # Issue #178: Trace post_check latency
        start = time.monotonic()
        pc_result = self._post_check_sync(tool_name, result, session_id)
        latency_ms = (time.monotonic() - start) * 1000
        if latency_ms > 100:
            logger.debug("post_check latency: %.1fms tool=%s", latency_ms, tool_name)
        return pc_result

check(tool_name, args=None, session_id='default', sender=None, context=None)

Pre-call check: match rules, detect PII, build verdict.

Parameters:

Name Type Description Default
tool_name str

Name of the tool being called.

required
args dict | None

Arguments to the tool call.

None
session_id str

Session identifier.

'default'
sender str | None

Identity of the caller.

None
context dict | None

Optional context dict for context-based conditions.

None

Returns:

Type Description
ShieldResult

ShieldResult with the verdict and details.

Source code in policyshield/shield/engine.py
def check(
    self,
    tool_name: str,
    args: dict | None = None,
    session_id: str = "default",
    sender: str | None = None,
    context: dict | None = None,
) -> ShieldResult:
    """Pre-call check: match rules, detect PII, build verdict.

    Args:
        tool_name: Name of the tool being called.
        args: Arguments to the tool call.
        session_id: Session identifier.
        sender: Identity of the caller.
        context: Optional context dict for context-based conditions.

    Returns:
        ShieldResult with the verdict and details.
    """
    if self._mode == ShieldMode.DISABLED:
        return self._verdict_builder.allow()

    start = time.monotonic()
    args = args or {}

    # OTel: start span
    span_ctx = None
    if self._otel:
        span_ctx = self._otel.on_check_start(tool_name, session_id, args)

    try:
        # Issue #33: Apply timeout to sync check (matches async path)
        if self._engine_timeout and self._engine_timeout > 0:
            future = self._pool.submit(
                self._do_check_sync,
                tool_name,
                args,
                session_id,
                sender,
                context,
            )
            result = future.result(timeout=self._engine_timeout)
        else:
            result = self._do_check_sync(tool_name, args, session_id, sender, context)
    except concurrent.futures.TimeoutError:
        # NOTE: Issue #214 — Pre-check hooks may have already executed,
        # but post-check hooks won't run. This is inherent to the timeout
        # design; fixing it would require 2-phase commit for plugins.
        if self._fail_open:
            logger.warning("Shield check timed out after %ss (fail-open)", self._engine_timeout)
            result = self._verdict_builder.allow()
        else:
            raise PolicyShieldError(f"Shield check timed out after {self._engine_timeout}s")
    except Exception as e:
        if self._fail_open:
            logger.warning("Shield error (fail-open): %s", e)
            result = self._verdict_builder.allow()
        else:
            raise PolicyShieldError(f"Shield check failed: {e}") from e

    latency_ms = (time.monotonic() - start) * 1000

    # OTel: end span
    if self._otel:
        self._otel.on_check_end(span_ctx, result, latency_ms)

    return self._apply_post_check(result, session_id, tool_name, latency_ms, args)

post_check(tool_name, result, session_id='default')

Post-call check on tool output (for PII in results).

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required
result Any

The tool's return value.

required
session_id str

Session identifier.

'default'

Returns:

Type Description
PostCheckResult

PostCheckResult with PII matches and optional redacted output.

Source code in policyshield/shield/engine.py
def post_check(
    self,
    tool_name: str,
    result: Any,
    session_id: str = "default",
) -> PostCheckResult:
    """Post-call check on tool output (for PII in results).

    Args:
        tool_name: Name of the tool.
        result: The tool's return value.
        session_id: Session identifier.

    Returns:
        PostCheckResult with PII matches and optional redacted output.
    """
    # Issue #178: Trace post_check latency
    start = time.monotonic()
    pc_result = self._post_check_sync(tool_name, result, session_id)
    latency_ms = (time.monotonic() - start) * 1000
    if latency_ms > 100:
        logger.debug("post_check latency: %.1fms tool=%s", latency_ms, tool_name)
    return pc_result

shutdown()

Shut down the persistent thread pool.

Source code in policyshield/shield/engine.py
def shutdown(self) -> None:
    """Shut down the persistent thread pool."""
    self._pool.shutdown(wait=False)

policyshield.shield.async_engine

AsyncShieldEngine — fully async orchestrator for PolicyShield.

Note: _do_check intentionally duplicates most of BaseShieldEngine._do_check_sync with await asyncio.to_thread() wrappers. Any logic change in the sync path must be mirrored here.

AsyncShieldEngine

Bases: BaseShieldEngine

Async orchestrator that coordinates all PolicyShield components.

Provides async/await versions of :class:ShieldEngine methods for integration with FastAPI, aiohttp, async LangChain agents, and CrewAI. CPU-bound work (matching, PII regex) is offloaded via asyncio.to_thread to avoid blocking the event loop.

Inherits all shared logic from :class:BaseShieldEngine.

Source code in policyshield/shield/async_engine.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
class AsyncShieldEngine(BaseShieldEngine):
    """Async orchestrator that coordinates all PolicyShield components.

    Provides async/await versions of :class:`ShieldEngine` methods for
    integration with FastAPI, aiohttp, async LangChain agents, and CrewAI.
    CPU-bound work (matching, PII regex) is offloaded via
    ``asyncio.to_thread`` to avoid blocking the event loop.

    Inherits all shared logic from :class:`BaseShieldEngine`.
    """

    async def check(
        self,
        tool_name: str,
        args: dict | None = None,
        session_id: str = "default",
        sender: str | None = None,
        context: dict | None = None,
    ) -> ShieldResult:
        """Async pre-call check: match rules, detect PII, build verdict.

        Args:
            tool_name: Name of the tool being called.
            args: Arguments to the tool call.
            session_id: Session identifier.
            sender: Identity of the caller.
            context: Optional context dict for context-based conditions.

        Returns:
            ShieldResult with the verdict and details.
        """
        if self._mode == ShieldMode.DISABLED:
            return self._verdict_builder.allow()

        start = time.monotonic()
        args = args or {}

        # OTel: start span
        span_ctx = None
        if self._otel:
            span_ctx = self._otel.on_check_start(tool_name, session_id, args)

        try:
            result = await asyncio.wait_for(
                self._do_check(tool_name, args, session_id, sender, context),
                timeout=self._engine_timeout,
            )
        except asyncio.TimeoutError:
            logger.error(
                "Engine check timeout (%.1fs) for tool=%s",
                self._engine_timeout,
                tool_name,
            )
            verdict = Verdict.ALLOW if self._fail_open else Verdict.BLOCK
            result = ShieldResult(verdict=verdict, message="Check timed out")
        except Exception as e:
            if self._fail_open:
                logger.warning("Shield error (fail-open): %s", e)
                result = self._verdict_builder.allow()
            else:
                raise PolicyShieldError(f"Shield check failed: {e}") from e

        latency_ms = (time.monotonic() - start) * 1000

        # OTel: end span
        if self._otel:
            self._otel.on_check_end(span_ctx, result, latency_ms)

        # Issue #200: Add timeout to prevent deadlock if lock is contended
        try:
            return await asyncio.wait_for(
                asyncio.to_thread(self._apply_post_check, result, session_id, tool_name, latency_ms, args),
                timeout=self._engine_timeout,
            )
        except asyncio.TimeoutError:
            logger.error("_apply_post_check timed out (%.1fs)", self._engine_timeout)
            return result

    async def _do_check(
        self,
        tool_name: str,
        args: dict,
        session_id: str,
        sender: str | None,
        context: dict | None = None,
    ) -> ShieldResult:
        """Internal check logic — offloads CPU-bound work to threads."""
        # Issue #30: Invoke pre-check plugin hooks
        from policyshield.plugins import get_pre_check_hooks as _get_pre_hooks

        for hook_fn in _get_pre_hooks():
            try:
                # Issue #194: Run sync hooks in thread to avoid blocking event loop
                await asyncio.to_thread(hook_fn, tool_name=tool_name, args=args, session_id=session_id, sender=sender)
            except Exception as e:
                logger.warning("Pre-check hook error: %s", e)

        # Kill switch — immediate block
        if self._killed.is_set():
            return ShieldResult(
                verdict=Verdict.BLOCK,
                rule_id="__kill_switch__",
                message=self._kill_reason or "Kill switch is active",
            )

        # Snapshot honeypot checker to avoid race with _swap_rules()
        honeypot_checker = self._honeypot_checker

        # Honeypot check — always block, regardless of mode
        if honeypot_checker is not None:
            honeypot_match = honeypot_checker.check(tool_name)
            if honeypot_match:
                return ShieldResult(
                    verdict=Verdict.BLOCK,
                    rule_id="__honeypot__",
                    message=honeypot_match.message,
                )

        # Sanitize args (save raw for plugin detectors — Issue #18)
        raw_args = args
        if self._sanitizer is not None:
            san_result = self._sanitizer.sanitize(args)
            if san_result.rejected:
                return ShieldResult(
                    verdict=Verdict.BLOCK,
                    rule_id="__sanitizer__",
                    message=san_result.rejection_reason,
                )
            args = san_result.sanitized_args

        # Plugin detectors — use raw_args so sanitization doesn't hide threats
        from policyshield.plugins import DetectorResult as _DR
        from policyshield.plugins import get_detectors as _get_detectors

        for pname, detector_fn in _get_detectors().items():
            try:
                det_result = await asyncio.to_thread(detector_fn, tool_name=tool_name, args=raw_args)
                if isinstance(det_result, _DR) and det_result.detected:
                    logger.warning("Plugin detector '%s' triggered: %s", pname, det_result.message)
                    return ShieldResult(
                        verdict=Verdict.BLOCK,
                        rule_id=f"__plugin__{pname}",
                        message=det_result.message,
                    )
            except Exception as e:
                logger.warning("Plugin detector '%s' error: %s", pname, e)

        # Rate limit check (atomic check + record)
        if self._rate_limiter is not None:
            rl_result = self._rate_limiter.check_and_record(tool_name, session_id)
            if not rl_result.allowed:
                return ShieldResult(
                    verdict=Verdict.BLOCK,
                    rule_id="__rate_limit__",
                    message=rl_result.message,
                )

        # Budget check — cost-based per-session/per-hour limits
        if self._budget_tracker is not None:
            budget_ok, budget_msg = self._budget_tracker.check_budget(session_id, tool_name)
            if not budget_ok:
                return ShieldResult(
                    verdict=Verdict.BLOCK,
                    rule_id="__budget__",
                    message=budget_msg,
                )

        # PII taint chain — block outgoing calls if session is tainted
        if self._taint_enabled and tool_name in self._outgoing_tools:
            session = self._session_mgr.get_or_create(session_id)
            if session.pii_tainted:
                return ShieldResult(
                    verdict=Verdict.BLOCK,
                    rule_id="__pii_taint__",
                    message=(f"Session tainted: {session.taint_details}. Outgoing calls blocked until reviewed."),
                )

        # Session state for condition matching
        session_state = self._build_session_state(session_id)

        with self._lock:
            matcher = self._matcher
            rule_set = self._rule_set
            pii_detector = self._pii  # Issue #109: snapshot PII detector
            event_buffer = self._session_mgr.get_event_buffer(session_id)  # Issue #207: inside lock

        # Offload CPU-bound matching to thread
        try:
            # Pass event buffer for chain rule evaluation
            match = await asyncio.to_thread(
                matcher.find_best_match,
                tool_name=tool_name,
                args=args,
                session_state=session_state,
                sender=sender,
                event_buffer=event_buffer,
                context=context,
            )
        except Exception as e:
            logger.error("Matcher error: %s", e)
            if self._fail_open:
                return self._verdict_builder.allow(args=args)
            return ShieldResult(
                verdict=Verdict.BLOCK,
                rule_id="__error__",
                message=f"Internal error: {e}",
            )

        if match is None:
            # LLM Guard — check for threats even when no rule matches
            if self._llm_guard is not None and getattr(self._llm_guard, "enabled", False):
                try:
                    guard_result = await self._llm_guard.analyze(tool_name, args)
                    if guard_result.is_threat and guard_result.risk_score >= self._llm_guard.risk_threshold:
                        return ShieldResult(
                            verdict=Verdict.BLOCK,
                            rule_id="__llm_guard__",
                            message=f"LLM Guard: {guard_result.explanation}",
                        )
                except Exception as e:
                    logger.warning("LLM Guard error: %s", e)

            default = rule_set.default_verdict
            if default == Verdict.BLOCK:
                return ShieldResult(
                    verdict=Verdict.BLOCK,
                    rule_id="__default__",
                    message="No matching rule. Default policy: BLOCK.",
                )
            return self._verdict_builder.allow(args=args)

        rule = match.rule

        # Build verdict based on rule (assign to result, don't early-return,
        # so shadow evaluation always runs — mirrors _do_check_sync)
        if rule.then == Verdict.BLOCK:
            # PII detection on args (only for BLOCK to enrich the message)
            pii_matches = []
            try:
                pii_matches = await asyncio.to_thread(pii_detector.scan_dict, args)
            except Exception as e:
                logger.warning("PII detection error (fail-open): %s", e)
            for pm in pii_matches:
                self._session_mgr.add_taint(session_id, pm.pii_type)

            result = self._verdict_builder.block(
                rule=rule,
                tool_name=tool_name,
                args=args,
                pii_matches=pii_matches,
            )
        elif rule.then == Verdict.REDACT:
            # redact_dict scans for PII internally — no need for a separate scan
            redacted, pii_matches = await asyncio.to_thread(pii_detector.redact_dict, args)
            for pm in pii_matches:
                self._session_mgr.add_taint(session_id, pm.pii_type)
            result = self._verdict_builder.redact(
                rule=rule,
                tool_name=tool_name,
                args=args,
                modified_args=redacted,
                pii_matches=pii_matches,
            )
        elif rule.then == Verdict.APPROVE:
            result = await self._handle_approval(rule, tool_name, args, session_id)
        else:
            result = self._verdict_builder.allow(rule=rule, args=args)

        # Shadow evaluation (non-blocking, log-only) — mirrors _do_check_sync
        with self._lock:
            shadow_matcher = self._shadow_matcher

        if shadow_matcher is not None:
            try:
                shadow_match = await asyncio.to_thread(
                    shadow_matcher.find_best_match,
                    tool_name=tool_name,
                    args=args,
                    session_state=session_state,
                    sender=sender,
                    event_buffer=event_buffer,
                    context=context,
                )
                shadow_verdict = shadow_match.rule.then if shadow_match else Verdict.ALLOW
                if shadow_match and shadow_match.rule.then != result.verdict:
                    logger.info(
                        "SHADOW: tool=%s verdict_diff: current=%s shadow=%s (rule=%s)",
                        tool_name,
                        result.verdict.value,
                        shadow_verdict.value,
                        shadow_match.rule.id,
                    )
                if self._tracer:
                    self._tracer.record(
                        session_id=session_id,
                        tool=tool_name,
                        verdict=shadow_verdict if shadow_match else Verdict.ALLOW,
                        rule_id=f"__shadow__{shadow_match.rule.id}" if shadow_match else "__shadow__",
                    )
            except Exception as e:
                logger.warning("Shadow evaluation error: %s", e)

        # Issue #30: Invoke post-check plugin hooks
        from policyshield.plugins import get_post_check_hooks as _get_post_hooks

        for hook_fn in _get_post_hooks():
            try:
                # Issue #194: Run sync hooks in thread to avoid blocking event loop
                await asyncio.to_thread(hook_fn, tool_name=tool_name, args=args, session_id=session_id, result=result)
            except Exception as e:
                logger.warning("Post-check hook error: %s", e)

        return result

    async def _handle_approval(
        self,
        rule: Any,
        tool_name: str,
        args: dict,
        session_id: str,
    ) -> ShieldResult:
        """Handle APPROVE verdict with async support.

        Returns immediately with the approval_id so the caller can poll
        /check-approval for status (non-blocking pattern).
        """
        if self._approval_backend is None:
            return ShieldResult(
                verdict=Verdict.BLOCK,
                rule_id=rule.id,
                message="No approval backend configured",
            )

        # Circuit breaker — fail fast if backend is unhealthy (Issue #3: parity with sync)
        if hasattr(self._approval_backend, "_circuit_breaker"):
            cb = self._approval_backend._circuit_breaker
            if not cb.is_available():
                verdict = Verdict.BLOCK if cb.fallback_verdict == "BLOCK" else Verdict.ALLOW
                return ShieldResult(
                    verdict=verdict,
                    rule_id=rule.id,
                    message=f"Approval backend circuit breaker OPEN (fallback: {cb.fallback_verdict})",
                )

        # Determine strategy
        strategy = None
        if rule.approval_strategy:
            try:
                strategy = ApprovalStrategy(rule.approval_strategy)
            except ValueError:
                pass

        # Check cache first
        if self._approval_cache is not None:
            cached = self._approval_cache.get(tool_name, rule.id, session_id, strategy=strategy)
            if cached is not None:
                if cached.approved:
                    return self._verdict_builder.allow(rule=rule, args=args)
                return ShieldResult(
                    verdict=Verdict.BLOCK,
                    rule_id=rule.id,
                    message="Approval denied (cached)",
                )

        req = ApprovalRequest.create(
            tool_name=tool_name,
            args=args,
            rule_id=rule.id,
            message=rule.message or "Approval required",
            session_id=session_id,
        )

        # Offload sync approval backend submit to thread
        await asyncio.to_thread(self._approval_backend.submit, req)

        # Store metadata for cache population after resolution
        with self._lock:
            self._approval_meta[req.request_id] = {
                "tool_name": tool_name,
                "rule_id": rule.id,
                "session_id": session_id,
                "strategy": strategy,
            }
            self._approval_meta_ts[req.request_id] = monotonic()
            self._cleanup_approval_meta()

        # Return APPROVE verdict with the approval_id for async polling
        return ShieldResult(
            verdict=Verdict.APPROVE,
            rule_id=rule.id,
            message=rule.message or "Approval required",
            approval_id=req.request_id,
        )

    async def post_check(
        self,
        tool_name: str,
        result: Any,
        session_id: str = "default",
    ) -> PostCheckResult:
        """Async post-call check on tool output (for PII in results).

        Args:
            tool_name: Name of the tool.
            result: The tool's return value.
            session_id: Session identifier.

        Returns:
            PostCheckResult with PII matches and optional redacted output.
        """
        return await asyncio.to_thread(self._post_check_sync, tool_name, result, session_id)

check(tool_name, args=None, session_id='default', sender=None, context=None) async

Async pre-call check: match rules, detect PII, build verdict.

Parameters:

Name Type Description Default
tool_name str

Name of the tool being called.

required
args dict | None

Arguments to the tool call.

None
session_id str

Session identifier.

'default'
sender str | None

Identity of the caller.

None
context dict | None

Optional context dict for context-based conditions.

None

Returns:

Type Description
ShieldResult

ShieldResult with the verdict and details.

Source code in policyshield/shield/async_engine.py
async def check(
    self,
    tool_name: str,
    args: dict | None = None,
    session_id: str = "default",
    sender: str | None = None,
    context: dict | None = None,
) -> ShieldResult:
    """Async pre-call check: match rules, detect PII, build verdict.

    Args:
        tool_name: Name of the tool being called.
        args: Arguments to the tool call.
        session_id: Session identifier.
        sender: Identity of the caller.
        context: Optional context dict for context-based conditions.

    Returns:
        ShieldResult with the verdict and details.
    """
    if self._mode == ShieldMode.DISABLED:
        return self._verdict_builder.allow()

    start = time.monotonic()
    args = args or {}

    # OTel: start span
    span_ctx = None
    if self._otel:
        span_ctx = self._otel.on_check_start(tool_name, session_id, args)

    try:
        result = await asyncio.wait_for(
            self._do_check(tool_name, args, session_id, sender, context),
            timeout=self._engine_timeout,
        )
    except asyncio.TimeoutError:
        logger.error(
            "Engine check timeout (%.1fs) for tool=%s",
            self._engine_timeout,
            tool_name,
        )
        verdict = Verdict.ALLOW if self._fail_open else Verdict.BLOCK
        result = ShieldResult(verdict=verdict, message="Check timed out")
    except Exception as e:
        if self._fail_open:
            logger.warning("Shield error (fail-open): %s", e)
            result = self._verdict_builder.allow()
        else:
            raise PolicyShieldError(f"Shield check failed: {e}") from e

    latency_ms = (time.monotonic() - start) * 1000

    # OTel: end span
    if self._otel:
        self._otel.on_check_end(span_ctx, result, latency_ms)

    # Issue #200: Add timeout to prevent deadlock if lock is contended
    try:
        return await asyncio.wait_for(
            asyncio.to_thread(self._apply_post_check, result, session_id, tool_name, latency_ms, args),
            timeout=self._engine_timeout,
        )
    except asyncio.TimeoutError:
        logger.error("_apply_post_check timed out (%.1fs)", self._engine_timeout)
        return result

post_check(tool_name, result, session_id='default') async

Async post-call check on tool output (for PII in results).

Parameters:

Name Type Description Default
tool_name str

Name of the tool.

required
result Any

The tool's return value.

required
session_id str

Session identifier.

'default'

Returns:

Type Description
PostCheckResult

PostCheckResult with PII matches and optional redacted output.

Source code in policyshield/shield/async_engine.py
async def post_check(
    self,
    tool_name: str,
    result: Any,
    session_id: str = "default",
) -> PostCheckResult:
    """Async post-call check on tool output (for PII in results).

    Args:
        tool_name: Name of the tool.
        result: The tool's return value.
        session_id: Session identifier.

    Returns:
        PostCheckResult with PII matches and optional redacted output.
    """
    return await asyncio.to_thread(self._post_check_sync, tool_name, result, session_id)