Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,49 @@ Tested on Mac OSX with yEd for visualization.
Parsing colors and labels in yEd (GraphML):
-Edit->Properties Mapper
-Add a configuration for node
-Add a mapping for label/Label Text
-Add a mapping for Data: 'label' to 'Label Text'
-Add a mapping for Data: 'highlight' to 'Fill Color'
-Add a configuration for edge
-Add a mapping for color/Line Color
-Add a mapping for Data: 'color' to 'Line Color'
-Add a mapping for Data: 'width' to 'Line Width'
-Add a mapping for Data: 'trust_type' or 'when_created' to 'Label Text'
-Add a mapping for Data: 'trust_attributes' to 'Tooltip'
-Click 'Apply' to update the visualization.

Usage Examples
==============
Filter for trusts created after a specific date:
`python TrustVisualizer.py trusts.csv --after 2023-01-01`

Filter for specific domains and export to JSON:
`python TrustVisualizer.py trusts.csv --domains dev.corp.local prod.corp.local --format json`

Filter for only Inbound and Bidirectional trusts:
`python TrustVisualizer.py trusts.csv --directions Inbound Bidirectional`

Exclude specific domains from the visualization:
`python TrustVisualizer.py trusts.csv --exclude-domains test.local lab.local`

Filter for specific trust attributes:
`python TrustVisualizer.py trusts.csv --trust-attributes WITHIN_FOREST FOREST_TRANSITIVE`

Find and highlight all simple access paths between multiple domains:
`python TrustVisualizer.py trusts.csv --source dev.corp.local research.corp.local --target prod.corp.local root.local`

Find access paths with a maximum length of 3 hops:
`python TrustVisualizer.py trusts.csv --source dev.corp.local --target root.local --max-hops 3`

Find all access paths and save to a text file:
`python TrustVisualizer.py trusts.csv --source dev.corp.local --target prod.corp.local --output-path-file shortest_path.txt`

Find paths bypassing a blacklisted domain:
`python TrustVisualizer.py trusts.csv --source dev.corp.local --target prod.corp.local --blacklist-domains intermediate.local`

Export filtered results to a new CSV file:
`python TrustVisualizer.py trusts.csv --after 2024-01-01 --output-csv filtered_trusts.csv`

Generate a summary of trust types processed:
`python TrustVisualizer.py trusts.csv`

Setup & Requirements
====================
Expand Down
327 changes: 258 additions & 69 deletions TrustVisualizer.py
Original file line number Diff line number Diff line change
@@ -1,86 +1,275 @@
#!/usr/bin/python
#!/usr/bin/env python3

##############################################################################
# Author: @harmj0y
#
# Based on: https://github.com/sixdub/DomainTrustExplorer by @sixdub
#
# Description: Usesnetworkx library to transform PowerView's updated
# Description: Uses networkx library to transform PowerView's updated
# Get-DomainTrustMapping functionality output to graphml
#
# License: BSD 3-clause
##############################################################################

import argparse
import csv
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import networkx as nx
import sys, csv

if __name__ == '__main__':
def _parse_date(date_str: str) -> Optional[datetime]:
"""Attempt to parse common date formats found in AD/PowerView CSVs."""
for fmt in ("%m/%d/%Y %I:%M:%S %p", "%m/%d/%Y %H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
try:
return datetime.strptime(date_str.strip(), fmt)
except ValueError:
continue
return None

if (len(sys.argv) != 2):
print "usage: ./TrustVisualizer.py <trust_file.csv>"
exit()
def process_trusts(
input_file: str,
filter_domains: Optional[List[str]] = None,
filter_types: Optional[List[str]] = None,
filter_directions: Optional[List[str]] = None,
filter_attributes: Optional[List[str]] = None,
output_format: str = "graphml",
after_date: Optional[str] = None,
exclude_domains: Optional[List[str]] = None,
source_domains: Optional[List[str]] = None,
target_domains: Optional[List[str]] = None,
output_path_file: Optional[str] = None,
output_csv_file: Optional[str] = None,
path_color: str = "#FFD700",
blacklist_domains: Optional[List[str]] = None,
max_hops: Optional[int] = None
) -> None:
"""Parses the trust CSV and generates a GraphML file."""
trust_type_counts: Dict[str, int] = {}
filtered_rows: List[Dict] = []

graph = nx.DiGraph()
intputFile = sys.argv[1]

with open(intputFile, 'rb') as csvfile:

reader = csv.reader(csvfile, delimiter=',')

for row in reader:

# if we have the header row, skip
if row[0] == 'SourceName':
continue

# csv format:
# SourceName,TargetName,TrustType,TrustAttributes,TrustDirection,WhenCreated,WhenChanged
ecolor = ''
sourceName = row[0].strip()
targetName = row[1].strip()
trustType = row[2].strip()
trustAttributes = row[3].strip()
trustDirection = row[4].strip()

# if the source and destination domains are the same, skip
if (sourceName == targetName):
continue

if (trustType == 'MIT'):
# black label for MIT trusts
ecolor ='#000000'

else:
if "WITHIN_FOREST" in trustAttributes:
# green label for intra-forest trusts
ecolor = '#009900'
elif (trustAttributes == "FOREST_TRANSITIVE"):
# blue label for inter-forest trusts
ecolor = '#0000CC'
elif ((trustAttributes == "") or (trustAttributes == "TREAT_AS_EXTERNAL") or (trustAttributes == "FILTER_SIDS")):
# red label for external trusts
ecolor = '#FF0000'
input_path = Path(input_file)

if not input_path.exists():
print(f"[-] Error: File '{input_file}' not found.")
sys.exit(1)

# Normalize filters for case-insensitive comparison
domain_set = {d.lower() for d in filter_domains} if filter_domains else None
exclude_set = {d.lower() for d in exclude_domains} if exclude_domains else None
type_set = {t.lower() for t in filter_types} if filter_types else None
attribute_set = {a.lower() for a in filter_attributes} if filter_attributes else None
blacklist_set = {d.lower() for d in blacklist_domains} if blacklist_domains else None
direction_set = {d.lower() for d in filter_directions} if filter_directions else None

after_dt = _parse_date(after_date) if after_date else None
if after_date and not after_dt:
print(f"[-] Warning: Could not parse filter date '{after_date}'. Supported formats: YYYY-MM-DD or MM/DD/YYYY HH:MM:SS AM/PM.")

try:
with input_path.open('r', encoding='utf-8', errors='ignore') as csvfile:
# PowerView CSVs often have a BOM or specific encoding;
# DictReader allows us to access columns by name.
reader = csv.DictReader(csvfile)

for row in reader:
# Mapping CSV columns to variables
source_name = row.get('SourceName', '').strip()
target_name = row.get('TargetName', '').strip()
trust_type = row.get('TrustType', '').strip()
trust_attributes = row.get('TrustAttributes', '').strip()
trust_direction = row.get('TrustDirection', '').strip()
when_created = row.get('WhenCreated', '').strip()
when_changed = row.get('WhenChanged', '').strip()

# Apply Filters
if domain_set:
if source_name.lower() not in domain_set and target_name.lower() not in domain_set:
continue

if exclude_set:
if source_name.lower() in exclude_set or target_name.lower() in exclude_set:
continue

if type_set:
if trust_type.lower() not in type_set:
continue

if attribute_set:
if not any(attr in trust_attributes.lower() for attr in attribute_set):
continue

if direction_set:
if trust_direction.lower() not in direction_set:
continue

if after_dt:
row_dt = _parse_date(when_created)
if row_dt and row_dt < after_dt:
continue

# Validation: Skip empty rows or self-referencing trusts
if not source_name or not target_name or source_name == target_name:
continue

# Save the row for CSV export
filtered_rows.append(row)

# Count trust types for summary
trust_type_counts[trust_type] = trust_type_counts.get(trust_type, 0) + 1

# Determine edge color based on trust attributes
if trust_type == 'MIT':
ecolor = '#000000' # Black for MIT
elif "WITHIN_FOREST" in trust_attributes:
ecolor = '#009900' # Green for intra-forest
elif trust_attributes == "FOREST_TRANSITIVE":
ecolor = '#0000CC' # Blue for inter-forest
elif any(attr in trust_attributes for attr in ["", "TREAT_AS_EXTERNAL", "FILTER_SIDS"]):
ecolor = '#FF0000' # Red for external
else:
# violet label for unknown
print "[-] Unrecognized trust attributes between %s and %s : %s" % (sourceName, targetName, trustAttributes)
ecolor = '#EE82EE'

# add the domain nodes to the internal graph
graph.add_node(sourceName, label=sourceName)
graph.add_node(targetName, label=targetName)

# add the edges to the graph
if "Bidirectional" in trustDirection:
graph.add_edge(sourceName, targetName, color=ecolor)
graph.add_edge(targetName, sourceName, color=ecolor)
elif "Outbound" in trustDirection:
graph.add_edge(targetName, sourceName, color=ecolor)
elif "Inbound" in trustDirection:
graph.add_edge(sourceName, targetName, color=ecolor)
else:
print "[-] Unrecognized relationship direction between %s and %s : %s" % (sourceName, targetName, trustDirection)

outputFile = intputFile + ".graphml"
nx.write_graphml(graph, outputFile)
print "\n[+] Graphml writte to '%s'" % (outputFile)
print "\n[*] Note: green = within forest, red = external, blue = forest to forest, black = MIT, violet = unrecognized\n"
print(f"[-] Unrecognized trust attributes between {source_name} and {target_name}: {trust_attributes}")
ecolor = '#EE82EE' # Violet for unknown

# Add nodes
graph.add_node(source_name, label=source_name)
graph.add_node(target_name, label=target_name)

# Metadata to be included in the edge (relationship)
edge_props = {
"color": ecolor,
"trust_type": trust_type,
"trust_attributes": trust_attributes,
"when_created": when_created,
"when_changed": when_changed
}

# Add edges based on access direction (Inverse of trust direction)
# Bidirectional = Access both ways
# Outbound Trust = Target trusts Source = Source can access Target (Edge: Source -> Target)
# Inbound Trust = Source trusts Target = Target can access Source (Edge: Target -> Source)

if "Bidirectional" in trust_direction:
graph.add_edge(source_name, target_name, **edge_props)
graph.add_edge(target_name, source_name, **edge_props)
elif "Outbound" in trust_direction:
# Access is Source -> Target
graph.add_edge(source_name, target_name, **edge_props)
elif "Inbound" in trust_direction:
# Access is Target -> Source
graph.add_edge(target_name, source_name, **edge_props)
else:
print(f"[-] Unrecognized relationship direction between {source_name} and {target_name}: {trust_direction}")

# Prepare graph for pathfinding, potentially removing blacklisted domains
pathfinding_graph = graph.copy()
if blacklist_set:
for domain in blacklist_set:
if domain in pathfinding_graph:
pathfinding_graph.remove_node(domain)
print(f"[*] Blacklisted domain '{domain}' removed from pathfinding graph.")
else:
print(f"[-] Blacklisted domain '{domain}' not found in graph.")

# Find and highlight all simple paths if source(s) and target(s) are provided
if source_domains and target_domains:
all_paths = []
for s in source_domains:
for t in target_domains:
try:
# Find all simple paths (respecting directed edges/access flow)
paths = list(nx.all_simple_paths(pathfinding_graph, source=s, target=t, cutoff=max_hops))
if paths:
all_paths.extend(paths)
print(f"[+] Found {len(paths)} path(s) between {s} and {t}")

# Highlight nodes and edges in all found paths
for path in paths:
for i in range(len(path) - 1):
u, v = path[i], path[i+1]
graph.nodes[u]['highlight'] = path_color
graph.nodes[v]['highlight'] = path_color
if graph.has_edge(u, v): # Ensure edge still exists in original graph
graph.edges[u, v]['color'] = path_color
graph.edges[u, v]['width'] = '3.0'
except (nx.NetworkXNoPath, nx.NodeNotFound):
# NodeNotFound or NoPath could be due to blacklisting or actual absence
print(f"[-] No path found from '{s}' to '{t}' (or one of the domains is missing/blacklisted).")
# No path found is handled silently for bulk pairs unless one is missing

if output_path_file and all_paths:
try:
with open(output_path_file, 'w', encoding='utf-8') as f:
for path in all_paths:
f.write(f"Access path from {path[0]} to {path[-1]}:\n")
f.write(f"{' -> '.join(path)}\n\n")
print(f"[+] {len(all_paths)} total path(s) saved to '{output_path_file}'")
except IOError as io_err:
print(f"[-] Error writing shortest path to file '{output_path_file}': {io_err}")

if output_csv_file and filtered_rows:
try:
with open(output_csv_file, 'w', encoding='utf-8', newline='') as f:
# Use the keys from the first row as headers
writer = csv.DictWriter(f, fieldnames=filtered_rows[0].keys())
writer.writeheader()
writer.writerows(filtered_rows)
print(f"[+] Filtered CSV written to '{output_csv_file}'")
except Exception as csv_err:
print(f"[-] Error writing filtered CSV: {csv_err}")

if output_format == "json":
# Generates a JSON format compatible with D3.js (node-link data)
output_file = f"{input_file}.json"
data = nx.node_link_data(graph)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4)
print(f"\n[+] JSON (D3 format) written to '{output_file}'")
else:
output_file = f"{input_file}.graphml"
nx.write_graphml(graph, output_file)
print(f"\n[+] GraphML written to '{output_file}'")

print("\n[*] Color Legend:")
print(" Green = Within Forest")
print(" Red = External / SID Filtering")
print(" Blue = Forest Transitive")
print(" Black = MIT")
print(" Violet = Unrecognized\n")

print("\n[*] Trust Type Summary:")
if trust_type_counts:
for trust_type, count in sorted(trust_type_counts.items()):
print(f" - {trust_type}: {count}")
else:
print(" No trusts processed matching the criteria.")

except Exception as e:
print(f"[-] An error occurred during processing: {e}")
sys.exit(1)

if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Transform PowerView's Get-DomainTrustMapping CSV into GraphML.")
parser.add_argument("trust_file", help="Path to the trust.csv file generated by PowerView.")
parser.add_argument("--domains", nargs='+', help="Filter by specific domain names (case-insensitive).")
parser.add_argument("--exclude-domains", nargs='+', help="Exclude specific domain names from the graph (case-insensitive).")
parser.add_argument("--trust-types", nargs='+', help="Filter by specific trust types (e.g., MIT, ParentChild).")
parser.add_argument("--trust-attributes", nargs='+', help="Filter by specific trust attributes (e.g., WITHIN_FOREST, FOREST_TRANSITIVE).")
parser.add_argument("--directions", nargs='+', help="Filter by trust direction (Inbound, Outbound, Bidirectional).")
parser.add_argument("--format", choices=["graphml", "json"], default="graphml", help="Output format (default: graphml). 'json' is D3.js compatible.")
parser.add_argument("--after", help="Only include trusts created after this date (e.g., '2023-01-01' or '12/31/2022').")
parser.add_argument("--source", nargs='+', help="Source domain(s) for shortest path analysis.")
parser.add_argument("--target", nargs='+', help="Target domain(s) for shortest path analysis.")
parser.add_argument("--output-path-file", help="Path to a file to save the shortest path analysis results.")
parser.add_argument("--output-csv", help="Path to export the filtered trust data back into a CSV file.")
parser.add_argument("--path-color", default="#FFD700", help="Custom hex color for highlighted paths (default: #FFD700).")
parser.add_argument("--blacklist-domains", nargs='+', help="Exclude specific domains from pathfinding (bypasses them).")
parser.add_argument("--max-hops", type=int, help="Maximum path length (hops) to search for pathfinding.")

args = parser.parse_args()

process_trusts(args.trust_file, filter_domains=args.domains, filter_types=args.trust_types, filter_directions=args.directions, filter_attributes=args.trust_attributes, output_format=args.format, after_date=args.after, exclude_domains=args.exclude_domains, source_domains=args.source, target_domains=args.target, output_path_file=args.output_path_file, output_csv_file=args.output_csv, path_color=args.path_color, blacklist_domains=args.blacklist_domains, max_hops=args.max_hops)