15 changes: 15 additions & 0 deletions .github/workflows/repotests.yml
@@ -73,6 +73,7 @@ jobs:
run: |
python -m pip install --upgrade pip
uv sync --all-extras --all-packages --dev
uv pip install -U "huggingface_hub[cli]"
npm install -g @cyclonedx/cdxgen
- name: repotests java-sec-code
run: |
@@ -107,6 +108,20 @@ jobs:
shell: bash
env:
BLINTDB_HOME: ${{ runner.temp }}/blintdb-home
- name: repotests Signal-Android
run: |
mkdir -p ${GITHUB_WORKSPACE}/depscan_reports/Signal-Android
uv run huggingface-cli download AppThreat/ukaina --include "java/Signal-Android/*.json" --exclude "java/Signal-Android/*.vdr.json" --repo-type dataset --local-dir ${GITHUB_WORKSPACE}/depscan_reports/Signal-Android
uv run depscan --src ${GITHUB_WORKSPACE}/depscan_reports/Signal-Android --bom-dir ${GITHUB_WORKSPACE}/depscan_reports/Signal-Android --reports-dir ${GITHUB_WORKSPACE}/depscan_reports/Signal-Android --reachability-analyzer SemanticReachability --explain
rm -rf ${GITHUB_WORKSPACE}/depscan_reports
shell: bash
- name: repotests cdxgen
run: |
mkdir -p ${GITHUB_WORKSPACE}/depscan_reports/cdxgen
uv run huggingface-cli download AppThreat/ukaina --include "js/cdxgen/*.json" --exclude "js/cdxgen/*.vdr.json" --repo-type dataset --local-dir ${GITHUB_WORKSPACE}/depscan_reports/cdxgen
uv run depscan --src ${GITHUB_WORKSPACE}/depscan_reports/cdxgen --bom-dir ${GITHUB_WORKSPACE}/depscan_reports/cdxgen --reports-dir ${GITHUB_WORKSPACE}/depscan_reports/cdxgen --reachability-analyzer SemanticReachability --explain
rm -rf ${GITHUB_WORKSPACE}/depscan_reports
shell: bash
- name: Set up JDK
uses: actions/setup-java@v4
with:
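For local reproduction of the new repotest steps, here is a rough Python equivalent of the `huggingface-cli download` invocation, sketched with `huggingface_hub.snapshot_download` (a sketch only; the workflow itself shells out to the CLI):

```python
# Sketch: mirrors the --include/--exclude patterns used in the workflow step.
from pathlib import Path

from huggingface_hub import snapshot_download

reports_dir = Path("depscan_reports/Signal-Android")
reports_dir.mkdir(parents=True, exist_ok=True)

# Fetch the pre-generated Signal-Android BOMs from the AppThreat/ukaina dataset,
# skipping the *.vdr.json files that depscan would otherwise treat as inputs.
snapshot_download(
    repo_id="AppThreat/ukaina",
    repo_type="dataset",
    allow_patterns=["java/Signal-Android/*.json"],
    ignore_patterns=["java/Signal-Android/*.vdr.json"],
    local_dir=reports_dir,
)
```

depscan is then pointed at the same directory for `--src`, `--bom-dir`, and `--reports-dir`, with the `SemanticReachability` analyzer and `--explain` enabled, exactly as in the steps above.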
6 changes: 4 additions & 2 deletions depscan/cli.py
@@ -59,8 +59,8 @@
sys.stderr.reconfigure(encoding="utf-8")

LOGO = """
_| _ ._ _ _ _. ._
(_| (/_ |_) _> (_ (_| | |
_| _ ._ _ _ _. ._
(_| (/_ |_) _> (_ (_| | |
|
"""

@@ -670,6 +670,8 @@ def run_depscan(args):
postbuild_bom_file = os.path.join(
reports_dir, f"sbom-postbuild-{project_type}.cdx.json"
)
# We support only one container SBOM per project.
# Projects that rely on docker compose and multiple services will need further thought
container_bom_file = os.path.join(
reports_dir, f"sbom-container-{project_type}.cdx.json"
)
37 changes: 27 additions & 10 deletions depscan/lib/explainer.py
@@ -105,6 +105,7 @@ def explain_reachables(reachables, project_type, vdr_result):
""""""
reachable_explanations = 0
checked_flows = 0
has_crypto_flows = False
for areach in reachables.get("reachables", []):
if (
not areach.get("flows")
@@ -120,11 +121,14 @@
# is_prioritized = True
# if not is_prioritized:
# continue
flow_tree, comment, source_sink_desc, has_check_tag = explain_flows(
flow_tree, comment, source_sink_desc, has_check_tag, is_endpoint_reachable, is_crypto_flow = explain_flows(
areach.get("flows"), areach.get("purls"), project_type, vdr_result
)
if not source_sink_desc or not flow_tree:
continue
# Did we find any crypto flows?
if is_crypto_flow and not has_crypto_flows:
has_crypto_flows = True
rtable = Table(
box=box.DOUBLE_EDGE,
show_lines=True,
@@ -146,7 +150,11 @@
if reachable_explanations:
tips = """## Secure Design Tips"""

if checked_flows:
if has_crypto_flows:
tips += """
- Generate a Cryptography Bill of Materials (CBOM) using tools such as cdxgen, and track it with platforms like Dependency-Track.
"""
elif checked_flows:
tips += """
- Review the validation and sanitization methods used in the application.
- To enhance the security posture, implement a common validation middleware.
@@ -168,6 +176,7 @@ def flow_to_source_sink(idx, flow, purls, project_type, vdr_result):
reached_services = vdr_result.reached_services
is_endpoint_reachable = False
possible_reachable_service = False
is_crypto_flow = "crypto" in flow.get("tags", []) or "crypto-generate" in flow.get("tags", [])
method_in_emoji = ":right_arrow_curving_left:"
for p in purls:
if endpoint_reached_purls and endpoint_reached_purls.get(p):
@@ -224,16 +233,24 @@ def flow_to_source_sink(idx, flow, purls, project_type, vdr_result):
elif len(purls) == 1:
if is_endpoint_reachable:
source_sink_desc = f"{source_sink_desc} can be used to reach this package from certain endpoints."
else:
source_sink_desc = f"{source_sink_desc} can be used to reach this package."
elif source_sink_desc:
if is_crypto_flow:
source_sink_desc = "Reachable crypto-flow."
else:
source_sink_desc = "Reachable data-flow."
else:
if is_endpoint_reachable:
source_sink_desc = f"{source_sink_desc} can be used to reach {len(purls)} packages from certain endpoints."
else:
source_sink_desc = (
f"{source_sink_desc} can be used to reach {len(purls)} packages."
)
return source_sink_desc
if source_sink_desc:
source_sink_desc = (
f"{source_sink_desc} can be used to reach {len(purls)} packages."
)
elif is_crypto_flow:
source_sink_desc = f"{len(purls)} packages reachable from this crypto-flow."
else:
source_sink_desc = f"{len(purls)} packages reachable from this data-flow."
return source_sink_desc, is_endpoint_reachable, is_crypto_flow


def filter_tags(tags):
@@ -315,7 +332,7 @@ def explain_flows(flows, purls, project_type, vdr_result):
):
continue
if not source_sink_desc:
source_sink_desc = flow_to_source_sink(
source_sink_desc, is_endpoint_reachable, is_crypto_flow = flow_to_source_sink(
idx, aflow, purls, project_type, vdr_result
)
file_loc, flow_str, has_check_tag_flow = flow_to_str(aflow, project_type)
Expand All @@ -336,4 +353,4 @@ def explain_flows(flows, purls, project_type, vdr_result):
0,
":white_medium_small_square: Verify that the mitigation(s) used in this flow are valid and appropriate for your security requirements.",
)
return tree, "\n".join(comments), source_sink_desc, has_check_tag
return tree, "\n".join(comments), source_sink_desc, has_check_tag, is_endpoint_reachable, is_crypto_flow
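The widened return tuples are the heart of this change: `flow_to_source_sink` and `explain_flows` now also report endpoint and crypto reachability, and `explain_reachables` uses the crypto flag to decide whether to emit the CBOM tip. A condensed, self-contained sketch of that decision using stubbed flow data (tag names are taken from this diff; the flows themselves are hypothetical):

```python
# Stubbed flows; in depscan these come from the reachability slices.
stub_flows = [
    {"tags": ["framework-input", "validation"]},
    {"tags": ["crypto-generate"]},
]


def is_crypto(flow: dict) -> bool:
    """Mirror of the tag check added in flow_to_source_sink."""
    tags = flow.get("tags", [])
    return "crypto" in tags or "crypto-generate" in tags


# explain_reachables only needs to know whether *any* flow was a crypto flow
# before appending the CBOM tip to the "Secure Design Tips" section.
has_crypto_flows = any(is_crypto(f) for f in stub_flows)
if has_crypto_flows:
    print("- Generate a Cryptography Bill of Materials (CBOM) using cdxgen.")
```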
15 changes: 12 additions & 3 deletions packages/analysis-lib/src/analysis_lib/__init__.py
@@ -4,17 +4,26 @@
from importlib.metadata import distribution
from logging import Logger
from typing import Dict, List, Optional
import fnmatch

from rich.console import Console


def get_all_bom_files(from_dir):
"""
Method to collect all BOM files from a given directory.
Collect all BOM JSON files under `from_dir`,
excluding any files matching '*.vdr.json'.
"""
base = Path(from_dir)
patterns = ["*bom*.json", "*.cdx.json"]
files = {str(p.resolve()) for pattern in patterns for p in base.rglob(pattern)}
include_patterns = ["*bom*.json", "*.cdx.json"]
exclude_pattern = "*.vdr.json"

files = set()
for pattern in include_patterns:
for p in base.rglob(pattern):
if not fnmatch.fnmatch(p.name, exclude_pattern):
files.add(str(p.resolve()))

return sorted(files)


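A quick way to sanity-check the new exclusion, assuming `analysis_lib` is importable in the current environment: `*.vdr.json` files (depscan's own output) should no longer be collected as BOM inputs.

```python
import tempfile
from pathlib import Path

from analysis_lib import get_all_bom_files

with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    # Two input BOMs plus one VDR report that must be filtered out.
    (base / "sbom-java.cdx.json").write_text("{}")
    (base / "bom-postbuild.json").write_text("{}")
    (base / "sbom-java.vdr.json").write_text("{}")

    found = get_all_bom_files(base)
    assert len(found) == 2
    assert not any(f.endswith(".vdr.json") for f in found)
```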
40 changes: 33 additions & 7 deletions packages/analysis-lib/src/analysis_lib/output.py
@@ -124,12 +124,14 @@ def generate_console_output(
options: VdrAnalysisKV,
):
table_rows = []
purl_fixed_location = {}
table = Table(
title=f"Dependency Scan Results ({options.project_type.upper()})",
box=box.DOUBLE_EDGE,
header_style="bold magenta",
show_lines=True,
show_lines=False,
min_width=150,
caption=f"Vulnerabilities count: {len(pkg_vulnerabilities)}",
)
for h in [
"Dependency Tree" if len(bom_dependency_tree) > 0 else "CVE",
@@ -158,32 +160,56 @@
)
if rating := vdr.get("ratings", {}):
rating = rating[0]
if not purl_fixed_location.get(vdr["purl_prefix"]) and vdr["fixed_location"]:
purl_fixed_location[vdr["purl_prefix"]] = vdr["fixed_location"]
table_rows.append(
[
vdr["id"],
vdr["purl_prefix"],
vdr["p_rich_tree"],
"\n".join(vdr["insights"]),
vdr["fixed_location"],
vdr["insights"],
vdr["fixed_location"] or purl_fixed_location.get(vdr["purl_prefix"]),
f"""{"[bright_red]" if rating.get("severity", "").upper() == "CRITICAL" else ""}{rating.get("severity", "").upper()}""",
f"""{"[bright_red]" if rating.get("severity", "").upper() == "CRITICAL" else ""}{rating.get("score", "")}""",
]
)
# Attempt to group the packages before output
grouped_purls = defaultdict(list)
cve_rows = {}
# We can dim certain unimportant rows
dimmable_severities = ("LOW",) if not pkg_group_rows else ("LOW", "MEDIUM")
for arow in table_rows:
grouped_purls[arow[1]].append(arow[0])
cve_rows[arow[0]] = [arow[2], arow[3], arow[4], arow[5], arow[6]]
cve_rows[arow[0]] = [
arow[2],
arow[3],
arow[4] or purl_fixed_location.get(arow[1]),
arow[5],
arow[6],
]
# sort based on cve in descending order
for purl in grouped_purls:
grouped_purls[purl].sort(reverse=True)
# sort the purls
sorted_purls = sorted(grouped_purls.keys())
for purl in sorted_purls:
for cve in grouped_purls[purl]:
for i, cve in enumerate(grouped_purls[purl]):
arow = cve_rows[cve]
table.add_row(arow[0], arow[1], arow[2], arow[3], arow[4])
# Reduce insights repetition
insights = arow[1] if len(arow[1]) > 1 or i == 0 else []
table.add_row(
arow[0],
"\n".join(insights),
f"[bold]{arow[2] or ''}[/bold]"
if i == 0
else "", # Reduce fix version repetition
arow[3],
arow[4],
end_section=(i == len(grouped_purls[purl]) - 1),
style=Style(dim=True)
if not arow[1] or arow[3] in dimmable_severities
else None,
)
return pkg_group_rows, table


@@ -364,7 +390,7 @@ def summarize_priority_actions(
utable.add_row(
k,
"\n".join(sorted(v, reverse=True)),
matched_by_fixes.get(k),
f"[bold]{matched_by_fixes.get(k) or ''}[/bold]",
next_step_analysis_obj["next_step_str"],
)
return utable
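The console table now groups rows by purl, suppresses repeated insights and fix versions within a group, and dims low-signal rows. A toy illustration of the dimming and section-break mechanics with rich (not the actual depscan output code; the row data is made up):

```python
from rich.console import Console
from rich.style import Style
from rich.table import Table

# Hypothetical rows already grouped by purl: (CVE, insights, severity).
grouped = {
    "pkg:npm/lodash@4.17.20": [
        ("CVE-2021-23337", "Reachable", "HIGH"),
        ("CVE-2020-28500", "", "LOW"),
    ],
}
dimmable_severities = ("LOW",)

table = Table(show_lines=False, header_style="bold magenta")
for heading in ("Package", "CVE", "Insights", "Severity"):
    table.add_column(heading)

for purl, rows in grouped.items():
    for i, (cve, insight, severity) in enumerate(rows):
        table.add_row(
            purl if i == 0 else "",  # avoid repeating the purl within a group
            cve,
            insight,
            severity,
            end_section=(i == len(rows) - 1),  # rule line after each group
            style=Style(dim=True)
            if not insight or severity in dimmable_severities
            else None,
        )

Console().print(table)
```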
30 changes: 18 additions & 12 deletions packages/analysis-lib/src/analysis_lib/utils.py
@@ -1472,7 +1472,7 @@ def analyze_cve_vuln(
):
insights = []
plain_insights = []
pkg_requires_attn = False
cve_requires_attn = False
likely_false_positive = False
purl = vuln.get("matched_by") or ""
purl_obj = parse_purl(purl)
@@ -1566,6 +1566,8 @@
)
vdict["recommendation"] = f"Update to version {fixed_location}."
vdict["fixed_location"] = fixed_location
# FIXME: This looks similar to another block above with a subtle
# difference in pkg_severity
pkg_tree_list, p_rich_tree = pkg_sub_tree(
purl,
purl.replace(":", "/"),
@@ -1605,7 +1607,7 @@
plain_package_usage = "Deployed dependency"
# Does this require attention
if rating.get("severity", "").upper() in JUST_CRITICAL:
pkg_requires_attn = True
cve_requires_attn = True
counts.critical_count += 1
counts.pkg_attention_count += 1
elif direct_purls.get(purl):
@@ -1631,6 +1633,7 @@
plain_package_usage = "Indirect dependency"
# There are pocs or bounties against this vulnerability
if pocs or bounties:
# Reachable purls
if reached_purls.get(purl) or endpoint_reached_purls.get(purl):
if endpoint_reached_purls.get(purl):
insights.append(
@@ -1644,36 +1647,37 @@
plain_insights.append("Reachable Bounty target")
counts.has_reachable_poc_count += 1
counts.has_reachable_exploit_count += 1
pkg_requires_attn = True
cve_requires_attn = True
# Direct usage
elif direct_purls.get(purl) or is_purl_in_postbuild(purl, postbuild_purls):
insights.append(
"[yellow]:notebook_with_decorative_cover: Bug Bounty target[/yellow]"
)
plain_insights.append("Bug Bounty target")
else:
else: # Just PoC
insights.append("[yellow]:notebook_with_decorative_cover: Has PoC[/yellow]")
plain_insights.append("Has PoC")
counts.has_poc_count += 1
if rating.get("severity", "").upper() in JUST_CRITICAL:
pkg_requires_attn = True
cve_requires_attn = True
if direct_purls.get(purl) or is_purl_in_postbuild(purl, postbuild_purls):
counts.pkg_attention_count += 1
if recommendation:
counts.fix_version_count += 1
counts.critical_count += 1
# Purl is reachable
# App Purl is reachable
if (
vendors
and package_type not in OS_PKG_TYPES
and (reached_purls.get(purl) or endpoint_reached_purls.get(purl))
):
# If it has a poc, an insight might have gotten added above
if not pkg_requires_attn:
if not cve_requires_attn:
if endpoint_reached_purls.get(purl):
insights.append(":spider_web: Endpoint-Reachable")
plain_insights.append("Endpoint-Reachable")
if rating.get("severity", "").upper() in CRITICAL_OR_HIGH:
pkg_requires_attn = True
cve_requires_attn = True
else:
insights.append(":receipt: Reachable")
plain_insights.append("Reachable")
@@ -1710,7 +1714,7 @@
if "Endpoint-Reachable" in plain_insights:
plain_insights.remove("Endpoint-Reachable")
counts.has_reachable_exploit_count += 1
pkg_requires_attn = True
cve_requires_attn = True
# Fail safe. Packages with exploits and direct usage without
# a reachable flow are still considered reachable to reduce
# false negatives
@@ -1740,7 +1744,7 @@
)
plain_insights.append("Known Exploits")
counts.has_exploit_count += 1
pkg_requires_attn = True
cve_requires_attn = True
if cve_record.root.containers.cna.affected.root and (
cpes := cve_record.root.containers.cna.affected.root[0].cpes
):
@@ -1755,7 +1759,7 @@
if package_usage:
insights.append(package_usage)
plain_insights.append(plain_package_usage)
add_to_pkg_group_rows = not likely_false_positive and pkg_requires_attn and purl
add_to_pkg_group_rows = not likely_false_positive and cve_requires_attn and purl
insights = list(set(insights))
plain_insights = list(set(plain_insights))
if exploits or pocs:
Expand All @@ -1769,7 +1773,7 @@ def analyze_cve_vuln(
vdict |= {
"insights": insights,
"properties": get_vuln_properties(
fixed_location, pkg_requires_attn, plain_insights, purl
fixed_location, cve_requires_attn, plain_insights, purl
),
}
return counts, vdict, add_to_pkg_group_rows, likely_false_positive
@@ -1861,6 +1865,8 @@ def get_vendor_url(comp):
if comp.get("licenses"):
for lic in comp.get("licenses"):
license_obj = lic
if isinstance(lic, str):
continue
if lic.get("license"):
license_obj = lic.get("license")
if license_obj.get("id"):
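The new `isinstance` guard in the license loop tolerates BOMs that carry bare strings in the `licenses` array instead of objects. A small sketch of the intended behaviour with hypothetical data:

```python
# Hypothetical component license entries; some tools emit bare strings here.
licenses = [
    "MIT",                                                        # skipped by the new guard
    {"license": {"id": "Apache-2.0"}},
    {"expression": "GPL-2.0-only WITH Classpath-exception-2.0"},
]

license_ids = []
for lic in licenses:
    if isinstance(lic, str):
        continue  # avoid calling .get() on a string
    license_obj = lic.get("license", lic)
    if license_obj.get("id"):
        license_ids.append(license_obj["id"])

print(license_ids)  # ['Apache-2.0']
```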
3 changes: 3 additions & 0 deletions packages/analysis-lib/src/analysis_lib/vdr.py
@@ -101,9 +101,12 @@ def process(self) -> VDRResult:
prebuild_purls, build_purls, postbuild_purls, optional_pkgs
)
# Retrieve any dependency tree from the SBOM
# This logic could be improved to retrieve multiple matching dependency trees
bom_dependency_tree = retrieve_bom_dependency_tree(
options.bom_file, options.bom_dir
)
# OCI properties will give us information about the container layer
# Can we do anything clever with this information?
oci_props = retrieve_oci_properties(options.bom_file, options.bom_dir)
oci_product_types = oci_props.get("oci:image:componentTypes", "")
counts = Counts()