diff --git a/.gitignore b/.gitignore index 2897177..103c358 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,8 @@ env/ # OS .DS_Store Thumbs.db +*\ 2.* +*\ 3.* # Testing .pytest_cache/ @@ -49,3 +51,4 @@ docker-compose.override.yml # Internal project context (generated, not source) context.md +IMPROVEMENTS*.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8f88e7b..78036c0 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -8,7 +8,7 @@ Hey, glad you're here. Setup takes about 2 minutes. git clone https://github.com/cvemula1/NHInsight.git cd NHInsight make dev # installs all providers + dev tools -make test # 151 tests, <1 second +make test # 260 tests, <2 seconds make demo # see output without credentials ``` diff --git a/IMPROVEMENTS.md b/IMPROVEMENTS.md deleted file mode 100644 index a9e5fe2..0000000 --- a/IMPROVEMENTS.md +++ /dev/null @@ -1,430 +0,0 @@ -# NHInsight — Usability Improvement Plan - -*Practical analysis and code-level patches for developer adoption.* - ---- - -## 1. Executive Summary - -NHInsight is a solid v0.1 CLI. The core pipeline works: discover → classify → risk-score → attack-path → output. 151 tests pass. Five providers ship. The code is clean. - -The adoption bottleneck is **not functionality** — it's **first-impression UX**. A developer landing on the repo needs to go from zero to "oh that's useful" in under 30 seconds. Right now the README, CLI, and demo all add friction that makes the tool feel heavier than it is. - -**High-leverage changes (all small):** - -1. **README** — restructure as landing page, push docs lower -2. **CLI** — friendly no-args help, better no-provider error, post-demo suggestions -3. **PyPI** — better description, classifiers, drop setup.py -4. **Demo** — add "try next" footer after demo runs -5. **Output** — tighten severity labels, improve the HIGH icon -6. **Attack paths** — better `--attack-paths` help text, plain-English chain descriptions -7. 
**Patch plan** — 4 files, ~80 lines changed, shippable in one session - -None of these require new subsystems, dependencies, or architecture changes. - ---- - -## 2. Biggest Adoption Blockers - -Ranked by impact on a first-time GitHub visitor: - -| # | Blocker | Where | Fix Effort | -|---|---------|-------|------------| -| 1 | README puts Installation + Docker before Quick Start | README.md | Reorder | -| 2 | Running `nhinsight` with no args shows argparse default (ugly) | cli.py | 10 lines | -| 3 | Running `nhinsight scan` with no provider says "No providers specified" (no guidance) | cli.py | 5 lines | -| 4 | Demo ends silently — no "try this next" | cli.py | 8 lines | -| 5 | README "What It Finds" is 60 lines of tables before features | README.md | Collapse | -| 6 | HIGH severity uses same 🔴 icon as CRITICAL (confusing) | output.py | 1 line | -| 7 | `--attack-paths` help text is generic | cli.py | 1 line | -| 8 | PyPI description is too generic | pyproject.toml | 1 line | - -All fixable in a single PR. - ---- - -## 3. README Rewrite - -**See the actual README.md in this repo** — I will implement this directly. - -Structure: -1. Hero (title + one-liner + badges) -2. Quick Start (pip install + demo — 2 commands) -3. Scan examples (5 providers + multi) -4. Example output (compact, screenshot-friendly) -5. What It Finds (6 bullets, not tables) -6. Supported Providers (5 bullets) -7. Key Capabilities (6 bullets) -8. Install Options (4 pip lines + collapsible Docker) -9. Authentication (quick table + collapsible detail) -10. Attack Path Analysis (always visible — differentiator) -11. Risk Codes (collapsible) -12. Configuration (collapsible) -13. CLI Reference (collapsible) -14. Development (4 lines + collapsible) -15. Roadmap (5 one-liners) -16. Why NHInsight? (problem statement at bottom, not top) -17. Contributing / Related / License - -Key decisions: -- Quick Start is line 15, not line 107 -- "The Problem" becomes "Why NHInsight?" 
at the bottom (credibility, not first-screen) -- Risk code tables are collapsed — impressive when opened, not blocking when closed -- Auth detail is collapsed — quick table always visible -- Docker examples collapsed -- Architecture + Makefile collapsed -- Roadmap condensed to one-liners - ---- - -## 4. CLI UX Improvements - -### 4a. No-args behavior (`nhinsight` with nothing) - -**Current:** Shows argparse default help (functional but cold). - -**Improved:** Same help output but add a highlighted quick-start hint at the end. - -Change in `main()` at `cli.py:1132`: - -```python -else: - parser.print_help() - print(f"\n {BOLD}Quick start:{RESET}") - print(f" nhinsight demo # see sample data, no credentials") - print(f" nhinsight scan --aws # scan your AWS account") - print() -``` - -### 4b. No-provider error (`nhinsight scan` with no flags) - -**Current (line 203):** -``` -No providers specified. Use --aws, --azure, --gcp, --github, --k8s, or --all -``` - -**Improved:** -``` -No providers selected. - - Quick examples: - nhinsight scan --aws Scan AWS IAM - nhinsight scan --all --attack-paths Scan everything - nhinsight demo Try with sample data first - - Providers: --aws --azure --gcp --github --k8s --all -``` - -### 4c. Provider auth failure messages - -**Current (line 216):** -``` -AWS credentials not available. Configure AWS CLI or set AWS_PROFILE. -``` - -These are already good. Minor improvement — add the exact command: - -``` -AWS: credentials not found. Run 'aws configure' or set AWS_ACCESS_KEY_ID. -Azure: credentials not found. Run 'az login' or set AZURE_TENANT_ID + AZURE_CLIENT_ID. -GCP: credentials not found. Run 'gcloud auth application-default login' or set GOOGLE_APPLICATION_CREDENTIALS. -GitHub: token not found. Set GITHUB_TOKEN=ghp_... and use --github-org. -Kubernetes: cluster not reachable. Check ~/.kube/config or use --kube-context. -``` - -### 4d. 
Post-demo suggestions - -After `_print_demo_table()` completes, print: - -``` - Try it on your infrastructure: - nhinsight scan --aws Scan AWS IAM - nhinsight scan --all Scan all available providers - nhinsight scan --aws --explain Add AI-powered explanations -``` - -### 4e. `--attack-paths` help text - -**Current:** -``` -Run identity attack path analysis -``` - -**Improved:** -``` -Trace privilege chains across providers (e.g. K8s SA → IRSA → AWS admin) -``` - ---- - -## 5. Packaging / PyPI Improvements - -### 5a. pyproject.toml changes - -**Description** (line 8): -``` -Current: "Non-Human Identity discovery for cloud infrastructure" -Better: "Discover risky non-human identities and privilege paths across AWS, Azure, GCP, GitHub, and Kubernetes" -``` - -**Add classifiers:** -```toml -"Environment :: Console", -"Operating System :: OS Independent", -"Typing :: Typed", -``` - -**Add `Documentation` URL:** -```toml -Documentation = "https://github.com/cvemula1/NHInsight#quick-start" -Changelog = "https://github.com/cvemula1/NHInsight/releases" -``` - -### 5b. setup.py - -The current `setup.py` is a 3-line shim. It's only needed for `pip install -e .` on older pip. **Keep it** — it's harmless and avoids edge-case breakage. No change needed. - -### 5c. README as long_description - -Already set via `readme = "README.md"` in pyproject.toml. The rewritten README with collapsible `
<details>` sections will render well on PyPI (PyPI supports `<details>
` in markdown since 2023). No change needed. - -### 5d. Release checklist for PyPI - -``` -1. Bump version in nhinsight/__init__.py + pyproject.toml -2. Update CHANGELOG.md (if exists) -3. git tag v0.1.x -4. git push origin v0.1.x (triggers release.yml) -5. Verify PyPI page renders correctly -6. Verify Docker Hub image tagged -7. Create GitHub Release with notes -``` - ---- - -## 6. Demo Improvements - -### 6a. Post-demo footer - -Add after the combined summary in `_print_demo_table()` (after line 1092): - -```python -# Post-demo suggestions -print(f"\n {BOLD}Try it on your infrastructure:{RESET}") -print(f" nhinsight scan --aws Scan AWS IAM") -print(f" nhinsight scan --all Scan all providers") -print(f" nhinsight scan --aws --explain AI-powered explanations") -print(f" nhinsight scan --all -f sarif SARIF for GitHub Security tab") -print() -``` - -### 6b. Demo output is already good - -The demo data covers all 5 providers with realistic findings. The combined summary with urgent fixes is solid. The scorecard and NIST compliance sections are impressive for screenshots. - -**One minor tweak:** The demo header could include a timing line to show speed: - -After line 1016, add: -```python -print(f" {DIM}Scanned 5 providers in 0.3s{RESET}\n") -``` - -This is cosmetic but reinforces "fast tool" positioning. - -### 6c. Demo attack paths - -The demo currently does NOT run attack paths. 
To make the demo show attack path analysis (which is the differentiator), add `--attack-paths` support to the demo command: - -In `_build_parser()`, add to demo_p: -```python -demo_p.add_argument("--attack-paths", action="store_true", - help="Include attack path analysis in demo") -``` - -In `main()` demo handler, after printing: -```python -if getattr(args, "attack_paths", False): - from nhinsight.analyzers.attack_paths import analyze_attack_paths - from nhinsight.core.output import print_attack_paths - ap_result = analyze_attack_paths(result.identities) - print_attack_paths(ap_result) -``` - ---- - -## 7. Output Clarity Improvements - -### 7a. HIGH severity icon - -**Current:** Both CRITICAL and HIGH use 🔴. This makes them visually identical. - -**Fix in output.py line 31:** -```python -Severity.HIGH: "🟠", -``` - -This matches the README example output and is the standard convention. - -### 7b. Severity label formatting - -**Current (line 43):** -```python -out.write(f" {color}{icon} {label} ({len(identities)}){RESET}\n") -``` - -This prints `🔴 CRITICAL (3)` which is clear. No change needed. - -### 7c. Identity type display - -**Current (line 47):** -```python -out.write(f" {DIM}({ident.identity_type.value}, {ident.provider.value}){RESET}\n") -``` - -This shows `(iam_user, aws)` — uses enum values with underscores. Could be prettier but it's consistent with JSON/SARIF output. **Leave as-is** for now. Changing display names is a v0.2 polish. - -### 7d. Summary line - -**Current (line 109):** -```python -out.write(f" Summary: {len(nhis)} NHIs") -``` - -Good. No change. - -### 7e. Attack path output - -**Current (line 400):** -```python -blast_str = f" blast: {path.blast_radius:.0f}/100" -``` - -**Improved wording:** -```python -blast_str = f" risk: {path.blast_radius:.0f}/100" -``` - -"risk" is more intuitive than "blast" for most users. The blast_radius internal name can stay. - ---- - -## 8. Attack Path Wording Improvements - -### 8a. 
Better `--attack-paths` help text - -**Current (line 111):** -```python -help="Run identity attack path analysis" -``` - -**Improved:** -```python -help="Trace privilege escalation chains across providers (e.g. K8s SA → cloud admin)" -``` - -### 8b. README attack path section - -The current section is good. One improvement — add 3 concrete example chains: - -```markdown -Example chains NHInsight detects: -- **K8s → AWS**: ServiceAccount → IRSA role → IAM role with AdministratorAccess -- **K8s → GCP**: ServiceAccount → Workload Identity → SA with roles/owner -- **GitHub → AWS**: Deploy key → workflow → OIDC → IAM role with S3FullAccess -``` - -### 8c. Attack path output header - -**Current (line 371):** -```python -out.write(f" {BOLD}Identity Attack Path Analysis{RESET}\n") -``` - -**Improved:** -```python -out.write(f" {BOLD}Privilege Escalation Paths{RESET}\n") -``` - -"Privilege Escalation Paths" is more immediately understood than "Identity Attack Path Analysis." - -### 8d. Path display label - -**Current (line 398):** -```python -out.write(f" {color}{icon} {path.id}{RESET}") -``` - -Shows `AP-001` which is opaque. Add the description: - -```python -out.write(f" {color}{icon} {path.id} — {path.description}{RESET}") -``` - -The `description` field already contains `entry → target (cross-system: k8s → aws)`. - ---- - -## 9. Minimal Patch Plan - -**4 files. ~80 lines changed. 
One PR.** - -### Priority 1 — Highest UX impact (do first) - -| File | Change | Lines | -|------|--------|-------| -| `README.md` | Restructure: Quick Start first, collapsible details | Full rewrite | -| `cli.py:1092` | Add post-demo "try next" suggestions | +8 lines | -| `cli.py:202-204` | Better no-provider error with examples | +8 lines | -| `cli.py:1132` | Friendly no-args hint | +4 lines | - -### Priority 2 — Polish (do second) - -| File | Change | Lines | -|------|--------|-------| -| `output.py:31` | HIGH icon: 🔴 → 🟠 | 1 line | -| `output.py:371` | Header: "Privilege Escalation Paths" | 1 line | -| `output.py:400` | "blast" → "risk" label | 1 line | -| `output.py:398` | Show path description alongside ID | 1 line | -| `cli.py:111` | Better `--attack-paths` help text | 1 line | -| `pyproject.toml:8` | Better PyPI description | 1 line | -| `pyproject.toml:14-27` | Add classifiers + URLs | +5 lines | - -### Priority 3 — Nice-to-have (do if time) - -| File | Change | Lines | -|------|--------|-------| -| `cli.py:126` | Add `--attack-paths` flag to demo command | +3 lines | -| `cli.py:1113` | Handle demo `--attack-paths` | +5 lines | -| `cli.py:216,227,240,252,264` | Improve auth error messages | 5 lines | - -### What can wait until v0.2 - -- Identity type display names (e.g. `iam_user` → `IAM User`) -- Provider badges in terminal output -- Interactive mode / TUI -- Separate docs/ folder with detailed guides -- Progress spinner during scans -- `nhinsight init` command for first-time setup - ---- - -## 10. Optional Later Improvements - -These are good ideas that don't belong in this patch: - -1. **`nhinsight init`** — interactive first-run wizard that checks which providers are configured -2. **`nhinsight explain AP-001`** — explain a specific attack path in plain English using LLM -3. **Progress indicator** — show a spinner or progress bar during long scans -4. **`--quiet` flag** — suppress everything except summary line (for CI/CD) -5. 
**`--fail-on critical`** — exit non-zero if critical findings exist (for CI gates) -6. **GitHub Actions template** — `.github/workflows/nhinsight.yml` users can copy -7. **Separate docs site** — move auth/config/risk-codes to a mkdocs or docusaurus site -8. **Terminal width detection** — adapt output formatting to terminal width -9. **Color detection** — disable ANSI when piping to file (currently always colored) -10. **Completion scripts** — bash/zsh/fish completions for CLI flags - -None of these are urgent. The 9-point patch plan above is the right next step. - ---- - -*Generated for NHInsight v0.1.0 — practical patches, no platform thinking.* diff --git a/README.md b/README.md index 3a68a94..a76b65e 100644 --- a/README.md +++ b/README.md @@ -12,43 +12,89 @@ +## Why NHInsight? + +Non-human identities outnumber humans [**80:1**](https://www.cyberark.com/press/machine-identities-outnumber-humans-by-more-than-80-to-1-new-report-exposes-the-exponential-threats-of-fragmented-identity-security/) in most orgs — and growing 44% year-over-year. Enterprise NHI tools charge **$50K+/year**. NHInsight does it for free — open source, runs locally, no telemetry. + ## Quick Start ```bash -pip install nhinsight -nhinsight demo +pip install nhinsight # install from PyPI +nhinsight demo # see it in action (no credentials needed) ``` -Scan a real environment: +> **Try it in 30 seconds** — `nhinsight demo` runs with built-in sample data so you can see findings, attack paths, and risk scores instantly. 
+ +### Scan a real environment ```bash +# Single provider nhinsight scan --aws + +# Multi-provider with attack path analysis nhinsight scan --all --attack-paths -``` -Or use Docker: +# CI/CD workflow scanning (no cloud creds required) +nhinsight scan --github-workflows .github/workflows --attack-paths -```bash +# Docker (zero install) docker run --rm chvemula/nhinsight demo ``` -## Example Output +### Run in Your CI/CD Pipeline + +Add NHInsight to any GitHub Actions workflow — **no cloud credentials needed** for workflow scanning: + +```yaml +# .github/workflows/nhi-scan.yml +name: NHI Security Scan +on: [push, pull_request] +jobs: + nhi-scan: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: cvemula1/NHInsight@main + with: + attack-paths: "true" # enable attack path analysis + fail-on: "high" # block PRs with high+ severity findings ``` - 🔴 CRITICAL — deploy-bot (iam_user, aws) - │ Has AdministratorAccess policy attached - 🔴 CRITICAL — terraform-deployer (gcp_service_account, gcp) - │ Service account has roles/owner +The action scans your `.github/workflows` directory, writes findings to the **PR summary**, and fails the check if any identity risk meets the severity threshold. 
Add cloud provider credentials to also scan live infrastructure: + +```yaml + - uses: cvemula1/NHInsight@main + with: + providers: "--aws --azure" + attack-paths: "true" + fail-on: "critical" + env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }} + AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }} + AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }} +``` - 🔴 CRITICAL — aks-cluster-sp (azure_sp, azure) - │ SP has Contributor at subscription scope +### Example: Identity Risk Findings - � HIGH — terraform-deployer/key:abc123de (gcp_sa_key, gcp) - │ SA key is 400 days old (max 365) +| Severity | Identity | Provider | Risk | +|:--------:|----------|:--------:|------| +| 🔴 **CRITICAL** | `deploy-bot` | AWS | AdministratorAccess policy attached | +| 🔴 **CRITICAL** | `terraform-deployer` | GCP | Service account has `roles/owner` | +| 🟠 **HIGH** | `aks-cluster-sp` | Azure | SP has Contributor at subscription scope | +| 🟡 **MEDIUM** | `ci-runner-mi` | Azure | Self-hosted runner MI accesses Key Vault + AKS + ACR | +| 🟡 **MEDIUM** | `deploy-sa` | K8s | Bound to `cluster-admin` ClusterRole | - Summary: 25+ risky non-human identities across 5 providers -``` +### Example: Attack Paths Detected + +| Blast | Path | Risk | +|:-----:|------|------| +| **86** | `GitHub Actions` → `Managed Identity` → `Key Vault` → `AKS` → `K8s Secrets` | PR trigger can reach production secrets via MI | +| **83** | `GitHub Actions` → `Managed Identity` → `Terraform Apply` | Self-hosted runner MI can modify infrastructure | +| **80** | `GitHub Actions` → `OIDC` → `AWS IAM Role` → `S3` + `Secrets Manager` | Workflow assumes admin role via OIDC federation | +| **75** | `K8s ServiceAccount` → `IRSA` → `IAM Role (AdministratorAccess)` | Pod escape leads to full AWS account access | ## What It Finds @@ -57,9 +103,11 @@ docker run --rm chvemula/nhinsight demo - Wildcard trust relationships 
and open role assumptions - Dangerous Kubernetes service account bindings (cluster-admin, legacy tokens) - Risky GitHub deploy keys, app permissions, and admin-scoped tokens +- **GitHub Actions CI/CD risks** — OIDC misconfigurations, Managed Identity abuse, self-hosted runner exposure +- **Cloud resource access from workflows** — Key Vault, ACR, AKS, Storage, SQL, Terraform, Helm, and 40+ resource patterns - Cross-cloud attack paths from entry points to privileged resources -**34 risk checks** across 5 providers. [See all risk codes](#risk-codes). +**42 risk checks** across 5 providers + CI/CD workflows. [See all risk codes](#risk-codes). ## Supported Providers @@ -247,6 +295,7 @@ NHInsight builds an identity graph and traces paths from entry points (keys, tok ```bash nhinsight scan --aws --k8s --gcp --attack-paths +nhinsight scan --github-workflows .github/workflows --attack-paths ``` Example chains NHInsight detects: @@ -254,6 +303,109 @@ Example chains NHInsight detects: - **K8s → AWS** — ServiceAccount → IRSA role → IAM role with AdministratorAccess - **K8s → GCP** — ServiceAccount → Workload Identity → SA with roles/owner - **GitHub → AWS** — Deploy key → workflow → OIDC → IAM role with S3FullAccess +- **GitHub Actions → Azure** — Managed Identity → Key Vault secrets, AKS cluster, ACR registry +- **GitHub Actions → IaC** — Self-hosted runner MI → Terraform apply (infrastructure control) +- **GitHub Actions → K8s** — MI → AKS credentials → kubectl exec, Helm deployments + +```mermaid +flowchart LR + subgraph GitHub Actions + wf1["deploy.yml
<br/>(self-hosted runner)"] + wf2["ci.yml<br/>
(OIDC)"] + end + + subgraph Cloud Identity + mi["Managed Identity"] + oidc_aws["OIDC → AWS Role"] + oidc_az["OIDC → Azure SP"] + end + + subgraph Azure Resources + kv{{"Key Vault<br/>
secrets"}} + aks{{"AKS Cluster"}} + acr["ACR Registry"] + sql[("Azure SQL")] + storage[("Storage Account")] + end + + subgraph AWS Resources + s3[("S3 Bucket")] + secrets{{"Secrets Manager"}} + eks{{"EKS Cluster"}} + iam{{"IAM Roles"}} + end + + subgraph Kubernetes + helm["Helm Deploy"] + k8s_secret{{"K8s Secrets"}} + kubectl["kubectl exec"] + end + + subgraph IaC + tf{{"Terraform Apply"}} + end + + wf1 -->|"az login --identity"| mi + wf2 -->|"OIDC token"| oidc_aws + wf2 -->|"OIDC token"| oidc_az + + mi -->|"secret access"| kv + mi -->|"get-credentials"| aks + mi -->|"acr login"| acr + mi -->|"query"| sql + mi -->|"blob upload"| storage + + oidc_aws -->|"assumes role"| iam + iam -->|"s3 cp"| s3 + iam -->|"get-secret"| secrets + iam -->|"eks get-token"| eks + + aks -->|"helm upgrade"| helm + aks -->|"create secret"| k8s_secret + aks -->|"exec"| kubectl + + mi -->|"tf apply"| tf + + style wf1 fill:#24292e,stroke:#444,color:#fff + style wf2 fill:#24292e,stroke:#444,color:#fff + style mi fill:#0078d4,stroke:#005a9e,color:#fff + style oidc_aws fill:#FF9900,stroke:#232F3E,color:#232F3E + style oidc_az fill:#0078d4,stroke:#005a9e,color:#fff + style kv fill:#c00,stroke:#900,color:#fff + style aks fill:#326CE5,stroke:#1a3a6e,color:#fff + style acr fill:#0078d4,stroke:#005a9e,color:#fff + style sql fill:#0078d4,stroke:#005a9e,color:#fff + style storage fill:#0078d4,stroke:#005a9e,color:#fff + style s3 fill:#FF9900,stroke:#232F3E,color:#232F3E + style secrets fill:#c00,stroke:#900,color:#fff + style eks fill:#FF9900,stroke:#232F3E,color:#232F3E + style iam fill:#FF9900,stroke:#232F3E,color:#232F3E + style helm fill:#0f1689,stroke:#0a0f5c,color:#fff + style k8s_secret fill:#c00,stroke:#900,color:#fff + style kubectl fill:#326CE5,stroke:#1a3a6e,color:#fff + style tf fill:#7b42bc,stroke:#5c2d91,color:#fff +``` + +### GitHub Actions Workflow Scanning + +Scan CI/CD workflows for identity and resource access attack paths: + +```bash +nhinsight scan --github-workflows 
path/to/.github/workflows --attack-paths +``` + +Detects **40+ resource access patterns** across: + +| Category | Resources Detected | +|----------|-------------------| +| **Azure** | Key Vault, ACR, AKS, Storage, SQL, CosmosDB, DNS, AD, IAM, Functions, Web Apps | +| **AWS** | S3, Secrets Manager, IAM, EC2, Lambda, ECR, EKS, RDS, DynamoDB, CloudFormation | +| **GCP** | Compute, GKE, Secret Manager, Cloud SQL, IAM, Cloud Storage | +| **Kubernetes** | kubectl apply/exec, secret creation, resource mutation | +| **Deployments** | Helm, Docker push, Terraform/Pulumi apply, Ansible | +| **External** | Cloudflare DNS/CDN | + +Also detects: OIDC permission misconfigurations, PR-trigger cloud auth risks, self-hosted runner Managed Identity exposure, and composite action inlining. Each path includes: - **Blast radius scoring** — 0–100 composite based on privilege level and cross-system reach @@ -294,7 +446,7 @@ flowchart LR ## Risk Codes
-All 34 risk codes by provider +All 42 risk codes by provider ### AWS @@ -302,7 +454,7 @@ flowchart LR |------|------|----------| | Admin/PowerUser policy attached | `AWS_ADMIN_ACCESS` | Critical | | Role trust allows any principal (`*`) | `AWS_WILDCARD_TRUST` | Critical | -| Access key never rotated (>365 days) | `AWS_KEY_NOT_ROTATED` | High | +| Access key not rotated (>365 days) | `AWS_KEY_NOT_ROTATED` | High | | Console access without MFA | `AWS_NO_MFA` | High | | Inactive key not deleted | `AWS_KEY_INACTIVE` | Medium | @@ -310,11 +462,13 @@ flowchart LR | Risk | Code | Severity | |------|------|----------| -| SP/MI with Owner/Contributor at subscription scope | `AZURE_SP_DANGEROUS_ROLE` | Critical | +| SP with Owner/Contributor at subscription scope | `AZURE_SP_DANGEROUS_ROLE` | Critical | +| SP with elevated role at resource group scope | `AZURE_SP_ELEVATED_ROLE` | Medium | +| Managed Identity with dangerous role at subscription scope | `AZURE_MI_DANGEROUS_ROLE` | High | | Disabled SP still has RBAC bindings | `AZURE_SP_DISABLED_WITH_ROLES` | Medium | | App credential expired | `AZURE_CRED_EXPIRED` | High | | App credential expiring within 30 days | `AZURE_CRED_EXPIRING_SOON` | Medium | -| Secret not rotated (>365 days) | `AZURE_SECRET_NOT_ROTATED` | High | +| Client secret not rotated (>365 days) | `AZURE_SECRET_NOT_ROTATED` | High | ### GCP @@ -327,6 +481,7 @@ flowchart LR | SA key not rotated (>365 days) | `GCP_KEY_NOT_ROTATED` | High | | SA key expired | `GCP_KEY_EXPIRED` | High | | SA key expiring within 30 days | `GCP_KEY_EXPIRING_SOON` | Medium | +| SA key disabled but not deleted | `GCP_KEY_DISABLED` | Low | ### Kubernetes @@ -335,15 +490,34 @@ flowchart LR | SA bound to cluster-admin | `K8S_CLUSTER_ADMIN` | Critical | | Legacy long-lived SA token secret | `K8S_LEGACY_SA_TOKEN` | High | | Automount token on privileged SA | `K8S_AUTOMOUNT_PRIVILEGED` | High | -| Default SA in use / Orphaned SA / No WI | `K8S_*` | Medium | +| Using default SA in default 
namespace | `K8S_DEFAULT_SA` | Medium | +| Orphaned SA (no running pods) | `K8S_ORPHANED_SA` | Medium | +| SA has secrets but no IRSA/Workload Identity | `K8S_NO_WORKLOAD_IDENTITY` | Medium | +| Deployments using default SA | `K8S_DEPLOY_DEFAULT_SA` | Medium | +| Opaque secret contains credential-like keys | `K8S_SECRET_CREDENTIALS` | Medium | +| TLS secret not managed by cert-manager | `K8S_TLS_UNMANAGED` | Low | ### GitHub | Risk | Code | Severity | |------|------|----------| | Token with admin scope | `GH_ADMIN_SCOPE` | High | +| Token has full repo access | `GH_REPO_WRITE` | Medium | | App with dangerous write perms | `GH_APP_DANGEROUS_PERMS` | High | | Deploy key with write access | `GH_DEPLOY_KEY_WRITE` | Medium | +| Inactive webhook | `GH_WEBHOOK_INACTIVE` | Low | + +### GitHub Actions / CI/CD + +| Risk | Code | Severity | +|------|------|----------| +| OIDC workflow assumes admin-like role | `GH_OIDC_ADMIN_ROLE` | High | +| Cloud auth triggered on pull_request | `GH_OIDC_PR_TRIGGER` | High | +| Self-hosted runner uses Managed Identity | `GH_WF_SELF_HOSTED_MI` | High | +| Workflow missing id-token: write permission | `GH_OIDC_NO_PERMISSION` | Medium | +| Workflow reads Key Vault secrets | `GH_WF_KEYVAULT_SECRETS` | Medium | +| Workflow fetches AKS credentials | `GH_WF_AKS_ACCESS` | Medium | +| OIDC role ARN is a dynamic reference | `GH_OIDC_DYNAMIC_ROLE` | Info | ### Universal @@ -394,23 +568,31 @@ nhinsight scan [OPTIONS] Discover and analyze NHIs --github Scan GitHub org --k8s Scan Kubernetes cluster --all Scan all available providers - --attack-paths Run identity attack path analysis - --format {table,json,sarif} Output format (default: table) - --explain Add AI-powered explanations + --github-workflows [PATH] Scan GitHub Actions workflows (default: .github/workflows) + --attack-paths Trace privilege escalation chains across providers + --mermaid Output attack paths as Mermaid diagrams + --ci-summary Compact markdown summary for CI/PR usage + --fail-on 
{critical,high,medium,low} Exit code 1 if severity threshold met (CI gating) + --format, -f {table,json,sarif} Output format (default: table) + --output, -o FILE Write output to file + --explain Add AI-powered explanations (requires OPENAI_API_KEY) + --ascii ASCII-safe output (no emoji; auto in CI) + --stale-days N Days without use before flagging (default: 90) + --verbose, -v Verbose logging --aws-profile PROFILE AWS named profile --aws-region REGION AWS region --azure-tenant-id ID Azure tenant ID --azure-subscription-id ID Azure subscription ID --gcp-project PROJECT GCP project ID --github-org ORG GitHub organization + --github-base-url URL GitHub Enterprise base URL --kubeconfig PATH Path to kubeconfig --kube-context CTX Kubernetes context --kube-namespace NS Namespace (default: all) - --stale-days N Days without use before flagging (default: 90) - --output FILE Write output to file - --verbose Verbose logging nhinsight demo Show demo scan with sample data +nhinsight report --demo Generate formatted markdown report +nhinsight graph --input FILE Render Mermaid diagrams from saved JSON nhinsight version Show version ``` @@ -422,7 +604,7 @@ nhinsight version Show version git clone https://github.com/cvemula1/NHInsight.git cd NHInsight pip install -e ".[all,dev]" -make test # 151 tests, <1 second +make test # 260 tests, <2 seconds ```
@@ -452,7 +634,9 @@ nhinsight/ ├── core/ │ ├── models.py # Identity, RiskFlag, ScanResult, enums │ ├── config.py # NHInsightConfig (env vars + CLI flags) -│ └── output.py # Table, JSON, SARIF formatters +│ ├── output.py # Table, JSON, SARIF formatters +│ ├── mermaid.py # Mermaid diagram renderer for attack paths +│ └── ci_summary.py # Compact CI/PR markdown summary ├── providers/ │ ├── base.py # Abstract BaseProvider interface │ ├── aws.py # AWS IAM discovery (boto3) @@ -462,10 +646,11 @@ nhinsight/ │ └── kubernetes.py # Kubernetes discovery (kubernetes client) ├── analyzers/ │ ├── classification.py # Human vs machine classification -│ ├── risk.py # Risk analysis (34 checks) +│ ├── risk.py # Risk analysis (42 checks) │ ├── scoring.py # NIST SP 800-53 + IGA governance scoring │ ├── graph.py # Identity graph model (nodes, edges, BFS) -│ └── attack_paths.py # Attack path detection + blast radius +│ ├── attack_paths.py # Attack path detection + blast radius +│ └── workflow_scanner.py # GitHub Actions CI/CD scanner (40+ resource patterns) └── explain/ └── llm.py # Optional LLM explanations (OpenAI) ``` @@ -474,16 +659,13 @@ nhinsight/ ## Roadmap -- [x] **v0.1** — 5 providers, 34 risk checks, attack paths, NIST scoring, SARIF, AI explanations, Docker +- [x] **v0.1** — 5 providers, 42 risk checks, attack paths, NIST scoring, SARIF, AI explanations, Docker +- [x] **v0.1.1** — GitHub Actions workflow scanner, 40+ resource access patterns, MI/OIDC attack paths, Mermaid diagrams, CI gating (`--fail-on`), GitHub Action - [ ] **v0.2** — OPA/Rego policies, ML classification, anomaly detection, IAM right-sizing - [ ] **v0.3** — Slack, Teams, Jira, PagerDuty, webhook integrations - [ ] **v0.4** — SIEM export, scheduled scans, drift detection, dashboard API - [ ] **v0.5** — Auto-remediation, least-privilege generation, AI agent, PR-based fixes -## Why NHInsight? - -Non-human identities outnumber humans **45:1** in most orgs. Enterprise NHI tools charge **$50K+/year**. 
NHInsight does it for free — open source, runs locally, no telemetry. - ## Contributing See [CONTRIBUTING.md](CONTRIBUTING.md) for development guidelines. diff --git a/action.yml b/action.yml new file mode 100644 index 0000000..918bcfe --- /dev/null +++ b/action.yml @@ -0,0 +1,91 @@ +name: "NHInsight NHI Scanner" +description: "Scan non-human identities and GitHub Actions OIDC connections for privilege escalation risks" +author: "cvemula1" + +branding: + icon: "shield" + color: "red" + +inputs: + version: + description: "NHInsight version to install (default: latest)" + required: false + default: "" + providers: + description: "Provider flags to scan (e.g. '--aws --azure'). Leave empty for workflow-only scan." + required: false + default: "" + github-workflows: + description: "Path to GitHub Actions workflow directory" + required: false + default: ".github/workflows" + attack-paths: + description: "Enable attack path analysis" + required: false + default: "true" + fail-on: + description: "Fail the step if any identity has this severity or higher (critical, high, medium, low)" + required: false + default: "high" + format: + description: "Output format (table, json, sarif)" + required: false + default: "table" + ascii: + description: "Force ASCII-safe output" + required: false + default: "true" + +runs: + using: "composite" + steps: + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.x" + + - name: Install NHInsight + shell: bash + run: | + if [ -n "${{ inputs.version }}" ]; then + pip install "nhinsight==${{ inputs.version }}" + else + pip install nhinsight + fi + + - name: Run NHInsight scan + shell: bash + env: + CI: "true" + run: | + ARGS="scan" + + # Add provider flags + if [ -n "${{ inputs.providers }}" ]; then + ARGS="$ARGS ${{ inputs.providers }}" + fi + + # Workflow scanning + if [ -n "${{ inputs.github-workflows }}" ]; then + ARGS="$ARGS --github-workflows ${{ inputs.github-workflows }}" + fi + + # Attack paths + if [ "${{ 
inputs.attack-paths }}" = "true" ]; then + ARGS="$ARGS --attack-paths" + fi + + # CI summary (writes to $GITHUB_STEP_SUMMARY automatically) + ARGS="$ARGS --ci-summary" + + # ASCII safe + if [ "${{ inputs.ascii }}" = "true" ]; then + ARGS="$ARGS --ascii" + fi + + # Fail on severity threshold + if [ -n "${{ inputs.fail-on }}" ]; then + ARGS="$ARGS --fail-on ${{ inputs.fail-on }}" + fi + + nhinsight $ARGS diff --git a/nhinsight/analyzers/attack_paths.py b/nhinsight/analyzers/attack_paths.py index 7b94ad8..1a05331 100644 --- a/nhinsight/analyzers/attack_paths.py +++ b/nhinsight/analyzers/attack_paths.py @@ -250,12 +250,18 @@ def _build_attack_path( graph, node_ids, edges, cross_system, blast ) - # Build description + # Build description — reviewer-friendly wording entry_label = steps[0].node_label if steps else "?" target_label = steps[-1].node_label if steps else "?" - desc = f"{entry_label} → {target_label}" + target_node = graph.nodes.get(node_ids[-1]) + desc = f"{entry_label} can reach {target_label}" + if target_node and target_node.is_privileged: + meta = target_node.metadata + role = meta.get("role_name", "") + if role: + desc = f"{entry_label} can reach {role} via {target_label}" if cross_system: - desc += f" (cross-system: {' → '.join(providers)})" + desc += f" (crosses {' → '.join(providers)})" # Recommendation rec = _generate_recommendation(graph, node_ids, edges, cross_system) @@ -417,6 +423,16 @@ def _compute_path_severity( "Scope the GCP SA to least-privilege. " "Use IAM Conditions to restrict to specific K8s namespace/SA." ), + EdgeType.OIDC_ASSUMES_ROLE: ( + "Restrict the OIDC trust policy to specific repos/branches. " + "Use sub claim conditions (repo:org/repo:ref:refs/heads/main). " + "Replace admin policies with least-privilege scoped to deployment needs." + ), + EdgeType.ACCESSES_RESOURCE: ( + "Apply least-privilege access to each cloud resource. " + "Scope credentials to only the specific resources and actions needed. 
" + "Add environment protection rules and branch restrictions on the workflow." + ), } diff --git a/nhinsight/analyzers/graph.py b/nhinsight/analyzers/graph.py index 6b59bfb..109f519 100644 --- a/nhinsight/analyzers/graph.py +++ b/nhinsight/analyzers/graph.py @@ -30,6 +30,8 @@ class EdgeType(str, Enum): GCP_IAM_BINDING = "gcp_iam_binding" # GCP SA → IAM role GCP_WI_MAPS_TO = "gcp_wi_maps_to" # K8s SA → GCP SA (GKE WI) DEPLOYS_TO = "deploys_to" # GitHub App → target + OIDC_ASSUMES_ROLE = "oidc_assumes_role" # GH Actions OIDC → cloud role + ACCESSES_RESOURCE = "accesses_resource" # identity → cloud/infra resource @dataclass @@ -139,6 +141,7 @@ def to_dict(self) -> Dict[str, Any]: IdentityType.GCP_SA_KEY, IdentityType.K8S_SECRET, IdentityType.IAM_USER, + IdentityType.GITHUB_ACTIONS_OIDC, } @@ -519,6 +522,184 @@ def build_graph(identities: List[Identity]) -> IdentityGraph: label=f"GKE WI → {sa_name}", )) + # GitHub Actions OIDC → cloud roles (AWS, Azure, GCP) + for oidc in by_type.get(IdentityType.GITHUB_ACTIONS_OIDC, []): + # AWS OIDC: role_arn in raw + role_arn = oidc.raw.get("role_arn", "") + if role_arn: + target = by_arn.get(role_arn) + if target: + graph.add_edge(GraphEdge( + source_id=oidc.id, + target_id=target.id, + edge_type=EdgeType.OIDC_ASSUMES_ROLE, + label=f"OIDC → {target.name}", + )) + else: + # Create synthetic AWS IAM role node + synth_id = f"aws:iam:role:oidc:{role_arn}" + role_name = role_arn.split("/")[-1] if "/" in role_arn else role_arn + # Check if role is admin-privileged + oidc_policies = oidc.raw.get("role_policies", []) + is_priv = any(p in ADMIN_POLICIES for p in oidc_policies) + if synth_id not in graph.nodes: + graph.add_node(GraphNode( + id=synth_id, + label=role_name, + node_type="iam_role", + provider="aws", + is_privileged=is_priv, + metadata={"arn": role_arn, "synthetic": True, + "role_name": role_name, + "policies": oidc_policies}, + )) + graph.add_edge(GraphEdge( + source_id=oidc.id, + target_id=synth_id, + 
edge_type=EdgeType.OIDC_ASSUMES_ROLE, + label=f"OIDC → {role_name}", + )) + # If the role has known policies, create policy nodes + for pol in oidc_policies: + pol_id = f"aws:policy:oidc:{hash(pol) & 0xFFFFFFFF}" + is_admin_pol = pol in ADMIN_POLICIES + if pol_id not in graph.nodes: + graph.add_node(GraphNode( + id=pol_id, + label=pol, + node_type="iam_policy", + provider="aws", + is_privileged=is_admin_pol, + metadata={"policy": pol}, + )) + graph.add_edge(GraphEdge( + source_id=synth_id, + target_id=pol_id, + edge_type=EdgeType.HAS_POLICY, + label=f"has {pol}", + )) + + # Azure OIDC: azure_client_id in raw + az_client_id = oidc.raw.get("azure_client_id", "") + if az_client_id: + target = azure_by_appid.get(az_client_id) + if target: + graph.add_edge(GraphEdge( + source_id=oidc.id, + target_id=target.id, + edge_type=EdgeType.OIDC_ASSUMES_ROLE, + label=f"OIDC → {target.name}", + )) + else: + synth_id = f"azure:sp:oidc:{az_client_id}" + if synth_id not in graph.nodes: + graph.add_node(GraphNode( + id=synth_id, + label=f"Azure SP ({az_client_id[:8]}...)", + node_type="azure_sp", + provider="azure", + metadata={"client_id": az_client_id, "synthetic": True}, + )) + graph.add_edge(GraphEdge( + source_id=oidc.id, + target_id=synth_id, + edge_type=EdgeType.OIDC_ASSUMES_ROLE, + label="OIDC → Azure SP", + )) + + # GCP OIDC: gcp_service_account in raw + gcp_sa = oidc.raw.get("gcp_service_account", "") + if gcp_sa: + target = gcp_sa_by_email.get(gcp_sa) + if target: + graph.add_edge(GraphEdge( + source_id=oidc.id, + target_id=target.id, + edge_type=EdgeType.OIDC_ASSUMES_ROLE, + label=f"OIDC → {target.name}", + )) + else: + synth_id = f"gcp:sa:oidc:{gcp_sa}" + sa_name = gcp_sa.split("@")[0] if "@" in gcp_sa else gcp_sa + if synth_id not in graph.nodes: + graph.add_node(GraphNode( + id=synth_id, + label=sa_name, + node_type="gcp_service_account", + provider="gcp", + metadata={"email": gcp_sa, "synthetic": True}, + )) + graph.add_edge(GraphEdge( + source_id=oidc.id, + 
target_id=synth_id, + edge_type=EdgeType.OIDC_ASSUMES_ROLE, + label=f"OIDC → {sa_name}", + )) + + # GitHub Actions / OIDC identities → cloud resource access + # Creates synthetic resource nodes for every detected cloud/infra resource + _PRIVILEGED_RESOURCE_TYPES = { + "azure_keyvault", "azure_aks", "azure_sql", "azure_cosmosdb", + "azure_ad", "azure_iam", "azure_storage", "azure_dns", + "aws_secrets", "aws_iam", "aws_s3", "aws_eks", "aws_rds", + "gcp_secrets", "gcp_gke", "gcp_iam", "gcp_sql", + "k8s_secret", "terraform", "pulumi", + } + _RESOURCE_PROVIDER_MAP = { + "azure_": "azure", "aws_": "aws", "gcp_": "gcp", + "k8s": "kubernetes", "helm": "kubernetes", + "terraform": "iac", "pulumi": "iac", "ansible": "iac", + "container_": "docker", "cloudflare": "cloudflare", + } + + for oidc in by_type.get(IdentityType.GITHUB_ACTIONS_OIDC, []): + cloud_resources = oidc.raw.get("cloud_resources", []) + if not cloud_resources: + continue + + for res in cloud_resources: + rtype = res.get("resource_type", "") if isinstance(res, dict) else getattr(res, "resource_type", "") + action = res.get("action", "") if isinstance(res, dict) else getattr(res, "action", "") + rname = res.get("resource_name", "") if isinstance(res, dict) else getattr(res, "resource_name", "") + severity = res.get("severity", "high") if isinstance(res, dict) else getattr(res, "severity", "high") + + if not rtype: + continue + + # Determine provider for the resource node + res_provider = "cloud" + for prefix, prov in _RESOURCE_PROVIDER_MAP.items(): + if rtype.startswith(prefix): + res_provider = prov + break + + # Build a unique node ID for deduplication + res_id = f"resource:{rtype}:{rname}" if rname else f"resource:{rtype}" + label = f"{rname} ({action})" if rname else f"{rtype.replace('_', ' ').title()} ({action})" + is_priv = rtype in _PRIVILEGED_RESOURCE_TYPES or severity == "critical" + + if res_id not in graph.nodes: + graph.add_node(GraphNode( + id=res_id, + label=label, + node_type=rtype, + 
provider=res_provider, + is_privileged=is_priv, + metadata={ + "resource_type": rtype, + "action": action, + "resource_name": rname, + "severity": severity, + "synthetic": True, + }, + )) + graph.add_edge(GraphEdge( + source_id=oidc.id, + target_id=res_id, + edge_type=EdgeType.ACCESSES_RESOURCE, + label=f"{oidc.raw.get('auth_method', 'auth')} → {label}", + )) + logger.info( "Built identity graph: %d nodes, %d edges, " "%d entry points, %d privileged", diff --git a/nhinsight/analyzers/risk.py b/nhinsight/analyzers/risk.py index ff26299..c9a90b9 100644 --- a/nhinsight/analyzers/risk.py +++ b/nhinsight/analyzers/risk.py @@ -32,7 +32,10 @@ def analyze_risk(identities: List[Identity], config: NHInsightConfig) -> List[Identity]: """Run all risk checks against discovered identities and attach RiskFlags.""" for ident in identities: - ident.risk_flags = [] + # Preserve risk flags set by upstream scanners (e.g. workflow_scanner) + preserved = [f for f in ident.risk_flags + if f.code.startswith("GH_OIDC_") or f.code.startswith("GH_WF_")] + ident.risk_flags = preserved if ident.provider == Provider.AWS: _check_aws_risks(ident, config) diff --git a/nhinsight/analyzers/workflow_scanner.py b/nhinsight/analyzers/workflow_scanner.py new file mode 100644 index 0000000..3ba41a1 --- /dev/null +++ b/nhinsight/analyzers/workflow_scanner.py @@ -0,0 +1,847 @@ +# MIT License — Copyright (c) 2026 cvemula1 +# GitHub Actions Workflow Scanner — detect OIDC identity usage in CI/CD pipelines + +from __future__ import annotations + +import logging +import os +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import List, Optional + +from nhinsight.core.models import ( + Classification, + Identity, + IdentityType, + Provider, + RiskFlag, + Severity, +) + +logger = logging.getLogger(__name__) + + +# ── Known action patterns ───────────────────────────────────────────── + +# AWS: aws-actions/configure-aws-credentials +_AWS_OIDC_RE = re.compile( + 
r"uses:\s*aws-actions/configure-aws-credentials", re.IGNORECASE +) +_ROLE_ARN_RE = re.compile( + r"role-to-assume:\s*(\S+)", re.IGNORECASE +) + +# Azure: azure/login +_AZURE_OIDC_RE = re.compile( + r"uses:\s*azure/login", re.IGNORECASE +) +_AZURE_CLIENT_ID_RE = re.compile( + r"client-id:\s*(\S+)", re.IGNORECASE +) +_AZURE_TENANT_ID_RE = re.compile( + r"tenant-id:\s*(\S+)", re.IGNORECASE +) + +# GCP: google-github-actions/auth +_GCP_OIDC_RE = re.compile( + r"uses:\s*google-github-actions/auth", re.IGNORECASE +) +_GCP_WIF_PROVIDER_RE = re.compile( + r"workload_identity_provider:\s*(\S+)", re.IGNORECASE +) +_GCP_SA_RE = re.compile( + r"service_account:\s*(\S+)", re.IGNORECASE +) + +# OIDC permission detection (skip commented lines) +_OIDC_PERM_RE = re.compile( + r"^[^#\n]*id-token:\s*write", re.IGNORECASE | re.MULTILINE +) +# permissions: write-all grants id-token: write implicitly +_WRITE_ALL_RE = re.compile( + r"^[^#\n]*permissions:\s*write-all", re.IGNORECASE | re.MULTILINE +) + +# Azure Managed Identity login (self-hosted runners) +_AZ_MI_LOGIN_RE = re.compile( + r"az\s+login\s+--identity", re.IGNORECASE +) + +# Key Vault secret access — match both arg orders, handle ${{ }} expressions +_KV_SECRET_RE = re.compile( + r"az\s+keyvault\s+secret\s+show" + r"(?=.*--vault-name\s+(?P<vault>\$\{\{[^}]+\}\}|\S+))" + r"(?=.*--name\s+(?P<secret>\$\{\{[^}]+\}\}|\S+))", + re.IGNORECASE, +) +# Key Vault name from env var assignment (e.g.
BACKEND_VAULT_NAME: "seaionl-secrets") +_KV_NAME_ENV_RE = re.compile( + r"(?:VAULT_NAME|KEY_VAULT).*?:\s*[\"']?([a-zA-Z0-9][\w-]+)[\"']?\s*$", + re.IGNORECASE | re.MULTILINE, +) + +# AKS get-credentials +_AKS_CREDS_RE = re.compile( + r"az\s+aks\s+get-credentials", re.IGNORECASE +) + +# Self-hosted runner detection — array format [label1, label2] +_SELF_HOSTED_ARRAY_RE = re.compile( + r"runs-on:\s*\[([^\]]+)\]", re.IGNORECASE +) +# Self-hosted runner detection — string format (no brackets, no ${{ }}, no group:/labels: keys) +_SELF_HOSTED_STR_RE = re.compile( + r"runs-on:\s*(?!\[)(?!\$)(?!group:)(?!labels:)(\S+)", re.IGNORECASE +) + +# Secrets reference pattern +_SECRETS_RE = re.compile(r"\$\{\{\s*secrets\.(\w+)\s*\}\}") + + +# ── Resource access detection ────────────────────────────────────────── + +@dataclass +class ResourceAccess: + """A cloud/infra resource accessed from a workflow.""" + resource_type: str # azure_keyvault, azure_acr, azure_aks, k8s, helm, etc. + action: str # e.g. "secret show", "login", "get-credentials" + resource_name: str = "" # e.g. 
vault name, ACR name, cluster name + severity: str = "high" # critical, high, medium, low + details: str = "" + + +# Extensible table: (regex, resource_type, action, severity, name_group_index) +# name_group_index: which regex group contains the resource name (0 = none) +_RESOURCE_PATTERNS: List[tuple] = [ + # ── Azure ── + (re.compile(r"az\s+keyvault\s+secret", re.I), + "azure_keyvault", "secret access", "high", 0), + (re.compile(r"az\s+acr\s+login\s+--name\s+(\S+)", re.I), + "azure_acr", "registry login", "high", 1), + (re.compile(r"az\s+acr\s+repository", re.I), + "azure_acr", "repository access", "medium", 0), + (re.compile(r"az\s+aks\s+get-credentials", re.I), + "azure_aks", "cluster credentials", "high", 0), + (re.compile(r"az\s+aks\s+show", re.I), + "azure_aks", "cluster info", "low", 0), + (re.compile(r"az\s+storage\s+(?:blob|container|account)", re.I), + "azure_storage", "storage access", "high", 0), + (re.compile(r"az\s+sql", re.I), + "azure_sql", "database access", "high", 0), + (re.compile(r"az\s+cosmosdb", re.I), + "azure_cosmosdb", "cosmosdb access", "high", 0), + (re.compile(r"az\s+servicebus", re.I), + "azure_servicebus", "service bus access", "medium", 0), + (re.compile(r"az\s+eventhubs?", re.I), + "azure_eventhub", "event hub access", "medium", 0), + (re.compile(r"az\s+appconfig", re.I), + "azure_appconfig", "app configuration", "medium", 0), + (re.compile(r"az\s+network", re.I), + "azure_network", "network access", "medium", 0), + (re.compile(r"az\s+dns", re.I), + "azure_dns", "DNS management", "high", 0), + (re.compile(r"az\s+webapp", re.I), + "azure_webapp", "web app access", "high", 0), + (re.compile(r"az\s+functionapp", re.I), + "azure_functions", "function app access", "high", 0), + (re.compile(r"az\s+ad\s+(?:app|sp)", re.I), + "azure_ad", "AD app/SP management", "critical", 0), + (re.compile(r"az\s+role\s+assignment", re.I), + "azure_iam", "role assignment", "critical", 0), + # ── AWS ── + (re.compile(r"aws\s+s3", re.I), + "aws_s3", "S3 
access", "high", 0), + (re.compile(r"aws\s+secretsmanager", re.I), + "aws_secrets", "Secrets Manager access", "high", 0), + (re.compile(r"aws\s+sts", re.I), + "aws_sts", "STS assume-role", "high", 0), + (re.compile(r"aws\s+ec2", re.I), + "aws_ec2", "EC2 access", "high", 0), + (re.compile(r"aws\s+iam", re.I), + "aws_iam", "IAM management", "critical", 0), + (re.compile(r"aws\s+lambda", re.I), + "aws_lambda", "Lambda access", "high", 0), + (re.compile(r"aws\s+ecr", re.I), + "aws_ecr", "ECR access", "high", 0), + (re.compile(r"aws\s+eks", re.I), + "aws_eks", "EKS access", "high", 0), + (re.compile(r"aws\s+rds", re.I), + "aws_rds", "RDS access", "high", 0), + (re.compile(r"aws\s+dynamodb", re.I), + "aws_dynamodb", "DynamoDB access", "high", 0), + (re.compile(r"aws\s+cloudformation", re.I), + "aws_cloudformation", "CloudFormation access", "critical", 0), + # ── GCP ── + (re.compile(r"gcloud\s+compute", re.I), + "gcp_compute", "Compute Engine access", "high", 0), + (re.compile(r"gcloud\s+container\s+clusters", re.I), + "gcp_gke", "GKE cluster access", "high", 0), + (re.compile(r"gcloud\s+secrets", re.I), + "gcp_secrets", "Secret Manager access", "high", 0), + (re.compile(r"gcloud\s+sql", re.I), + "gcp_sql", "Cloud SQL access", "high", 0), + (re.compile(r"gcloud\s+iam", re.I), + "gcp_iam", "IAM management", "critical", 0), + (re.compile(r"gsutil", re.I), + "gcp_storage", "Cloud Storage access", "high", 0), + # ── Kubernetes ── + (re.compile(r"kubectl\s+apply", re.I), + "k8s", "resource apply", "high", 0), + (re.compile(r"kubectl\s+create\s+secret", re.I), + "k8s_secret", "secret creation", "high", 0), + (re.compile(r"kubectl\s+create\s+configmap", re.I), + "k8s_configmap", "configmap creation", "medium", 0), + (re.compile(r"kubectl\s+(?:delete|patch|replace)", re.I), + "k8s", "resource mutation", "high", 0), + (re.compile(r"kubectl\s+exec", re.I), + "k8s", "pod exec", "critical", 0), + # ── Helm ── + (re.compile(r"helm\s+(?:upgrade|install)", re.I), + "helm", 
"deployment", "high", 0), + # ── Docker / Container Registry ── + (re.compile(r"docker\s+push", re.I), + "container_registry", "image push", "high", 0), + (re.compile(r"docker\s+(?:build|buildx)", re.I), + "container_build", "image build", "medium", 0), + # ── Infrastructure as Code ── + (re.compile(r"terraform\s+apply", re.I), + "terraform", "infra apply", "critical", 0), + (re.compile(r"terraform\s+plan", re.I), + "terraform", "infra plan", "high", 0), + (re.compile(r"terraform\s+destroy", re.I), + "terraform", "infra destroy", "critical", 0), + (re.compile(r"pulumi\s+up", re.I), + "pulumi", "infra apply", "critical", 0), + (re.compile(r"ansible-playbook", re.I), + "ansible", "config management", "high", 0), + # ── External APIs ── + (re.compile(r"cloudflare", re.I), + "cloudflare", "DNS/CDN management", "high", 0), +] + + +def _detect_resource_access(content: str) -> List[ResourceAccess]: + """Detect all cloud/infra resource access patterns in workflow content.""" + seen: set = set() + resources: List[ResourceAccess] = [] + for pattern, rtype, action, severity, name_idx in _RESOURCE_PATTERNS: + for m in pattern.finditer(content): + key = (rtype, action) + if key in seen: + continue + seen.add(key) + name = "" + if name_idx and name_idx <= len(m.groups()): + name = m.group(name_idx) + resources.append(ResourceAccess( + resource_type=rtype, + action=action, + resource_name=_resolve_value(name) if name else "", + severity=severity, + )) + break # one match per pattern is enough + return resources + + +@dataclass +class WorkflowOIDCConnection: + """A single OIDC or cloud auth connection found in a workflow file.""" + workflow_file: str + workflow_name: str + job_name: str = "" + cloud_provider: str = "" # aws, azure, gcp + auth_method: str = "" # oidc, managed_identity, static_secret + role_arn: str = "" # AWS role ARN + azure_client_id: str = "" # Azure SP client ID + azure_tenant_id: str = "" # Azure tenant ID + gcp_wif_provider: str = "" # GCP Workload Identity 
pool + gcp_service_account: str = "" # GCP SA email + has_oidc_permission: bool = False + self_hosted_runner: str = "" # Runner label if self-hosted + keyvault_secrets: List[str] = field(default_factory=list) + keyvault_name: str = "" + has_aks_access: bool = False + trigger_events: List[str] = field(default_factory=list) + secrets_used: List[str] = field(default_factory=list) + cloud_resources: List[ResourceAccess] = field(default_factory=list) + raw_step: str = "" + + +@dataclass +class WorkflowScanResult: + """Results of scanning workflow files.""" + workflows_scanned: int = 0 + oidc_connections: List[WorkflowOIDCConnection] = field(default_factory=list) + identities: List[Identity] = field(default_factory=list) + errors: List[str] = field(default_factory=list) + + +# ── Scanner ──────────────────────────────────────────────────────────── + +def scan_workflows( + path: str = ".github/workflows", + *, + repo_name: str = "", +) -> WorkflowScanResult: + """Scan GitHub Actions workflow files for OIDC identity connections. + + Parameters + ---------- + path : str + Path to the workflows directory or a single .yml/.yaml file. + repo_name : str + Repository name (org/repo) for labeling. Auto-detected from git if empty. + + Returns + ------- + WorkflowScanResult + Discovered OIDC connections and generated Identity objects. 
+ """ + result = WorkflowScanResult() + wf_path = Path(path) + + if not repo_name: + repo_name = _detect_repo_name(wf_path) + + # Collect workflow files + if wf_path.is_file(): + files = [wf_path] + elif wf_path.is_dir(): + files = sorted(wf_path.glob("*.yml")) + sorted(wf_path.glob("*.yaml")) + else: + result.errors.append(f"Path not found: {path}") + return result + + if not files: + result.errors.append(f"No workflow files found in {path}") + return result + + # Resolve .github root for local composite action resolution + github_root = None + if wf_path.is_dir(): + # .github/workflows -> .github + candidate = wf_path.parent + if candidate.name == ".github": + github_root = candidate + elif wf_path.is_file(): + candidate = wf_path.parent.parent + if candidate.name == ".github": + github_root = candidate + + for wf_file in files: + try: + content = wf_file.read_text() + # Inline local composite action content for pattern matching + content = _inline_local_actions(content, github_root) + connections = _parse_workflow(content, str(wf_file), repo_name) + result.oidc_connections.extend(connections) + result.workflows_scanned += 1 + except Exception as e: + result.errors.append(f"{wf_file.name}: {e}") + + # Convert connections to Identity objects + for conn in result.oidc_connections: + identities = _connection_to_identities(conn, repo_name) + result.identities.extend(identities) + + logger.info( + "Scanned %d workflows, found %d OIDC connections", + result.workflows_scanned, len(result.oidc_connections), + ) + + return result + + +# ── Local composite action inlining ─────────────────────────────────── + +_LOCAL_ACTION_RE = re.compile( + r"uses:\s*\./\.github/actions/([\w._-]+)", re.IGNORECASE +) + + +def _inline_local_actions(content: str, github_root: Optional[Path]) -> str: + """Append content from referenced local composite actions. 
+ + When a workflow references ``uses: ./.github/actions/``, read + the corresponding ``action.yml`` / ``action.yaml`` and append its + content so that regex-based pattern matching picks up commands + defined inside composite actions (e.g. ``az login --identity``). + """ + if not github_root: + return content + seen: set = set() + for m in _LOCAL_ACTION_RE.finditer(content): + action_name = m.group(1) + if action_name in seen: + continue + seen.add(action_name) + for ext in ("action.yml", "action.yaml"): + action_file = github_root / "actions" / action_name / ext + if action_file.is_file(): + try: + content += "\n" + action_file.read_text() + except Exception: + pass + break + return content + + +def _parse_workflow(content: str, filepath: str, repo_name: str) -> List[WorkflowOIDCConnection]: + """Parse a single workflow file for OIDC connections. + + Uses line-by-line regex parsing (no YAML dependency required). + """ + connections: List[WorkflowOIDCConnection] = [] + filename = os.path.basename(filepath) + + # Extract workflow name + wf_name = filename + name_match = re.search(r"^name:\s*(.+)$", content, re.MULTILINE) + if name_match: + wf_name = name_match.group(1).strip().strip("'\"") + + # Check for OIDC permission (explicit id-token: write or permissions: write-all) + has_oidc = bool(_OIDC_PERM_RE.search(content)) or bool(_WRITE_ALL_RE.search(content)) + + # Extract trigger events + triggers = _extract_triggers(content) + + # Extract secrets used + secrets = _SECRETS_RE.findall(content) + + # Detect self-hosted runners (both array and string formats) + runner_labels = [] + for m in _SELF_HOSTED_ARRAY_RE.finditer(content): + for raw in m.group(1).split(","): + cleaned = raw.strip().strip("'\"") + if not cleaned.startswith("${{"): + runner_labels.append(cleaned) + for m in _SELF_HOSTED_STR_RE.finditer(content): + runner_labels.append(m.group(1).strip().strip("'\"")) + # Filter out standard GitHub-hosted runners + gh_hosted = {"ubuntu-latest", "ubuntu-22.04", 
"ubuntu-20.04", "ubuntu-24.04", + "windows-latest", "windows-2022", "windows-2019", + "macos-latest", "macos-14", "macos-13", "macos-12"} + self_hosted_labels = [label for label in runner_labels if label not in gh_hosted] + # Deduplicate while preserving order + seen = set() + unique_labels = [] + for label in self_hosted_labels: + if label not in seen: + seen.add(label) + unique_labels.append(label) + self_hosted_runner = ", ".join(unique_labels) if unique_labels else "" + + # Detect Key Vault secrets accessed + kv_secrets = [] + kv_name = "" + for m in _KV_SECRET_RE.finditer(content): + vault = _resolve_value(m.group("vault")) + secret_name = _resolve_value(m.group("secret")) + kv_secrets.append(secret_name) + # Pick up the vault name if it's a literal (not a ${{ }} ref) + if not kv_name and not vault.startswith("$"): + kv_name = vault + # Fallback: extract vault name from env var assignments + if not kv_name: + env_match = _KV_NAME_ENV_RE.search(content) + if env_match: + kv_name = env_match.group(1) + + # Detect AKS credential access + has_aks = bool(_AKS_CREDS_RE.search(content)) + + # Detect all cloud/infra resource access patterns + cloud_resources = _detect_resource_access(content) + + # ── AWS OIDC ── + for match in _AWS_OIDC_RE.finditer(content): + start = max(0, match.start() - 50) + end = min(len(content), match.end() + 500) + context = content[start:end] + role_match = _ROLE_ARN_RE.search(context) + role_arn = role_match.group(1) if role_match else "" + role_arn = _resolve_value(role_arn) + + job = _find_job_name(content, match.start()) + connections.append(WorkflowOIDCConnection( + workflow_file=filename, + workflow_name=wf_name, + job_name=job, + cloud_provider="aws", + auth_method="oidc", + role_arn=role_arn, + has_oidc_permission=has_oidc, + self_hosted_runner=self_hosted_runner, + keyvault_secrets=kv_secrets, + keyvault_name=kv_name, + has_aks_access=has_aks, + trigger_events=triggers, + secrets_used=secrets, + cloud_resources=cloud_resources, + 
raw_step=context.strip()[:200], + )) + + # ── Azure OIDC (azure/login action) ── + for match in _AZURE_OIDC_RE.finditer(content): + start = max(0, match.start() - 50) + end = min(len(content), match.end() + 500) + context = content[start:end] + client_match = _AZURE_CLIENT_ID_RE.search(context) + tenant_match = _AZURE_TENANT_ID_RE.search(context) + client_id = _resolve_value(client_match.group(1)) if client_match else "" + tenant_id = _resolve_value(tenant_match.group(1)) if tenant_match else "" + + job = _find_job_name(content, match.start()) + connections.append(WorkflowOIDCConnection( + workflow_file=filename, + workflow_name=wf_name, + job_name=job, + cloud_provider="azure", + auth_method="oidc", + azure_client_id=client_id, + azure_tenant_id=tenant_id, + has_oidc_permission=has_oidc, + self_hosted_runner=self_hosted_runner, + keyvault_secrets=kv_secrets, + keyvault_name=kv_name, + has_aks_access=has_aks, + trigger_events=triggers, + secrets_used=secrets, + cloud_resources=cloud_resources, + raw_step=context.strip()[:200], + )) + + # ── Azure Managed Identity (az login --identity) ── + for match in _AZ_MI_LOGIN_RE.finditer(content): + job = _find_job_name(content, match.start()) + connections.append(WorkflowOIDCConnection( + workflow_file=filename, + workflow_name=wf_name, + job_name=job, + cloud_provider="azure", + auth_method="managed_identity", + has_oidc_permission=has_oidc, + self_hosted_runner=self_hosted_runner, + keyvault_secrets=kv_secrets, + keyvault_name=kv_name, + has_aks_access=has_aks, + trigger_events=triggers, + secrets_used=secrets, + cloud_resources=cloud_resources, + raw_step=content[max(0, match.start() - 30):match.end() + 100].strip()[:200], + )) + + # ── GCP OIDC ── + for match in _GCP_OIDC_RE.finditer(content): + start = max(0, match.start() - 50) + end = min(len(content), match.end() + 500) + context = content[start:end] + wif_match = _GCP_WIF_PROVIDER_RE.search(context) + sa_match = _GCP_SA_RE.search(context) + wif_provider = 
_resolve_value(wif_match.group(1)) if wif_match else "" + sa_email = _resolve_value(sa_match.group(1)) if sa_match else "" + + job = _find_job_name(content, match.start()) + connections.append(WorkflowOIDCConnection( + workflow_file=filename, + workflow_name=wf_name, + job_name=job, + cloud_provider="gcp", + auth_method="oidc", + gcp_wif_provider=wif_provider, + gcp_service_account=sa_email, + has_oidc_permission=has_oidc, + self_hosted_runner=self_hosted_runner, + keyvault_secrets=kv_secrets, + keyvault_name=kv_name, + has_aks_access=has_aks, + trigger_events=triggers, + secrets_used=secrets, + cloud_resources=cloud_resources, + raw_step=context.strip()[:200], + )) + + return connections + + +def _connection_to_identities(conn: WorkflowOIDCConnection, repo_name: str) -> List[Identity]: + """Convert a cloud auth connection to NHInsight Identity objects with risk flags.""" + identities: List[Identity] = [] + job_label = f"/{conn.job_name}" if conn.job_name else "" + + risk_flags: List[RiskFlag] = [] + + # ── Common risk checks ── + + # Risk: OIDC without proper permission declaration (only for OIDC auth) + if conn.auth_method == "oidc" and not conn.has_oidc_permission: + risk_flags.append(RiskFlag( + Severity.MEDIUM, "GH_OIDC_NO_PERMISSION", + "Workflow uses cloud auth action but does not declare id-token: write", + "Medium: without explicit id-token permission, the OIDC token may not be " + "available or the workflow may be using long-lived secrets instead.", + )) + + # Risk: PR trigger with cloud auth + if any(t in ("pull_request", "pull_request_target") for t in conn.trigger_events): + risk_flags.append(RiskFlag( + Severity.HIGH, "GH_OIDC_PR_TRIGGER", + "Cloud auth triggered on pull_request events", + "High: any contributor or external PR author can trigger this workflow " + "and obtain cloud credentials. 
Restrict to push/release events " + "or add environment protection rules.", + )) + + # Risk: Key Vault secret access (credential sprawl from vault to env vars) + if conn.keyvault_secrets: + kv_list = ", ".join(conn.keyvault_secrets[:5]) + suffix = f" (+{len(conn.keyvault_secrets) - 5} more)" if len(conn.keyvault_secrets) > 5 else "" + risk_flags.append(RiskFlag( + Severity.MEDIUM, "GH_WF_KEYVAULT_SECRETS", + f"Workflow reads {len(conn.keyvault_secrets)} secrets from Key Vault " + f"'{conn.keyvault_name}': {kv_list}{suffix}", + "Medium: secrets fetched from Key Vault are exposed as environment " + "variables in the workflow. Ensure the runner's managed identity has " + "least-privilege Key Vault access and rotate secrets regularly.", + )) + + # Risk: AKS cluster access + if conn.has_aks_access: + risk_flags.append(RiskFlag( + Severity.MEDIUM, "GH_WF_AKS_ACCESS", + "Workflow fetches AKS cluster credentials", + "Medium: workflow obtains kubeconfig for AKS cluster. Compromise of " + "the runner could lead to Kubernetes cluster access. Scope the " + "managed identity to minimal AKS RBAC roles.", + )) + + # Risk: Self-hosted runner with managed identity + if conn.self_hosted_runner and conn.auth_method == "managed_identity": + risk_flags.append(RiskFlag( + Severity.HIGH, "GH_WF_SELF_HOSTED_MI", + f"Self-hosted runner '{conn.self_hosted_runner}' uses Managed Identity " + "for Azure access", + "High: the runner VM's managed identity grants implicit Azure access " + "to every workflow that runs on it. A compromised workflow or malicious " + "PR could access Azure resources. 
Use environment protection rules and " + "restrict runner labels to trusted workflows.", + )) + + # ── Provider-specific identity creation ── + + if conn.cloud_provider == "aws" and conn.auth_method == "oidc": + ident_id = f"github:oidc:aws:{conn.workflow_file}:{conn.role_arn or 'unknown'}" + name = f"OIDC → AWS ({conn.workflow_name}{job_label})" + + role_name = conn.role_arn.split("/")[-1] if "/" in conn.role_arn else conn.role_arn + admin_keywords = {"admin", "administrator", "poweruser", "fullaccess", "deploy-all"} + if any(kw in role_name.lower() for kw in admin_keywords): + risk_flags.append(RiskFlag( + Severity.HIGH, "GH_OIDC_ADMIN_ROLE", + f"OIDC workflow assumes role with admin-like name: {role_name}", + f"High: workflow {conn.workflow_name} assumes {role_name} which " + "suggests elevated privileges. Verify the role's actual policies " + "and restrict to least-privilege.", + )) + + if not conn.role_arn or conn.role_arn.startswith("$"): + risk_flags.append(RiskFlag( + Severity.INFO, "GH_OIDC_DYNAMIC_ROLE", + "OIDC role ARN uses a secrets/variable reference", + "Info: role ARN is resolved at runtime from secrets. 
" + "Combine with --aws scan to correlate the actual role.", + )) + + identities.append(Identity( + id=ident_id, + name=name, + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "workflow_file": conn.workflow_file, + "workflow_name": conn.workflow_name, + "job_name": conn.job_name, + "cloud_provider": "aws", + "auth_method": "oidc", + "role_arn": conn.role_arn, + "trigger_events": conn.trigger_events, + "has_oidc_permission": conn.has_oidc_permission, + "cloud_resources": [ + {"resource_type": r.resource_type, "action": r.action, + "resource_name": r.resource_name, "severity": r.severity} + for r in conn.cloud_resources + ], + }, + risk_flags=risk_flags, + )) + + elif conn.cloud_provider == "azure" and conn.auth_method == "oidc": + ident_id = f"github:oidc:azure:{conn.workflow_file}:{conn.azure_client_id or 'unknown'}" + name = f"OIDC → Azure ({conn.workflow_name}{job_label})" + + identities.append(Identity( + id=ident_id, + name=name, + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "workflow_file": conn.workflow_file, + "workflow_name": conn.workflow_name, + "job_name": conn.job_name, + "cloud_provider": "azure", + "auth_method": "oidc", + "azure_client_id": conn.azure_client_id, + "azure_tenant_id": conn.azure_tenant_id, + "trigger_events": conn.trigger_events, + "has_oidc_permission": conn.has_oidc_permission, + "cloud_resources": [ + {"resource_type": r.resource_type, "action": r.action, + "resource_name": r.resource_name, "severity": r.severity} + for r in conn.cloud_resources + ], + }, + risk_flags=risk_flags, + )) + + elif conn.cloud_provider == "azure" and conn.auth_method == "managed_identity": + runner_tag = conn.self_hosted_runner or "self-hosted" + ident_id = f"github:mi:azure:{conn.workflow_file}:{runner_tag}" + name = f"MI → Azure ({conn.workflow_name}{job_label})" + + identities.append(Identity( 
+ id=ident_id, + name=name, + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "workflow_file": conn.workflow_file, + "workflow_name": conn.workflow_name, + "job_name": conn.job_name, + "cloud_provider": "azure", + "auth_method": "managed_identity", + "self_hosted_runner": conn.self_hosted_runner, + "keyvault_name": conn.keyvault_name, + "keyvault_secrets": conn.keyvault_secrets, + "has_aks_access": conn.has_aks_access, + "trigger_events": conn.trigger_events, + "cloud_resources": [ + {"resource_type": r.resource_type, "action": r.action, + "resource_name": r.resource_name, "severity": r.severity} + for r in conn.cloud_resources + ], + }, + risk_flags=risk_flags, + )) + + elif conn.cloud_provider == "gcp" and conn.auth_method == "oidc": + ident_id = f"github:oidc:gcp:{conn.workflow_file}:{conn.gcp_service_account or 'unknown'}" + name = f"OIDC → GCP ({conn.workflow_name}{job_label})" + + identities.append(Identity( + id=ident_id, + name=name, + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "workflow_file": conn.workflow_file, + "workflow_name": conn.workflow_name, + "job_name": conn.job_name, + "cloud_provider": "gcp", + "auth_method": "oidc", + "gcp_service_account": conn.gcp_service_account, + "gcp_wif_provider": conn.gcp_wif_provider, + "trigger_events": conn.trigger_events, + "has_oidc_permission": conn.has_oidc_permission, + "cloud_resources": [ + {"resource_type": r.resource_type, "action": r.action, + "resource_name": r.resource_name, "severity": r.severity} + for r in conn.cloud_resources + ], + }, + risk_flags=risk_flags, + )) + + return identities + + +# ── Helpers ──────────────────────────────────────────────────────────── + +def _resolve_value(val: str) -> str: + """Resolve a workflow value — leave secrets refs as-is, strip quotes.""" + if not val: + return "" + val = val.strip().strip("'\"") + 
return val + + +_KNOWN_EVENTS = { + "push", "pull_request", "pull_request_target", "workflow_dispatch", + "workflow_call", "schedule", "release", "create", "delete", + "deployment", "issue_comment", "issues", "label", "merge_group", + "page_build", "repository_dispatch", "workflow_run", +} + + +def _extract_triggers(content: str) -> List[str]: + """Extract trigger event names from a workflow file.""" + triggers = [] + # Match "on:" section + on_match = re.search(r"^on:\s*$", content, re.MULTILINE) + if on_match: + # Multiline on: block — only pick top-level keys that are known events + pos = on_match.end() + for line in content[pos:].split("\n"): + stripped = line.strip() + if stripped and not stripped.startswith("#"): + # Must be exactly 2-space indented (top-level under on:) + if re.match(r"^ \S", line) and stripped.endswith(":"): + candidate = stripped.rstrip(":").strip() + if candidate in _KNOWN_EVENTS: + triggers.append(candidate) + elif not line.startswith(" "): + break # End of on: block + else: + # Inline on: [push, pull_request] or on: push + inline_match = re.search(r"^on:\s*(.+)$", content, re.MULTILINE) + if inline_match: + val = inline_match.group(1).strip() + if val.startswith("["): + triggers = [t.strip().strip("'\"") for t in val.strip("[]").split(",")] + else: + triggers = [val.strip().strip("'\"")] + return triggers + + +def _find_job_name(content: str, pos: int) -> str: + """Find the job name that contains the given position.""" + # Look backwards from pos for the nearest "jobs:\n job_name:" pattern + before = content[:pos] + # Find all job headers before this position + job_matches = list(re.finditer(r"^\s{2}(\w[\w-]*):\s*$", before, re.MULTILINE)) + if job_matches: + return job_matches[-1].group(1) + return "" + + +def _detect_repo_name(wf_path: Path) -> str: + """Try to detect repository name from git remote.""" + try: + # Walk up to find .git directory + search = wf_path if wf_path.is_dir() else wf_path.parent + for _ in range(10): + git_dir 
= search / ".git" + if git_dir.exists(): + config = (git_dir / "config").read_text() + url_match = re.search(r"url\s*=\s*.*[:/]([^/]+/[^/\s]+?)(?:\.git)?\s*$", config, re.MULTILINE) + if url_match: + return url_match.group(1) + search = search.parent + except Exception: + pass + return "" diff --git a/nhinsight/cli.py b/nhinsight/cli.py index c32ecd1..963ea49 100644 --- a/nhinsight/cli.py +++ b/nhinsight/cli.py @@ -111,6 +111,12 @@ def _build_parser() -> argparse.ArgumentParser: help="Trace privilege escalation chains across providers (e.g. K8s SA → cloud admin)") analysis_group.add_argument("--mermaid", action="store_true", help="Output attack paths as Mermaid diagrams (implies --attack-paths)") + analysis_group.add_argument("--ci-summary", action="store_true", + help="Output a compact markdown summary for CI/PR usage (implies --attack-paths)") + analysis_group.add_argument("--github-workflows", metavar="PATH", nargs="?", + const=".github/workflows", + help="Scan GitHub Actions workflow files for OIDC cloud connections " + "(default path: .github/workflows)") analysis_group.add_argument("--stale-days", type=int, default=90, metavar="N", help="Days without use before flagging as stale (default: 90)") analysis_group.add_argument("--explain", action="store_true", @@ -122,7 +128,12 @@ def _build_parser() -> argparse.ArgumentParser: help="Output format (default: table)") out_group.add_argument("--output", "-o", metavar="FILE", help="Write output to file instead of stdout") + out_group.add_argument("--ascii", action="store_true", + help="ASCII-safe output (no emoji); auto-enabled in CI environments") out_group.add_argument("--verbose", "-v", action="store_true", help="Verbose logging") + out_group.add_argument("--fail-on", choices=["critical", "high", "medium", "low"], + help="Exit with code 1 if any identity has this severity or higher " + "(useful for CI gating, e.g. 
--fail-on high)") # ── demo command ─────────────────────────────────────────────── demo_p = sub.add_parser( @@ -139,6 +150,12 @@ def _build_parser() -> argparse.ArgumentParser: help="Include attack path analysis in demo output") demo_p.add_argument("--mermaid", action="store_true", help="Output attack paths as Mermaid diagrams (implies --attack-paths)") + demo_p.add_argument("--ci-summary", action="store_true", + help="Output a compact markdown summary for CI/PR usage (implies --attack-paths)") + demo_p.add_argument("--github-workflows", action="store_true", + help="Include GitHub Actions OIDC demo data in output") + demo_p.add_argument("--ascii", action="store_true", + help="ASCII-safe output (no emoji); auto-enabled in CI environments") # ── report command ───────────────────────────────────────────── report_p = sub.add_parser( @@ -219,11 +236,13 @@ def _run_scan(args: argparse.Namespace) -> None: if args.k8s: providers.append("k8s") - if not providers: + has_workflows = bool(getattr(args, "github_workflows", None)) + if not providers and not has_workflows: print("\n No providers selected.\n") print(" \033[1mQuick examples:\033[0m") print(" nhinsight scan --aws Scan AWS IAM") print(" nhinsight scan --all --attack-paths Scan everything") + print(" nhinsight scan --github-workflows Scan GH Actions only") print(" nhinsight demo Try with sample data first") print() print(" Providers: --aws --azure --gcp --github --k8s --all\n") @@ -298,6 +317,18 @@ def _run_scan(args: argparse.Namespace) -> None: except Exception as e: result.errors.append(f"{provider_name}: {e}") + # GitHub Actions workflow scanning (if requested) + wf_path = getattr(args, "github_workflows", None) + if wf_path: + from nhinsight.analyzers.workflow_scanner import scan_workflows + wf_result = scan_workflows(wf_path) + if wf_result.identities: + all_identities.extend(wf_result.identities) + if wf_result.errors: + result.errors.extend(wf_result.errors) + if wf_result.oidc_connections: + 
result.providers_scanned.append("github-actions") + # Analyze classify_identities(all_identities) analyze_risk(all_identities, config) @@ -313,32 +344,75 @@ def _run_scan(args: argparse.Namespace) -> None: result.identities = all_identities + # Determine ASCII-safe mode (explicit flag or auto-detect CI) + from nhinsight.core.ci_summary import is_ci + ascii_safe = getattr(args, "ascii", False) or is_ci() + # Output out = sys.stdout if args.output: out = open(args.output, "w") - print_result(result, fmt=args.format, out=out) + # --ci-summary replaces the normal output with a compact markdown summary + wants_ci = getattr(args, "ci_summary", False) + + if not wants_ci: + print_result(result, fmt=args.format, out=out, ascii_safe=ascii_safe) # Attack path analysis (if requested) - # --mermaid implies --attack-paths - wants_attack = getattr(args, "attack_paths", False) or getattr(args, "mermaid", False) + # --mermaid and --ci-summary both imply --attack-paths + wants_attack = ( + getattr(args, "attack_paths", False) + or getattr(args, "mermaid", False) + or wants_ci + ) + ap_result = None if wants_attack and all_identities: from nhinsight.analyzers.attack_paths import analyze_attack_paths from nhinsight.core.output import print_attack_paths ap_result = analyze_attack_paths(all_identities) - print_attack_paths(ap_result, out=out) + + if not wants_ci: + print_attack_paths(ap_result, out=out, ascii_safe=ascii_safe) if getattr(args, "mermaid", False): from nhinsight.core.mermaid import render_attack_paths, render_summary_table render_summary_table(ap_result, out=out) render_attack_paths(ap_result, out=out) + # CI summary output (compact markdown for $GITHUB_STEP_SUMMARY / PR comments) + if wants_ci: + from nhinsight.core.ci_summary import print_ci_summary, write_github_step_summary + print_ci_summary(result, ap_result, out=out, ascii_safe=ascii_safe) + # Also write to $GITHUB_STEP_SUMMARY if available + write_github_step_summary(result, ap_result, ascii_safe=True) + if 
args.output: out.close() print(f"Results written to {args.output}") + # --fail-on: exit 1 if any identity meets or exceeds the threshold severity + fail_on = getattr(args, "fail_on", None) + if fail_on: + from nhinsight.core.models import Severity + threshold_map = { + "critical": Severity.CRITICAL, + "high": Severity.HIGH, + "medium": Severity.MEDIUM, + "low": Severity.LOW, + } + threshold = threshold_map[fail_on] + severity_order = [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM, Severity.LOW] + failing_sevs = set(severity_order[:severity_order.index(threshold) + 1]) + count = sum( + 1 for i in result.identities if i.highest_severity in failing_sevs + ) + if count: + print(f"\n[FAIL] {count} identit{'y' if count == 1 else 'ies'} " + f"at {fail_on.upper()} severity or above (--fail-on {fail_on})") + sys.exit(1) + def _build_demo_data() -> ScanResult: """Build realistic demo data for all three providers.""" @@ -1022,12 +1096,90 @@ def _build_demo_data() -> ScanResult: ), ] + # ── GitHub Actions OIDC demo identities ────────────────────────── + oidc_identities = [ + Identity( + id="github:oidc:aws:deploy.yml:arn:aws:iam::123456789012:role/github-deploy-admin", + name="OIDC → AWS (Deploy to Prod)", + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "workflow_file": "deploy.yml", + "workflow_name": "Deploy to Prod", + "job_name": "deploy", + "cloud_provider": "aws", + "role_arn": "arn:aws:iam::123456789012:role/github-deploy-admin", + "role_policies": ["AdministratorAccess"], + "trigger_events": ["push", "pull_request"], + "has_oidc_permission": True, + }, + risk_flags=[ + RiskFlag(Severity.CRITICAL, "GH_OIDC_ADMIN_ROLE", + "OIDC workflow assumes AWS role with AdministratorAccess", + "Critical: GitHub Actions workflow 'Deploy to Prod' uses OIDC to assume " + "a role with AdministratorAccess. 
A compromised workflow or malicious PR " + "could gain full AWS account control."), + RiskFlag(Severity.HIGH, "GH_OIDC_PR_TRIGGER", + "OIDC cloud auth triggered on pull_request events", + "High: any contributor can trigger this workflow via a PR, " + "obtaining AWS admin credentials. Restrict to push events only."), + ], + ), + Identity( + id="github:oidc:azure:infra.yml:11111111-aaaa-bbbb-cccc-000000000001", + name="OIDC → Azure (Infra Deploy)", + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "workflow_file": "infra.yml", + "workflow_name": "Infra Deploy", + "job_name": "terraform", + "cloud_provider": "azure", + "azure_client_id": "11111111-aaaa-bbbb-cccc-000000000001", + "azure_tenant_id": "tenant-001", + "trigger_events": ["push"], + "has_oidc_permission": True, + }, + risk_flags=[ + RiskFlag(Severity.HIGH, "GH_OIDC_AZURE_CONTRIBUTOR", + "OIDC workflow federates to Azure SP with Contributor at subscription", + "High: workflow 'Infra Deploy' assumes aks-cluster-sp which has " + "Contributor at subscription scope. Scope the SP role to a resource group."), + ], + ), + Identity( + id="github:oidc:gcp:ci.yml:ci-runner@my-project.iam.gserviceaccount.com", + name="OIDC → GCP (CI Pipeline)", + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "workflow_file": "ci.yml", + "workflow_name": "CI Pipeline", + "job_name": "build-and-push", + "cloud_provider": "gcp", + "gcp_service_account": "ci-runner@my-project.iam.gserviceaccount.com", + "gcp_wif_provider": "projects/123/locations/global/workloadIdentityPools/github/providers/my-repo", + "trigger_events": ["push", "pull_request"], + "has_oidc_permission": True, + }, + risk_flags=[ + RiskFlag(Severity.HIGH, "GH_OIDC_PR_TRIGGER", + "OIDC cloud auth triggered on pull_request events", + "High: PR authors can trigger GCP access via Workload Identity " + "Federation. 
Use environment protection rules or restrict triggers."), + ], + ), + ] + # ── Build combined result ────────────────────────────────────── all_ids = (aws_identities + azure_identities + gcp_identities - + github_identities + k8s_identities) + + github_identities + k8s_identities + oidc_identities) return ScanResult( identities=all_ids, - providers_scanned=["aws", "azure", "gcp", "github", "kubernetes"], + providers_scanned=["aws", "azure", "gcp", "github", "kubernetes", "github-actions"], scan_time=now, ) @@ -1237,27 +1389,47 @@ def main(): result = _build_demo_data() fmt = getattr(args, "format", "table") output_path = getattr(args, "output", None) - if fmt == "table" and not output_path: - _print_demo_table(result) - else: - _output_result(result, fmt, output_path) - # Attack path analysis for demo (--attack-paths or --mermaid) - wants_attack = getattr(args, "attack_paths", False) or getattr(args, "mermaid", False) - if wants_attack: + wants_ci = getattr(args, "ci_summary", False) + + # Determine ASCII-safe mode + from nhinsight.core.ci_summary import is_ci + ascii_safe = getattr(args, "ascii", False) or is_ci() + + if wants_ci: + # CI summary mode — compact markdown replaces normal output from nhinsight.analyzers.attack_paths import analyze_attack_paths + from nhinsight.core.ci_summary import print_ci_summary, write_github_step_summary ap_result = analyze_attack_paths(result.identities) out = sys.stdout if output_path: - out = open(output_path, "a") - if not getattr(args, "mermaid", False): - from nhinsight.core.output import print_attack_paths - print_attack_paths(ap_result, out=out) - if getattr(args, "mermaid", False): - from nhinsight.core.mermaid import render_attack_paths, render_summary_table - render_summary_table(ap_result, out=out) - render_attack_paths(ap_result, out=out) + out = open(output_path, "w") + print_ci_summary(result, ap_result, out=out, ascii_safe=ascii_safe) + write_github_step_summary(result, ap_result, ascii_safe=True) if output_path: 
out.close() + print(f"Results written to {output_path}") + else: + if fmt == "table" and not output_path: + _print_demo_table(result) + else: + _output_result(result, fmt, output_path) + # Attack path analysis for demo (--attack-paths or --mermaid) + wants_attack = getattr(args, "attack_paths", False) or getattr(args, "mermaid", False) + if wants_attack: + from nhinsight.analyzers.attack_paths import analyze_attack_paths + ap_result = analyze_attack_paths(result.identities) + out = sys.stdout + if output_path: + out = open(output_path, "a") + if not getattr(args, "mermaid", False): + from nhinsight.core.output import print_attack_paths + print_attack_paths(ap_result, out=out, ascii_safe=ascii_safe) + if getattr(args, "mermaid", False): + from nhinsight.core.mermaid import render_attack_paths, render_summary_table + render_summary_table(ap_result, out=out) + render_attack_paths(ap_result, out=out) + if output_path: + out.close() elif args.command == "graph": _run_graph(args) elif args.command == "report": diff --git a/nhinsight/core/ci_summary.py b/nhinsight/core/ci_summary.py new file mode 100644 index 0000000..587e8ba --- /dev/null +++ b/nhinsight/core/ci_summary.py @@ -0,0 +1,293 @@ +# MIT License — Copyright (c) 2026 cvemula1 +# Compact CI / PR-friendly summary renderer for NHInsight + +from __future__ import annotations + +import os +import sys +from typing import TextIO + +from nhinsight.core.models import Classification, ScanResult, Severity + +# ── ASCII-safe helpers ───────────────────────────────────────────────── + +def is_ci() -> bool: + """Detect if running inside a CI environment.""" + return any(os.environ.get(v) for v in ("CI", "GITHUB_ACTIONS", "GITLAB_CI", "JENKINS_URL")) + + +# Mapping: emoji → ASCII-safe fallback +_SEVERITY_ICON = { + Severity.CRITICAL: "🔴", + Severity.HIGH: "🟠", + Severity.MEDIUM: "🟡", + Severity.LOW: "🔵", + Severity.INFO: "🟢", +} + +_SEVERITY_ASCII = { + Severity.CRITICAL: "[CRITICAL]", + Severity.HIGH: "[HIGH]", + 
Severity.MEDIUM: "[MEDIUM]", + Severity.LOW: "[LOW]", + Severity.INFO: "[INFO]", +} + + +def sev_icon(sev: Severity, *, ascii_safe: bool = False) -> str: + """Return severity icon, ASCII-safe if requested.""" + if ascii_safe: + return _SEVERITY_ASCII.get(sev, sev.value.upper()) + return _SEVERITY_ICON.get(sev, sev.value.upper()) + + +def sev_badge(sev: Severity) -> str: + """Markdown-friendly severity badge for tables.""" + return f"**{sev.value.upper()}**" + + +# ── Compact CI summary ──────────────────────────────────────────────── + +def print_ci_summary( + result: ScanResult, + ap_result=None, + out: TextIO = sys.stdout, + *, + ascii_safe: bool = False, +) -> None: + """Print a compact markdown summary suitable for $GITHUB_STEP_SUMMARY or PR comments. + + Parameters + ---------- + result : ScanResult + The scan result to summarize. + ap_result : AttackPathResult | None + Optional attack path results to include. + out : TextIO + Output stream. + ascii_safe : bool + If True, avoid emoji/unicode that breaks in some CI terminals. 
+ """ + nhis = [i for i in result.identities if i.classification != Classification.HUMAN] + humans = [i for i in result.identities if i.classification == Classification.HUMAN] + + crit = sum(1 for i in nhis if i.highest_severity == Severity.CRITICAL) + high = sum(1 for i in nhis if i.highest_severity == Severity.HIGH) + med = sum(1 for i in nhis if i.highest_severity == Severity.MEDIUM) + low = sum(1 for i in nhis if i.highest_severity == Severity.LOW) + ok = sum(1 for i in nhis if i.highest_severity == Severity.INFO) + + # ── Header + out.write("## NHInsight Scan Summary\n\n") + + if result.providers_scanned: + out.write(f"**Providers:** {', '.join(result.providers_scanned)} \n") + scanned_str = f"**Identities scanned:** {len(nhis)} NHIs" + if humans: + scanned_str += f" + {len(humans)} humans" + out.write(f"{scanned_str} \n\n") + + # ── Severity table + out.write("| Severity | Count |\n") + out.write("|----------|-------|\n") + out.write(f"| {_sev_cell('CRITICAL', crit, ascii_safe)} | {crit} |\n") + out.write(f"| {_sev_cell('HIGH', high, ascii_safe)} | {high} |\n") + out.write(f"| {_sev_cell('MEDIUM', med, ascii_safe)} | {med} |\n") + out.write(f"| {_sev_cell('LOW', low, ascii_safe)} | {low} |\n") + out.write(f"| {_sev_cell('Healthy', ok, ascii_safe)} | {ok} |\n\n") + + # ── Top findings (critical + high, limit 8) + top_findings = [] + for ident in nhis: + for flag in ident.risk_flags: + if flag.severity in (Severity.CRITICAL, Severity.HIGH): + top_findings.append((flag.severity, ident, flag)) + top_findings.sort(key=lambda x: (0 if x[0] == Severity.CRITICAL else 1)) + top_findings = top_findings[:8] + + if top_findings: + out.write("### Top Findings\n\n") + out.write("| Identity | Provider | Severity | Issue |\n") + out.write("|----------|----------|----------|-------|\n") + for sev, ident, flag in top_findings: + out.write( + f"| `{ident.name}` " + f"| {ident.provider.value} " + f"| {sev.value.upper()} " + f"| {flag.message} |\n" + ) + out.write("\n") + + # ── 
Privilege escalation paths + if ap_result and ap_result.paths: + _write_path_section(ap_result, out, ascii_safe=ascii_safe) + + # ── Immediate actions + actions = _build_actions(nhis, ap_result) + if actions: + out.write("### Immediate Actions\n\n") + for i, action in enumerate(actions[:5], 1): + out.write(f"{i}. {action}\n") + out.write("\n") + + out.write("---\n") + out.write("*Generated by [NHInsight](https://github.com/cvemula1/NHInsight)*\n") + + +def _sev_cell(label: str, count: int, ascii_safe: bool) -> str: + """Format a severity cell, bold if count > 0.""" + if count > 0: + return f"**{label}**" + return label + + +def _write_path_section(ap_result, out: TextIO, *, ascii_safe: bool = False) -> None: + """Write privilege escalation paths in reviewer-friendly format.""" + paths = ap_result.paths + out.write("### Privilege Escalation Paths\n\n") + + cross_sym = "(cross-system)" if ascii_safe else "⚡" + + out.write("| Path | Severity | Risk | Entry → Target | Fix |\n") + out.write("|------|----------|------|----------------|-----|\n") + + for path in paths[:10]: + sev = path.severity.value.upper() + risk = f"{path.blast_radius:.0f}/100" + entry = path.steps[0].node_label if path.steps else "?" + target = path.steps[-1].node_label if path.steps else "?" + cross = f" {cross_sym}" if path.cross_system else "" + rec = (path.recommendation or "Review permissions.")[:80] + out.write(f"| {path.id} | {sev} | {risk} | `{entry}` → `{target}`{cross} | {rec} |\n") + + if len(paths) > 10: + out.write(f"\n*...and {len(paths) - 10} more paths*\n") + out.write("\n") + + # Detailed reviewer-friendly blocks for critical paths (limit 3) + crit_paths = [p for p in paths if p.severity == Severity.CRITICAL][:3] + if crit_paths: + out.write("
<details>\n<summary>Critical path details</summary>\n\n") + for path in crit_paths: + _write_path_detail(path, out, ascii_safe=ascii_safe) + out.write("</details>
\n\n") + + +def _write_path_detail(path, out: TextIO, *, ascii_safe: bool = False) -> None: + """Write a single path in reviewer-friendly format.""" + sev = path.severity.value.upper() + entry = path.steps[0].node_label if path.steps else "?" + target = path.steps[-1].node_label if path.steps else "?" + + out.write(f"**{sev}** — `{entry}` can reach `{target}`\n\n") + + # Path chain + chain = " → ".join(f"`{s.node_label}`" for s in path.steps) + out.write(f"**Path:** {chain}\n\n") + + # Why it matters + why = _why_it_matters(path) + out.write(f"**Why it matters:** {why}\n\n") + + # Fix + out.write(f"**Fix:** {path.recommendation or 'Review and reduce permissions along this path.'}\n\n") + + out.write("---\n\n") + + +def _why_it_matters(path) -> str: + """Generate a plain-English 'why it matters' sentence for a path.""" + entry = path.steps[0] if path.steps else None + target = path.steps[-1] if path.steps else None + + if not entry or not target: + return "This path reaches a privileged resource." 
+ + parts = [] + + # Cross-system warning + if path.cross_system: + providers = path.providers_involved + parts.append(f"This path crosses system boundaries ({' → '.join(providers)})") + + # Entry point type + entry_type = entry.node_type + if "key" in entry_type or "secret" in entry_type: + parts.append("A leaked credential could let an attacker traverse this entire chain") + elif "service_account" in entry_type: + parts.append("A compromised workload using this service account could escalate privileges") + else: + parts.append("Compromise of the entry point could lead to privilege escalation") + + # Target privilege + # Check "cluster-admin" first: it contains "admin", so the generic admin branch would otherwise shadow it + target_label = target.node_label.lower() + if "cluster-admin" in target_label: + parts.append("reaching full Kubernetes cluster control") + elif "admin" in target_label or "owner" in target_label: + parts.append("reaching full administrative control") + elif "contributor" in target_label or "editor" in target_label: + parts.append("reaching broad resource modification permissions") + else: + parts.append("reaching a privileged target") + + # Risk score context + if path.blast_radius >= 80: + parts.append(f"Risk score {path.blast_radius:.0f}/100 — immediate remediation recommended") + elif path.blast_radius >= 60: + parts.append(f"Risk score {path.blast_radius:.0f}/100 — high-priority fix") + + return ". ".join(parts) + "." + + +def _build_actions(nhis, ap_result) -> list[str]: + """Build a prioritized list of immediate actions.""" + actions = [] + + # From findings + for ident in nhis: + for flag in ident.risk_flags: + if flag.severity == Severity.CRITICAL: + detail = flag.detail or flag.message + # Strip severity prefix + for prefix in ("Critical: ", "High: ", "Medium: "): + if detail.startswith(prefix): + detail = detail[len(prefix):] + break + action = detail.split(". 
")[0] + if action and action[0].islower(): + action = action[0].upper() + action[1:] + actions.append(f"**{ident.name}** — {action}") + + # From attack paths + if ap_result: + for path in ap_result.paths: + if path.severity == Severity.CRITICAL and path.recommendation: + entry = path.steps[0].node_label if path.steps else "?" + rec = path.recommendation.split(". ")[0] + action_str = f"**{path.id}** ({entry}) — {rec}" + if action_str not in actions: + actions.append(action_str) + + return actions[:5] + + +# ── GitHub Step Summary helper ───────────────────────────────────────── + +def write_github_step_summary( + result: ScanResult, + ap_result=None, + *, + ascii_safe: bool = True, +) -> bool: + """Write compact summary to $GITHUB_STEP_SUMMARY if available. + + Returns True if written, False if not in GitHub Actions. + """ + summary_path = os.environ.get("GITHUB_STEP_SUMMARY") + if not summary_path: + return False + + with open(summary_path, "a") as f: + print_ci_summary(result, ap_result, out=f, ascii_safe=ascii_safe) + + return True diff --git a/nhinsight/core/models.py b/nhinsight/core/models.py index 825f143..083e007 100644 --- a/nhinsight/core/models.py +++ b/nhinsight/core/models.py @@ -27,6 +27,7 @@ class IdentityType(str, Enum): AZURE_APP_CERT = "azure_app_cert" GCP_SERVICE_ACCOUNT = "gcp_service_account" GCP_SA_KEY = "gcp_sa_key" + GITHUB_ACTIONS_OIDC = "github_actions_oidc" UNKNOWN = "unknown" diff --git a/nhinsight/core/output.py b/nhinsight/core/output.py index ec18cad..f8ce61c 100644 --- a/nhinsight/core/output.py +++ b/nhinsight/core/output.py @@ -18,6 +18,15 @@ DIM = "\033[2m" RESET = "\033[0m" +# ASCII-safe fallback icons (no emoji, stable in CI logs) +SEVERITY_ICONS_ASCII = { + Severity.CRITICAL: "[CRITICAL]", + Severity.HIGH: "[HIGH]", + Severity.MEDIUM: "[MEDIUM]", + Severity.LOW: "[LOW]", + Severity.INFO: "[INFO]", +} + SEVERITY_COLORS = { Severity.CRITICAL: RED, Severity.HIGH: RED, @@ -28,17 +37,17 @@ SEVERITY_ICONS = { Severity.CRITICAL: "🔴", - 
Severity.HIGH: "�", + Severity.HIGH: "🟠", Severity.MEDIUM: "🟡", Severity.LOW: "🔵", Severity.INFO: "🟢", } -def _print_identity_group(identities, sev, out): +def _print_identity_group(identities, sev, out, *, ascii_safe=False): """Print a group of identities at a given severity level.""" color = SEVERITY_COLORS[sev] - icon = SEVERITY_ICONS[sev] + icon = SEVERITY_ICONS_ASCII[sev] if ascii_safe else SEVERITY_ICONS[sev] label = sev.value.upper() out.write(f" {color}{icon} {label} ({len(identities)}){RESET}\n") @@ -53,7 +62,7 @@ def _print_identity_group(identities, sev, out): out.write("\n") -def print_table(result: ScanResult, out: TextIO = sys.stdout) -> None: +def print_table(result: ScanResult, out: TextIO = sys.stdout, *, ascii_safe: bool = False) -> None: """Print scan results as a formatted terminal table.""" from nhinsight.core.models import Classification @@ -79,7 +88,7 @@ def print_table(result: ScanResult, out: TextIO = sys.stdout) -> None: for sev in [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM, Severity.LOW, Severity.INFO]: group = [i for i in nhis if i.highest_severity == sev] if group: - _print_identity_group(group, sev, out) + _print_identity_group(group, sev, out, ascii_safe=ascii_safe) # Humans in a separate section (if any have risk flags) risky_humans = [h for h in humans if h.risk_flags] @@ -90,7 +99,7 @@ def print_table(result: ScanResult, out: TextIO = sys.stdout) -> None: for sev in [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM, Severity.LOW]: group = [h for h in risky_humans if h.highest_severity == sev] if group: - _print_identity_group(group, sev, out) + _print_identity_group(group, sev, out, ascii_safe=ascii_safe) if safe_humans: out.write(f" {GREEN}🟢 HEALTHY ({len(safe_humans)}){RESET}\n") for h in safe_humans: @@ -280,7 +289,7 @@ def _md_identity_block(ident, out): out.write("- No risk flags\n\n") -def print_markdown(result: ScanResult, out: TextIO = sys.stdout) -> None: +def print_markdown(result: ScanResult, out: TextIO = 
sys.stdout, *, ascii_safe: bool = False) -> None: """Print scan results as a Markdown report.""" from nhinsight.core.models import Classification @@ -362,7 +371,7 @@ def _get_urgent_fixes(result: ScanResult, limit: int = 5) -> list: return [f"**{name}** — {detail}" for _, name, _, detail in urgent[:limit]] -def print_attack_paths(ap_result, out: TextIO = sys.stdout) -> None: +def print_attack_paths(ap_result, out: TextIO = sys.stdout, *, ascii_safe: bool = False) -> None: """Print attack path analysis results.""" paths = ap_result.paths stats = ap_result.graph_stats @@ -393,14 +402,16 @@ def print_attack_paths(ap_result, out: TextIO = sys.stdout) -> None: for i, path in enumerate(paths[:15]): sev = path.severity color = SEVERITY_COLORS.get(sev, RESET) - icon = SEVERITY_ICONS.get(sev, "⚪") + icons = SEVERITY_ICONS_ASCII if ascii_safe else SEVERITY_ICONS + icon = icons.get(sev, "[?]" if ascii_safe else "⚪") out.write(f" {color}{icon} {path.id} — {path.description}{RESET}") out.write(f" {BOLD}{sev.value.upper()}{RESET}") blast_str = f" risk: {path.blast_radius:.0f}/100" out.write(f" {DIM}{blast_str}{RESET}") if path.cross_system: - out.write(f" {CYAN}⚡ cross-system{RESET}") + cross_sym = "(cross-system)" if ascii_safe else "⚡ cross-system" + out.write(f" {CYAN}{cross_sym}{RESET}") out.write("\n") # Steps @@ -420,7 +431,8 @@ def print_attack_paths(ap_result, out: TextIO = sys.stdout) -> None: # Recommendation if path.recommendation: rec = path.recommendation[:100] - out.write(f" {DIM} 💡 {rec}{RESET}\n") + tip = "Tip:" if ascii_safe else "💡" + out.write(f" {DIM} {tip} {rec}{RESET}\n") out.write("\n") @@ -431,13 +443,13 @@ def print_attack_paths(ap_result, out: TextIO = sys.stdout) -> None: out.write(f" {'─' * 56}\n\n") -def print_result(result: ScanResult, fmt: str = "table", out: TextIO = sys.stdout) -> None: +def print_result(result: ScanResult, fmt: str = "table", out: TextIO = sys.stdout, *, ascii_safe: bool = False) -> None: """Print scan results in the requested 
format.""" if fmt == "json": print_json(result, out) elif fmt == "sarif": print_sarif(result, out) elif fmt == "markdown" or fmt == "md": - print_markdown(result, out) + print_markdown(result, out, ascii_safe=ascii_safe) else: - print_table(result, out) + print_table(result, out, ascii_safe=ascii_safe) diff --git a/tests/test_ci_summary.py b/tests/test_ci_summary.py new file mode 100644 index 0000000..b6e31b2 --- /dev/null +++ b/tests/test_ci_summary.py @@ -0,0 +1,454 @@ +# MIT License — Copyright (c) 2026 cvemula1 +# Tests for compact CI summary, ASCII-safe output, and improved path wording + +from __future__ import annotations + +import io +import subprocess +import sys +from datetime import datetime, timedelta, timezone + +from nhinsight.analyzers.attack_paths import ( + AttackPath, + AttackPathResult, + AttackPathStep, +) +from nhinsight.core.ci_summary import ( + _build_actions, + _why_it_matters, + is_ci, + print_ci_summary, + sev_badge, + sev_icon, +) +from nhinsight.core.models import ( + Classification, + Identity, + IdentityType, + Provider, + RiskFlag, + ScanResult, + Severity, +) +from nhinsight.core.output import ( + SEVERITY_ICONS, + SEVERITY_ICONS_ASCII, + print_attack_paths, + print_result, + print_table, +) + +# ── Helpers ──────────────────────────────────────────────────────────── + +def _make_demo_result() -> ScanResult: + """Build a small but realistic ScanResult for testing.""" + now = datetime.now(timezone.utc) + return ScanResult( + identities=[ + Identity( + id="aws:iam:user:123:deploy-bot", + name="deploy-bot", + provider=Provider.AWS, + identity_type=IdentityType.IAM_USER, + classification=Classification.MACHINE, + created_at=now - timedelta(days=400), + policies=["AdministratorAccess"], + risk_flags=[ + RiskFlag(Severity.CRITICAL, "AWS_ADMIN_ACCESS", + "Has AdministratorAccess policy attached", + "Critical: machine identity with full AWS access."), + ], + ), + Identity( + id="aws:iam:role:123:escape-hatch", + name="escape-hatch", + 
provider=Provider.AWS, + identity_type=IdentityType.IAM_ROLE, + classification=Classification.MACHINE, + created_at=now - timedelta(days=700), + policies=["AdministratorAccess"], + risk_flags=[ + RiskFlag(Severity.CRITICAL, "AWS_WILDCARD_TRUST", + "Role trust allows any AWS principal (*)", + "Critical: any AWS account worldwide can assume this role."), + ], + ), + Identity( + id="k8s:sa:prod:default:default", + name="default/default", + provider=Provider.KUBERNETES, + identity_type=IdentityType.SERVICE_ACCOUNT, + classification=Classification.MACHINE, + created_at=now - timedelta(days=300), + risk_flags=[ + RiskFlag(Severity.MEDIUM, "K8S_DEFAULT_SA", + "Using default ServiceAccount", + "Medium: 3 workloads share the default SA."), + ], + ), + Identity( + id="github:app:acme:renovate", + name="renovate", + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_APP, + classification=Classification.MACHINE, + created_at=now - timedelta(days=100), + risk_flags=[], + ), + Identity( + id="aws:iam:user:123:alice", + name="alice", + provider=Provider.AWS, + identity_type=IdentityType.IAM_USER, + classification=Classification.HUMAN, + created_at=now - timedelta(days=200), + risk_flags=[], + ), + ], + providers_scanned=["aws", "kubernetes", "github"], + scan_time=now, + ) + + +def _make_ap_result() -> AttackPathResult: + """Build a small AttackPathResult for testing.""" + return AttackPathResult( + paths=[ + AttackPath( + id="AP-001", + steps=[ + AttackPathStep( + node_id="deploy-bot", + node_label="deploy-bot", + node_type="iam_user", + provider="aws", + ), + AttackPathStep( + node_id="admin-role", + node_label="AdministratorAccess", + node_type="iam_role", + provider="aws", + edge_type="assumes_role", + edge_label="assumes", + ), + ], + severity=Severity.CRITICAL, + blast_radius=85.0, + cross_system=False, + description="deploy-bot can reach AdministratorAccess", + recommendation="Tighten the role trust policy. 
Use condition keys.", + ), + AttackPath( + id="AP-002", + steps=[ + AttackPathStep( + node_id="k8s-sa", + node_label="payments/checkout-svc", + node_type="service_account", + provider="kubernetes", + ), + AttackPathStep( + node_id="aws-role", + node_label="checkout-role", + node_type="iam_role", + provider="aws", + edge_type="irsa_maps_to", + edge_label="IRSA", + ), + ], + severity=Severity.HIGH, + blast_radius=55.0, + cross_system=True, + description="checkout-svc can reach checkout-role (crosses kubernetes → aws)", + recommendation="Scope the IRSA role to least-privilege.", + ), + ], + graph_stats={"nodes": 10, "edges": 8, "entry_points": 3, "privileged_nodes": 2}, + ) + + +# ── CI Summary tests ────────────────────────────────────────────────── + +class TestCISummary: + def test_compact_summary_contains_header(self): + buf = io.StringIO() + result = _make_demo_result() + print_ci_summary(result, None, out=buf) + output = buf.getvalue() + assert "## NHInsight Scan Summary" in output + + def test_compact_summary_contains_providers(self): + buf = io.StringIO() + result = _make_demo_result() + print_ci_summary(result, None, out=buf) + output = buf.getvalue() + assert "aws" in output + assert "kubernetes" in output + + def test_compact_summary_contains_severity_table(self): + buf = io.StringIO() + result = _make_demo_result() + print_ci_summary(result, None, out=buf) + output = buf.getvalue() + assert "| Severity | Count |" in output + assert "**CRITICAL**" in output + + def test_compact_summary_contains_top_findings(self): + buf = io.StringIO() + result = _make_demo_result() + print_ci_summary(result, None, out=buf) + output = buf.getvalue() + assert "### Top Findings" in output + assert "`deploy-bot`" in output + assert "AdministratorAccess" in output + + def test_compact_summary_with_attack_paths(self): + buf = io.StringIO() + result = _make_demo_result() + ap = _make_ap_result() + print_ci_summary(result, ap, out=buf) + output = buf.getvalue() + assert "### 
Privilege Escalation Paths" in output + assert "AP-001" in output + assert "AP-002" in output + + def test_compact_summary_contains_immediate_actions(self): + buf = io.StringIO() + result = _make_demo_result() + print_ci_summary(result, None, out=buf) + output = buf.getvalue() + assert "### Immediate Actions" in output + assert "**deploy-bot**" in output + + def test_compact_summary_footer(self): + buf = io.StringIO() + result = _make_demo_result() + print_ci_summary(result, None, out=buf) + output = buf.getvalue() + assert "*Generated by [NHInsight]" in output + + def test_compact_summary_critical_path_details(self): + buf = io.StringIO() + result = _make_demo_result() + ap = _make_ap_result() + print_ci_summary(result, ap, out=buf) + output = buf.getvalue() +
assert "<details>" in output + assert "Critical path details" in output + assert "**Why it matters:**" in output + assert "**Fix:**" in output + + def test_identity_counts_correct(self): + buf = io.StringIO() + result = _make_demo_result() + print_ci_summary(result, None, out=buf) + output = buf.getvalue() + # 4 NHIs + 1 human + assert "4 NHIs" in output + assert "1 humans" in output + + def test_ascii_safe_summary(self): + buf = io.StringIO() + result = _make_demo_result() + ap = _make_ap_result() + print_ci_summary(result, ap, out=buf, ascii_safe=True) + output = buf.getvalue() + # Should not contain any emoji + assert "🔴" not in output + assert "🟠" not in output + assert "⚡" not in output + # Should still have all key sections + assert "## NHInsight Scan Summary" in output + assert "AP-001" in output + + +# ── ASCII-safe output tests ─────────────────────────────────────────── + +class TestASCIISafe: + def test_severity_icons_ascii_all_present(self): + for sev in Severity: + assert sev in SEVERITY_ICONS_ASCII + icon = SEVERITY_ICONS_ASCII[sev] + assert icon.startswith("[") and icon.endswith("]") + # Should be pure ASCII + assert all(ord(c) < 128 for c in icon) + + def test_severity_icons_emoji_present(self): + for sev in Severity: + assert sev in SEVERITY_ICONS + # Should contain non-ASCII (emoji) + icon = SEVERITY_ICONS[sev] + assert any(ord(c) > 127 for c in icon) + + def test_sev_icon_function(self): + assert sev_icon(Severity.CRITICAL, ascii_safe=False) == "🔴" + assert sev_icon(Severity.CRITICAL, ascii_safe=True) == "[CRITICAL]" + assert sev_icon(Severity.HIGH, ascii_safe=True) == "[HIGH]" + + def test_sev_badge(self): + assert sev_badge(Severity.CRITICAL) == "**CRITICAL**" + assert sev_badge(Severity.LOW) == "**LOW**" + + def test_print_table_ascii_safe(self): + buf = io.StringIO() + result = _make_demo_result() + print_table(result, out=buf, ascii_safe=True) + output = buf.getvalue() + # Should use ASCII icons + assert "[CRITICAL]" in output + # Should 
NOT contain emoji +
assert "🔴" not in output + + def test_print_attack_paths_ascii_safe(self): + buf = io.StringIO() + ap = _make_ap_result() + print_attack_paths(ap, out=buf, ascii_safe=True) + output = buf.getvalue() + # ASCII icons + assert "[CRITICAL]" in output or "[HIGH]" in output + # No emoji + assert "🔴" not in output + assert "🟠" not in output + assert "⚡" not in output + # Cross-system uses text + assert "(cross-system)" in output + + def test_print_attack_paths_ascii_safe_tip(self): + buf = io.StringIO() + ap = _make_ap_result() + print_attack_paths(ap, out=buf, ascii_safe=True) + output = buf.getvalue() + assert "Tip:" in output + assert "💡" not in output + + def test_print_result_ascii_safe(self): + buf = io.StringIO() + result = _make_demo_result() + print_result(result, fmt="table", out=buf, ascii_safe=True) + output = buf.getvalue() + assert "[CRITICAL]" in output + + def test_is_ci_default(self): + # In a test environment, CI might or might not be set + # Just verify it returns a bool + assert isinstance(is_ci(), bool) + + +# ── Improved path wording tests ─────────────────────────────────────── + +class TestPathWording: + def test_why_it_matters_cross_system(self): + path = _make_ap_result().paths[1] # cross-system path + why = _why_it_matters(path) + assert "crosses system boundaries" in why + assert "kubernetes" in why or "aws" in why + + def test_why_it_matters_credential_entry(self): + path = AttackPath( + id="AP-TEST", + steps=[ + AttackPathStep("key-1", "deploy-key", "access_key", "aws"), + AttackPathStep("role-1", "AdministratorAccess", "iam_role", "aws", + edge_type="assumes_role", edge_label="assumes"), + ], + severity=Severity.CRITICAL, + blast_radius=90.0, + description="deploy-key can reach AdministratorAccess", + ) + why = _why_it_matters(path) + assert "leaked credential" in why.lower() or "credential" in why.lower() + + def test_why_it_matters_service_account_entry(self): + path = AttackPath( + id="AP-TEST", + steps=[ + AttackPathStep("sa-1", 
"checkout-svc", "service_account", "kubernetes"), + AttackPathStep("role-1", "cluster-admin", "rbac", "kubernetes", + edge_type="bound_to_rbac", edge_label="bound"), + ], + severity=Severity.CRITICAL, + blast_radius=85.0, + description="checkout-svc can reach cluster-admin", + ) + why = _why_it_matters(path) + assert "service account" in why.lower() + + def test_why_it_matters_admin_target(self): + path = _make_ap_result().paths[0] + why = _why_it_matters(path) + assert "admin" in why.lower() + + def test_why_it_matters_high_risk_score(self): + path = _make_ap_result().paths[0] # blast_radius=85 + why = _why_it_matters(path) + assert "85/100" in why + + def test_build_actions_prioritizes_critical(self): + result = _make_demo_result() + nhis = [i for i in result.identities if i.classification != Classification.HUMAN] + actions = _build_actions(nhis, None) + assert len(actions) > 0 + assert "deploy-bot" in actions[0] or "escape-hatch" in actions[0] + + def test_build_actions_includes_path_recs(self): + result = _make_demo_result() + nhis = [i for i in result.identities if i.classification != Classification.HUMAN] + ap = _make_ap_result() + actions = _build_actions(nhis, ap) + # Should include both finding-based and path-based actions + assert any("AP-" in a for a in actions) + + +# ── CLI integration tests ───────────────────────────────────────────── + +class TestCLICISummary: + def test_demo_ci_summary_runs(self): + result = subprocess.run( + [sys.executable, "-m", "nhinsight.cli", "demo", "--ci-summary"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + assert "## NHInsight Scan Summary" in result.stdout + assert "### Top Findings" in result.stdout + assert "### Privilege Escalation Paths" in result.stdout + + def test_demo_ci_summary_ascii(self): + result = subprocess.run( + [sys.executable, "-m", "nhinsight.cli", "demo", "--ci-summary", "--ascii"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + assert "## NHInsight 
Scan Summary" in result.stdout + # No emoji in output + assert "🔴" not in result.stdout + assert "⚡" not in result.stdout + + def test_demo_ascii_table(self): + result = subprocess.run( + [sys.executable, "-m", "nhinsight.cli", "demo", "--attack-paths", "--ascii"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + # Attack path section should use ASCII icons + assert "[CRITICAL]" in result.stdout or "[HIGH]" in result.stdout + + def test_demo_ci_summary_has_path_details(self): + result = subprocess.run( + [sys.executable, "-m", "nhinsight.cli", "demo", "--ci-summary"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + assert "Immediate Actions" in result.stdout + assert "Generated by" in result.stdout + + def test_demo_ci_summary_to_file(self, tmp_path): + outfile = tmp_path / "summary.md" + result = subprocess.run( + [sys.executable, "-m", "nhinsight.cli", "demo", "--ci-summary", + "-o", str(outfile)], + capture_output=True, text=True, + ) + assert result.returncode == 0 + content = outfile.read_text() + assert "## NHInsight Scan Summary" in content + assert "### Top Findings" in content diff --git a/tests/test_workflow_scanner.py b/tests/test_workflow_scanner.py new file mode 100644 index 0000000..8d9aad9 --- /dev/null +++ b/tests/test_workflow_scanner.py @@ -0,0 +1,701 @@ +# MIT License — Copyright (c) 2026 cvemula1 +# Tests for GitHub Actions workflow scanner and OIDC attack path integration + +from __future__ import annotations + +import subprocess +import sys +import textwrap +from datetime import datetime, timezone + +from nhinsight.analyzers.attack_paths import analyze_attack_paths +from nhinsight.analyzers.graph import EdgeType, build_graph +from nhinsight.analyzers.workflow_scanner import ( + WorkflowOIDCConnection, + _connection_to_identities, + _extract_triggers, + _find_job_name, + _parse_workflow, + scan_workflows, +) +from nhinsight.core.models import ( + Classification, + Identity, + IdentityType, + 
Provider, + RiskFlag, + Severity, +) + +# ── Sample workflow content ──────────────────────────────────────────── + +AWS_OIDC_WORKFLOW = textwrap.dedent("""\ + name: Deploy to AWS + on: + push: + branches: [main] + pull_request: + + permissions: + id-token: write + contents: read + + jobs: + deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: arn:aws:iam::123456789012:role/github-deploy-role + aws-region: us-east-1 + - run: aws s3 sync ./dist s3://my-bucket +""") + +AZURE_OIDC_WORKFLOW = textwrap.dedent("""\ + name: Deploy to Azure + on: [push] + + permissions: + id-token: write + + jobs: + terraform: + runs-on: ubuntu-latest + steps: + - uses: azure/login@v1 + with: + client-id: 11111111-aaaa-bbbb-cccc-000000000001 + tenant-id: 22222222-dddd-eeee-ffff-000000000002 + subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }} +""") + +GCP_OIDC_WORKFLOW = textwrap.dedent("""\ + name: CI Pipeline + on: [push, pull_request] + + permissions: + id-token: write + + jobs: + build-and-push: + runs-on: ubuntu-latest + steps: + - uses: google-github-actions/auth@v2 + with: + workload_identity_provider: projects/123/locations/global/workloadIdentityPools/github/providers/my-repo + service_account: ci-runner@my-project.iam.gserviceaccount.com +""") + +NO_OIDC_WORKFLOW = textwrap.dedent("""\ + name: Lint + on: [push] + + jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: npm run lint +""") + +MULTI_CLOUD_WORKFLOW = textwrap.dedent("""\ + name: Multi-Cloud Deploy + on: + push: + branches: [main] + + permissions: + id-token: write + contents: read + + jobs: + deploy-aws: + runs-on: ubuntu-latest + steps: + - uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: arn:aws:iam::111111111111:role/deploy-admin + aws-region: us-west-2 + + deploy-gcp: + runs-on: ubuntu-latest + steps: + - uses: google-github-actions/auth@v2 + with: + 
workload_identity_provider: projects/456/locations/global/workloadIdentityPools/gh/providers/repo + service_account: deployer@prod.iam.gserviceaccount.com +""") + +MANAGED_IDENTITY_WORKFLOW = textwrap.dedent("""\ + name: Deploy with MI + on: + push: + branches: [main] + pull_request: + branches: [main] + + jobs: + deploy: + runs-on: my-custom-runner + steps: + - name: Azure Login + run: az login --identity --allow-no-subscriptions + - name: Get secrets + run: | + TENANT=$(az keyvault secret show --vault-name my-kv --name tenant-id --query value -o tsv) + SUB=$(az keyvault secret show --name sub-id --vault-name my-kv --query value -o tsv) + - name: Get AKS creds + run: az aks get-credentials --resource-group rg --name my-cluster +""") + +KV_ENV_REF_WORKFLOW = textwrap.dedent("""\ + name: KV Env Test + on: push + + env: + BACKEND_VAULT_NAME: seaionl-prod-kv + + jobs: + deploy: + runs-on: [self-hosted] + steps: + - run: az login --identity + - run: | + T=$(az keyvault secret show --vault-name ${{ env.BACKEND_VAULT_NAME }} \ + --name tenant-id --query value -o tsv) + S=$(az keyvault secret show --vault-name ${{ env.BACKEND_VAULT_NAME }} \ + --name sub-id --query value -o tsv) +""") + +COMMENTED_OIDC_WORKFLOW = textwrap.dedent("""\ + name: No OIDC + on: push + # permissions: + # id-token: write + jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: arn:aws:iam::123:role/test +""") + +WRITE_ALL_WORKFLOW = textwrap.dedent("""\ + name: AWS Deploy + on: push + permissions: write-all + jobs: + deploy: + runs-on: ubuntu-latest + steps: + - uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: arn:aws:iam::123456789012:role/deploy-admin + aws-region: us-east-1 +""") + +REUSABLE_WORKFLOW = textwrap.dedent("""\ + name: Reusable Deploy + on: + workflow_call: + inputs: + environment: + required: true + type: string + jobs: + deploy: + runs-on: [self-hosted, linux, azure] + steps: + - run: 
az login --identity +""") + + +# ── Parser tests ─────────────────────────────────────────────────────── + +class TestParseWorkflow: + def test_parse_aws_oidc(self): + conns = _parse_workflow(AWS_OIDC_WORKFLOW, "deploy.yml", "acme/app") + assert len(conns) == 1 + assert conns[0].cloud_provider == "aws" + assert conns[0].role_arn == "arn:aws:iam::123456789012:role/github-deploy-role" + assert conns[0].has_oidc_permission is True + assert conns[0].workflow_name == "Deploy to AWS" + + def test_parse_azure_oidc(self): + conns = _parse_workflow(AZURE_OIDC_WORKFLOW, "infra.yml", "acme/app") + assert len(conns) == 1 + assert conns[0].cloud_provider == "azure" + assert conns[0].azure_client_id == "11111111-aaaa-bbbb-cccc-000000000001" + assert conns[0].azure_tenant_id == "22222222-dddd-eeee-ffff-000000000002" + + def test_parse_gcp_oidc(self): + conns = _parse_workflow(GCP_OIDC_WORKFLOW, "ci.yml", "acme/app") + assert len(conns) == 1 + assert conns[0].cloud_provider == "gcp" + assert conns[0].gcp_service_account == "ci-runner@my-project.iam.gserviceaccount.com" + assert "workloadIdentityPools" in conns[0].gcp_wif_provider + + def test_parse_no_oidc(self): + conns = _parse_workflow(NO_OIDC_WORKFLOW, "lint.yml", "acme/app") + assert len(conns) == 0 + + def test_parse_multi_cloud(self): + conns = _parse_workflow(MULTI_CLOUD_WORKFLOW, "multi.yml", "acme/app") + assert len(conns) == 2 + providers = {c.cloud_provider for c in conns} + assert "aws" in providers + assert "gcp" in providers + + def test_job_name_detected(self): + conns = _parse_workflow(AWS_OIDC_WORKFLOW, "deploy.yml", "acme/app") + assert conns[0].job_name == "deploy" + + def test_triggers_parsed(self): + conns = _parse_workflow(AWS_OIDC_WORKFLOW, "deploy.yml", "acme/app") + assert "push" in conns[0].trigger_events or "pull_request" in conns[0].trigger_events + + def test_parse_managed_identity(self): + conns = _parse_workflow(MANAGED_IDENTITY_WORKFLOW, "deploy-mi.yml", "acme/app") + assert len(conns) == 1 + c = 
conns[0] + assert c.auth_method == "managed_identity" + assert c.cloud_provider == "azure" + assert c.self_hosted_runner == "my-custom-runner" + assert c.keyvault_name == "my-kv" + assert "tenant-id" in c.keyvault_secrets + assert "sub-id" in c.keyvault_secrets + assert c.has_aks_access is True + assert "push" in c.trigger_events + assert "pull_request" in c.trigger_events + + def test_parse_kv_env_ref(self): + conns = _parse_workflow(KV_ENV_REF_WORKFLOW, "kv-env.yml", "acme/app") + assert len(conns) == 1 + c = conns[0] + assert c.keyvault_name == "seaionl-prod-kv" + assert "tenant-id" in c.keyvault_secrets + assert "sub-id" in c.keyvault_secrets + + def test_commented_oidc_not_detected(self): + conns = _parse_workflow(COMMENTED_OIDC_WORKFLOW, "no-oidc.yml", "acme/app") + assert len(conns) == 1 + assert conns[0].has_oidc_permission is False + + def test_write_all_detected_as_oidc(self): + conns = _parse_workflow(WRITE_ALL_WORKFLOW, "wa.yml", "acme/app") + assert len(conns) == 1 + assert conns[0].has_oidc_permission is True + + def test_reusable_workflow_call(self): + conns = _parse_workflow(REUSABLE_WORKFLOW, "reusable.yml", "acme/app") + assert len(conns) == 1 + assert "workflow_call" in conns[0].trigger_events + assert conns[0].auth_method == "managed_identity" + assert "self-hosted" in conns[0].self_hosted_runner + + def test_self_hosted_string_format(self): + wf = "name: T\non: push\njobs:\n j:\n runs-on: my-runner\n steps:\n - run: az login --identity\n" + conns = _parse_workflow(wf, "t.yml", "r") + assert len(conns) == 1 + assert conns[0].self_hosted_runner == "my-runner" + + def test_kv_reversed_arg_order(self): + wf = ( + "name: T\non: push\njobs:\n j:\n runs-on: ubuntu-latest\n" + " steps:\n - run: az login --identity\n" + " - run: az keyvault secret show --name my-secret --vault-name my-vault\n" + ) + conns = _parse_workflow(wf, "t.yml", "r") + assert len(conns) == 1 + assert conns[0].keyvault_name == "my-vault" + assert "my-secret" in 
conns[0].keyvault_secrets + + +class TestExtractTriggers: + def test_inline_single(self): + content = "on: push\n" + assert _extract_triggers(content) == ["push"] + + def test_inline_list(self): + content = "on: [push, pull_request]\n" + triggers = _extract_triggers(content) + assert "push" in triggers + assert "pull_request" in triggers + + def test_multiline(self): + content = "on:\n push:\n pull_request:\njobs:\n" + triggers = _extract_triggers(content) + assert "push" in triggers + assert "pull_request" in triggers + + def test_quoted_inline(self): + content = "on: 'push'\njobs:\n" + assert _extract_triggers(content) == ["push"] + + def test_workflow_dispatch(self): + content = "on:\n workflow_dispatch:\njobs:\n" + assert _extract_triggers(content) == ["workflow_dispatch"] + + def test_workflow_call(self): + content = "on:\n workflow_call:\n inputs:\n env:\n type: string\njobs:\n" + assert _extract_triggers(content) == ["workflow_call"] + + def test_schedule_and_push(self): + content = "on:\n schedule:\n - cron: '0 0 * * *'\n push:\n branches: [main]\njobs:\n" + triggers = _extract_triggers(content) + assert "schedule" in triggers + assert "push" in triggers + + def test_subkeys_not_included(self): + content = "on:\n push:\n branches: [main]\n tags:\n - 'v*'\n paths:\n - 'src/**'\njobs:\n" + triggers = _extract_triggers(content) + assert triggers == ["push"] + assert "branches" not in triggers + assert "tags" not in triggers + assert "paths" not in triggers + + +class TestFindJobName: + def test_finds_job(self): + content = "jobs:\n deploy:\n runs-on: ubuntu\n steps:\n - uses: aws" + pos = content.index("aws") + assert _find_job_name(content, pos) == "deploy" + + def test_no_job(self): + content = "name: test\non: push\n" + assert _find_job_name(content, 5) == "" + + +# ── Identity conversion tests ───────────────────────────────────────── + +class TestConnectionToIdentities: + def test_aws_oidc_identity(self): + conn = WorkflowOIDCConnection( + 
workflow_file="deploy.yml", + workflow_name="Deploy", + job_name="deploy", + cloud_provider="aws", + auth_method="oidc", + role_arn="arn:aws:iam::123456789012:role/admin-deploy", + has_oidc_permission=True, + trigger_events=["push", "pull_request"], + ) + identities = _connection_to_identities(conn, "acme/app") + assert len(identities) == 1 + ident = identities[0] + assert ident.identity_type == IdentityType.GITHUB_ACTIONS_OIDC + assert ident.provider == Provider.GITHUB + assert ident.classification == Classification.MACHINE + assert ident.raw["role_arn"] == "arn:aws:iam::123456789012:role/admin-deploy" + + def test_pr_trigger_risk_flag(self): + conn = WorkflowOIDCConnection( + workflow_file="deploy.yml", + workflow_name="Deploy", + cloud_provider="aws", + auth_method="oidc", + role_arn="arn:aws:iam::123456789012:role/deploy", + has_oidc_permission=True, + trigger_events=["push", "pull_request"], + ) + identities = _connection_to_identities(conn, "acme/app") + flags = identities[0].risk_flags + codes = [f.code for f in flags] + assert "GH_OIDC_PR_TRIGGER" in codes + + def test_admin_role_name_risk_flag(self): + conn = WorkflowOIDCConnection( + workflow_file="deploy.yml", + workflow_name="Deploy", + cloud_provider="aws", + auth_method="oidc", + role_arn="arn:aws:iam::123456789012:role/deploy-admin", + has_oidc_permission=True, + trigger_events=["push"], + ) + identities = _connection_to_identities(conn, "acme/app") + codes = [f.code for f in identities[0].risk_flags] + assert "GH_OIDC_ADMIN_ROLE" in codes + + def test_no_oidc_permission_flag(self): + conn = WorkflowOIDCConnection( + workflow_file="deploy.yml", + workflow_name="Deploy", + cloud_provider="aws", + auth_method="oidc", + role_arn="arn:aws:iam::123456789012:role/deploy", + has_oidc_permission=False, + trigger_events=["push"], + ) + identities = _connection_to_identities(conn, "acme/app") + codes = [f.code for f in identities[0].risk_flags] + assert "GH_OIDC_NO_PERMISSION" in codes + + def 
test_azure_identity(self): + conn = WorkflowOIDCConnection( + workflow_file="infra.yml", + workflow_name="Infra", + cloud_provider="azure", + auth_method="oidc", + azure_client_id="aaa-bbb", + azure_tenant_id="ccc-ddd", + has_oidc_permission=True, + trigger_events=["push"], + ) + identities = _connection_to_identities(conn, "acme/app") + assert len(identities) == 1 + assert identities[0].raw["cloud_provider"] == "azure" + assert identities[0].raw["azure_client_id"] == "aaa-bbb" + + def test_gcp_identity(self): + conn = WorkflowOIDCConnection( + workflow_file="ci.yml", + workflow_name="CI", + cloud_provider="gcp", + auth_method="oidc", + gcp_service_account="sa@proj.iam.gserviceaccount.com", + has_oidc_permission=True, + trigger_events=["push"], + ) + identities = _connection_to_identities(conn, "acme/app") + assert len(identities) == 1 + assert identities[0].raw["gcp_service_account"] == "sa@proj.iam.gserviceaccount.com" + + def test_managed_identity_azure(self): + conn = WorkflowOIDCConnection( + workflow_file="deploy-dev.yml", + workflow_name="Deploy to DEV", + job_name="deploy-dev", + cloud_provider="azure", + auth_method="managed_identity", + self_hosted_runner="azure-green-runner-vmss", + keyvault_secrets=["tenant-id", "subscription-id"], + keyvault_name="seaionl-secrets", + has_aks_access=True, + trigger_events=["push"], + ) + identities = _connection_to_identities(conn, "acme/infra") + assert len(identities) == 1 + ident = identities[0] + assert ident.raw["auth_method"] == "managed_identity" + assert ident.raw["self_hosted_runner"] == "azure-green-runner-vmss" + assert ident.raw["keyvault_name"] == "seaionl-secrets" + codes = [f.code for f in ident.risk_flags] + assert "GH_WF_SELF_HOSTED_MI" in codes + assert "GH_WF_KEYVAULT_SECRETS" in codes + assert "GH_WF_AKS_ACCESS" in codes + + def test_managed_identity_no_pr_trigger(self): + conn = WorkflowOIDCConnection( + workflow_file="deploy-prod.yml", + workflow_name="Deploy to PROD", + cloud_provider="azure", + 
auth_method="managed_identity", + self_hosted_runner="azure-green-runner-vmss", + trigger_events=["push", "workflow_dispatch"], + ) + identities = _connection_to_identities(conn, "acme/infra") + codes = [f.code for f in identities[0].risk_flags] + assert "GH_OIDC_PR_TRIGGER" not in codes + + +# ── File scanning tests ─────────────────────────────────────────────── + +class TestScanWorkflows: + def test_scan_directory(self, tmp_path): + wf_dir = tmp_path / ".github" / "workflows" + wf_dir.mkdir(parents=True) + (wf_dir / "deploy.yml").write_text(AWS_OIDC_WORKFLOW) + (wf_dir / "lint.yml").write_text(NO_OIDC_WORKFLOW) + + result = scan_workflows(str(wf_dir), repo_name="acme/app") + assert result.workflows_scanned == 2 + assert len(result.oidc_connections) == 1 + assert len(result.identities) == 1 + assert result.identities[0].identity_type == IdentityType.GITHUB_ACTIONS_OIDC + + def test_scan_single_file(self, tmp_path): + wf_file = tmp_path / "deploy.yml" + wf_file.write_text(AWS_OIDC_WORKFLOW) + + result = scan_workflows(str(wf_file), repo_name="acme/app") + assert result.workflows_scanned == 1 + assert len(result.oidc_connections) == 1 + + def test_scan_missing_path(self): + result = scan_workflows("/nonexistent/path") + assert len(result.errors) > 0 + assert result.workflows_scanned == 0 + + def test_scan_multi_cloud(self, tmp_path): + wf_dir = tmp_path / "workflows" + wf_dir.mkdir() + (wf_dir / "multi.yml").write_text(MULTI_CLOUD_WORKFLOW) + + result = scan_workflows(str(wf_dir), repo_name="acme/app") + assert result.workflows_scanned == 1 + assert len(result.oidc_connections) == 2 + assert len(result.identities) == 2 + + +# ── Graph integration tests ─────────────────────────────────────────── + +class TestOIDCGraph: + def _make_oidc_aws_identity(self): + return Identity( + id="github:oidc:aws:deploy.yml:arn:aws:iam::123:role/deploy-admin", + name="OIDC → AWS (Deploy)", + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + 
classification=Classification.MACHINE, + raw={ + "cloud_provider": "aws", + "role_arn": "arn:aws:iam::123:role/deploy-admin", + "role_policies": ["AdministratorAccess"], + }, + risk_flags=[ + RiskFlag(Severity.CRITICAL, "GH_OIDC_ADMIN_ROLE", + "OIDC assumes admin role", "Critical: admin access"), + ], + ) + + def test_oidc_creates_graph_edge(self): + oidc_ident = self._make_oidc_aws_identity() + graph = build_graph([oidc_ident]) + # Should have: OIDC node + synthetic IAM role node + policy node + assert len(graph.nodes) >= 2 + # Should have OIDC → role edge + oidc_edges = [e for e in graph.edges if e.edge_type == EdgeType.OIDC_ASSUMES_ROLE] + assert len(oidc_edges) >= 1 + + def test_oidc_is_entry_point(self): + oidc_ident = self._make_oidc_aws_identity() + graph = build_graph([oidc_ident]) + entry_points = graph.entry_points() + oidc_entries = [n for n in entry_points if n.node_type == "github_actions_oidc"] + assert len(oidc_entries) == 1 + + def test_oidc_admin_creates_privileged_node(self): + oidc_ident = self._make_oidc_aws_identity() + graph = build_graph([oidc_ident]) + privileged = graph.privileged_nodes() + # The AdministratorAccess policy node should be privileged + priv_labels = [n.label for n in privileged] + assert "AdministratorAccess" in priv_labels + + def test_oidc_cross_system_attack_path(self): + """OIDC → AWS role should produce a cross-system attack path.""" + oidc_ident = self._make_oidc_aws_identity() + ap_result = analyze_attack_paths([oidc_ident]) + assert len(ap_result.paths) >= 1 + # Should have at least one cross-system path + cross = [p for p in ap_result.paths if p.cross_system] + assert len(cross) >= 1 + + def test_oidc_azure_graph_edge(self): + ident = Identity( + id="github:oidc:azure:infra.yml:aaa-bbb", + name="OIDC → Azure (Infra)", + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "cloud_provider": "azure", + "azure_client_id": "aaa-bbb-ccc-ddd", + }, + 
) + graph = build_graph([ident]) + oidc_edges = [e for e in graph.edges if e.edge_type == EdgeType.OIDC_ASSUMES_ROLE] + assert len(oidc_edges) == 1 + + def test_oidc_gcp_graph_edge(self): + ident = Identity( + id="github:oidc:gcp:ci.yml:sa@proj.iam.gserviceaccount.com", + name="OIDC → GCP (CI)", + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "cloud_provider": "gcp", + "gcp_service_account": "sa@proj.iam.gserviceaccount.com", + }, + ) + graph = build_graph([ident]) + oidc_edges = [e for e in graph.edges if e.edge_type == EdgeType.OIDC_ASSUMES_ROLE] + assert len(oidc_edges) == 1 + + def test_oidc_correlates_with_existing_aws_role(self): + """When an AWS role is already discovered, OIDC should link to it.""" + now = datetime.now(timezone.utc) + aws_role = Identity( + id="aws:iam:role:123:deploy-admin", + name="deploy-admin", + provider=Provider.AWS, + identity_type=IdentityType.IAM_ROLE, + classification=Classification.MACHINE, + created_at=now, + arn="arn:aws:iam::123:role/deploy-admin", + policies=["AdministratorAccess"], + risk_flags=[ + RiskFlag(Severity.CRITICAL, "AWS_ADMIN_ACCESS", + "Has AdministratorAccess", "Critical"), + ], + ) + oidc_ident = Identity( + id="github:oidc:aws:deploy.yml:arn:aws:iam::123:role/deploy-admin", + name="OIDC → AWS (Deploy)", + provider=Provider.GITHUB, + identity_type=IdentityType.GITHUB_ACTIONS_OIDC, + classification=Classification.MACHINE, + raw={ + "cloud_provider": "aws", + "role_arn": "arn:aws:iam::123:role/deploy-admin", + }, + ) + graph = build_graph([aws_role, oidc_ident]) + # Should link OIDC directly to the existing AWS role (not a synthetic node) + oidc_edges = [e for e in graph.edges if e.edge_type == EdgeType.OIDC_ASSUMES_ROLE] + assert len(oidc_edges) == 1 + assert oidc_edges[0].target_id == aws_role.id + + +# ── CLI integration tests ───────────────────────────────────────────── + +class TestCLIWorkflows: + def 
test_demo_includes_oidc_data(self): + result = subprocess.run( + [sys.executable, "-m", "nhinsight.cli", "demo", "--attack-paths"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + assert "OIDC" in result.stdout + + def test_demo_ci_summary_shows_oidc_paths(self): + result = subprocess.run( + [sys.executable, "-m", "nhinsight.cli", "demo", "--ci-summary"], + capture_output=True, text=True, + ) + assert result.returncode == 0 + assert "github-actions" in result.stdout + assert "OIDC" in result.stdout + + def test_scan_github_workflows_flag(self, tmp_path): + """Test --github-workflows with a real workflow file.""" + wf_dir = tmp_path / ".github" / "workflows" + wf_dir.mkdir(parents=True) + (wf_dir / "deploy.yml").write_text(AWS_OIDC_WORKFLOW) + + # Use --github-workflows pointing to the temp dir + # The scan command needs at least one provider flag, so --aws is passed as well + result = subprocess.run( + [sys.executable, "-m", "nhinsight.cli", "scan", + "--github-workflows", str(wf_dir), + "--aws", # need at least one provider + "--ci-summary", "--ascii"], + capture_output=True, text=True, + ) + # May fail due to no AWS creds, but should parse workflows + # The workflow scanner runs before provider scanning + assert "github-actions" in result.stdout or result.returncode == 0 or "OIDC" in result.stdout