diff --git a/README.md b/README.md index 76add19..0702d3c 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,49 @@ Tested on Mac OSX with yEd for visualization. Parsing colors and labels in yEd (GraphML): -Edit->Properties Mapper -Add a configuration for node - -Add a mapping for label/Label Text + -Add a mapping for Data: 'label' to 'Label Text' + -Add a mapping for Data: 'highlight' to 'Fill Color' -Add a configuration for edge - -Add a mapping for color/Line Color + -Add a mapping for Data: 'color' to 'Line Color' + -Add a mapping for Data: 'width' to 'Line Width' + -Add a mapping for Data: 'trust_type' or 'when_created' to 'Label Text' + -Add a mapping for Data: 'trust_attributes' to 'Tooltip' + -Click 'Apply' to update the visualization. + +Usage Examples +============== +Filter for trusts created after a specific date: +`python TrustVisualizer.py trusts.csv --after 2023-01-01` + +Filter for specific domains and export to JSON: +`python TrustVisualizer.py trusts.csv --domains dev.corp.local prod.corp.local --format json` + +Filter for only Inbound and Bidirectional trusts: +`python TrustVisualizer.py trusts.csv --directions Inbound Bidirectional` + +Exclude specific domains from the visualization: +`python TrustVisualizer.py trusts.csv --exclude-domains test.local lab.local` + +Filter for specific trust attributes: +`python TrustVisualizer.py trusts.csv --trust-attributes WITHIN_FOREST FOREST_TRANSITIVE` + +Find and highlight all simple access paths between multiple domains: +`python TrustVisualizer.py trusts.csv --source dev.corp.local research.corp.local --target prod.corp.local root.local` + +Find access paths with a maximum length of 3 hops: +`python TrustVisualizer.py trusts.csv --source dev.corp.local --target root.local --max-hops 3` + +Find all access paths and save to a text file: +`python TrustVisualizer.py trusts.csv --source dev.corp.local --target prod.corp.local --output-path-file shortest_path.txt` + +Find paths bypassing a blacklisted domain: +`python TrustVisualizer.py trusts.csv --source dev.corp.local --target prod.corp.local --blacklist-domains intermediate.local` + +Export filtered results to a new CSV file: +`python TrustVisualizer.py trusts.csv --after 2024-01-01 --output-csv filtered_trusts.csv` + +Generate a summary of trust types processed: +`python TrustVisualizer.py trusts.csv` Setup & Requirements ==================== diff --git a/TrustVisualizer.py b/TrustVisualizer.py index 159ccfa..d47bd59 100755 --- a/TrustVisualizer.py +++ b/TrustVisualizer.py @@ -1,86 +1,275 @@ -#!/usr/bin/python +#!/usr/bin/env python3 ############################################################################## # Author: @harmj0y # # Based on: https://github.com/sixdub/DomainTrustExplorer by @sixdub # -# Description: Usesnetworkx library to transform PowerView's updated +# Description: Uses networkx library to transform PowerView's updated # Get-DomainTrustMapping functionality output to graphml # # License: BSD 3-clause ############################################################################## +import argparse +import csv +import json +import sys +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional + import networkx as nx -import sys, csv -if __name__ == '__main__': +def _parse_date(date_str: str) -> Optional[datetime]: + """Attempt to parse common date formats found in AD/PowerView CSVs.""" + for fmt in ("%m/%d/%Y %I:%M:%S %p", "%m/%d/%Y %H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d"): + try: + return datetime.strptime(date_str.strip(), fmt) + except ValueError: + continue + return None - if (len(sys.argv) != 2): - print "usage: ./TrustVisualizer.py " - exit() +def process_trusts( + input_file: str, + filter_domains: Optional[List[str]] = None, + filter_types: Optional[List[str]] = None, + filter_directions: Optional[List[str]] = None, + filter_attributes: Optional[List[str]] = None, + output_format: str = "graphml", + after_date: Optional[str] = None, + exclude_domains: Optional[List[str]] = None, + source_domains: Optional[List[str]] = None, + target_domains: Optional[List[str]] = None, + output_path_file: Optional[str] = None, + output_csv_file: Optional[str] = None, + path_color: str = "#FFD700", + blacklist_domains: Optional[List[str]] = None, + max_hops: Optional[int] = None +) -> None: + """Parses the trust CSV and generates a GraphML file.""" + trust_type_counts: Dict[str, int] = {} + filtered_rows: List[Dict] = [] graph = nx.DiGraph() - intputFile = sys.argv[1] - - with open(intputFile, 'rb') as csvfile: - - reader = csv.reader(csvfile, delimiter=',') - - for row in reader: - - # if we have the header row, skip - if row[0] == 'SourceName': - continue - - # csv format: - # SourceName,TargetName,TrustType,TrustAttributes,TrustDirection,WhenCreated,WhenChanged - ecolor = '' - sourceName = row[0].strip() - targetName = row[1].strip() - trustType = row[2].strip() - trustAttributes = row[3].strip() - trustDirection = row[4].strip() - - # if the source and destination domains are the same, skip - if (sourceName == targetName): - continue - - if (trustType == 'MIT'): - # black label for MIT trusts - ecolor ='#000000' - - else: - if "WITHIN_FOREST" in trustAttributes: - # green label for intra-forest trusts - ecolor = '#009900' - elif (trustAttributes == "FOREST_TRANSITIVE"): - # blue label for inter-forest trusts - ecolor = '#0000CC' - elif ((trustAttributes == "") or (trustAttributes == "TREAT_AS_EXTERNAL") or (trustAttributes == "FILTER_SIDS")): - # red label for external trusts - ecolor = '#FF0000' + input_path = Path(input_file) + + if not input_path.exists(): + print(f"[-] Error: File '{input_file}' not found.") + sys.exit(1) + + # Normalize filters for case-insensitive comparison + domain_set = {d.lower() for d in filter_domains} if filter_domains else None + exclude_set = {d.lower() for d in exclude_domains} if exclude_domains else None + type_set = {t.lower() for t in filter_types} if filter_types else None + attribute_set = {a.lower() for a in filter_attributes} if filter_attributes else None + blacklist_set = {d.lower() for d in blacklist_domains} if blacklist_domains else None + direction_set = {d.lower() for d in filter_directions} if filter_directions else None + + after_dt = _parse_date(after_date) if after_date else None + if after_date and not after_dt: + print(f"[-] Warning: Could not parse filter date '{after_date}'. Supported formats: YYYY-MM-DD or MM/DD/YYYY HH:MM:SS AM/PM.") + + try: + with input_path.open('r', encoding='utf-8', errors='ignore') as csvfile: + # PowerView CSVs often have a BOM or specific encoding; + # DictReader allows us to access columns by name. + reader = csv.DictReader(csvfile) + + for row in reader: + # Mapping CSV columns to variables + source_name = row.get('SourceName', '').strip() + target_name = row.get('TargetName', '').strip() + trust_type = row.get('TrustType', '').strip() + trust_attributes = row.get('TrustAttributes', '').strip() + trust_direction = row.get('TrustDirection', '').strip() + when_created = row.get('WhenCreated', '').strip() + when_changed = row.get('WhenChanged', '').strip() + + # Apply Filters + if domain_set: + if source_name.lower() not in domain_set and target_name.lower() not in domain_set: + continue + + if exclude_set: + if source_name.lower() in exclude_set or target_name.lower() in exclude_set: + continue + + if type_set: + if trust_type.lower() not in type_set: + continue + + if attribute_set: + if not any(attr in trust_attributes.lower() for attr in attribute_set): + continue + + if direction_set: + if trust_direction.lower() not in direction_set: + continue + + if after_dt: + row_dt = _parse_date(when_created) + if row_dt and row_dt < after_dt: + continue + + # Validation: Skip empty rows or self-referencing trusts + if not source_name or not target_name or source_name == target_name: + continue + + # Save the row for CSV export + filtered_rows.append(row) + + # Count trust types for summary + trust_type_counts[trust_type] = trust_type_counts.get(trust_type, 0) + 1 + + # Determine edge color based on trust attributes + if trust_type == 'MIT': + ecolor = '#000000' # Black for MIT + elif "WITHIN_FOREST" in trust_attributes: + ecolor = '#009900' # Green for intra-forest + elif trust_attributes == "FOREST_TRANSITIVE": + ecolor = '#0000CC' # Blue for inter-forest + elif any(attr in trust_attributes for attr in ["", "TREAT_AS_EXTERNAL", "FILTER_SIDS"]): + ecolor = '#FF0000' # Red for external else: - # violet label for unknown - print "[-] Unrecognized trust attributes between %s and %s : %s" % (sourceName, targetName, trustAttributes) - ecolor = '#EE82EE' - - # add the domain nodes to the internal graph - graph.add_node(sourceName, label=sourceName) - graph.add_node(targetName, label=targetName) - - # add the edges to the graph - if "Bidirectional" in trustDirection: - graph.add_edge(sourceName, targetName, color=ecolor) - graph.add_edge(targetName, sourceName, color=ecolor) - elif "Outbound" in trustDirection: - graph.add_edge(targetName, sourceName, color=ecolor) - elif "Inbound" in trustDirection: - graph.add_edge(sourceName, targetName, color=ecolor) - else: - print "[-] Unrecognized relationship direction between %s and %s : %s" % (sourceName, targetName, trustDirection) - - outputFile = intputFile + ".graphml" - nx.write_graphml(graph, outputFile) - print "\n[+] Graphml writte to '%s'" % (outputFile) - print "\n[*] Note: green = within forest, red = external, blue = forest to forest, black = MIT, violet = unrecognized\n" + print(f"[-] Unrecognized trust attributes between {source_name} and {target_name}: {trust_attributes}") + ecolor = '#EE82EE' # Violet for unknown + + # Add nodes + graph.add_node(source_name, label=source_name) + graph.add_node(target_name, label=target_name) + + # Metadata to be included in the edge (relationship) + edge_props = { + "color": ecolor, + "trust_type": trust_type, + "trust_attributes": trust_attributes, + "when_created": when_created, + "when_changed": when_changed + } + + # Add edges based on access direction (Inverse of trust direction) + # Bidirectional = Access both ways + # Outbound Trust = Target trusts Source = Source can access Target (Edge: Source -> Target) + # Inbound Trust = Source trusts Target = Target can access Source (Edge: Target -> Source) + + if "Bidirectional" in trust_direction: + graph.add_edge(source_name, target_name, **edge_props) + graph.add_edge(target_name, source_name, **edge_props) + elif "Outbound" in trust_direction: + # Access is Source -> Target + graph.add_edge(source_name, target_name, **edge_props) + elif "Inbound" in trust_direction: + # Access is Target -> Source + graph.add_edge(target_name, source_name, **edge_props) + else: + print(f"[-] Unrecognized relationship direction between {source_name} and {target_name}: {trust_direction}") + + # Prepare graph for pathfinding, potentially removing blacklisted domains + pathfinding_graph = graph.copy() + if blacklist_set: + for domain in blacklist_set: + if domain in pathfinding_graph: + pathfinding_graph.remove_node(domain) + print(f"[*] Blacklisted domain '{domain}' removed from pathfinding graph.") + else: + print(f"[-] Blacklisted domain '{domain}' not found in graph.") + + # Find and highlight all simple paths if source(s) and target(s) are provided + if source_domains and target_domains: + all_paths = [] + for s in source_domains: + for t in target_domains: + try: + # Find all simple paths (respecting directed edges/access flow) + paths = list(nx.all_simple_paths(pathfinding_graph, source=s, target=t, cutoff=max_hops)) + if paths: + all_paths.extend(paths) + print(f"[+] Found {len(paths)} path(s) between {s} and {t}") + + # Highlight nodes and edges in all found paths + for path in paths: + for i in range(len(path) - 1): + u, v = path[i], path[i+1] + graph.nodes[u]['highlight'] = path_color + graph.nodes[v]['highlight'] = path_color + if graph.has_edge(u, v): # Ensure edge still exists in original graph + graph.edges[u, v]['color'] = path_color + graph.edges[u, v]['width'] = '3.0' + except (nx.NetworkXNoPath, nx.NodeNotFound): + # NodeNotFound or NoPath could be due to blacklisting or actual absence + print(f"[-] No path found from '{s}' to '{t}' (or one of the domains is missing/blacklisted).") + # No path found is handled silently for bulk pairs unless one is missing + + if output_path_file and all_paths: + try: + with open(output_path_file, 'w', encoding='utf-8') as f: + for path in all_paths: + f.write(f"Access path from {path[0]} to {path[-1]}:\n") + f.write(f"{' -> '.join(path)}\n\n") + print(f"[+] {len(all_paths)} total path(s) saved to '{output_path_file}'") + except IOError as io_err: + print(f"[-] Error writing shortest path to file '{output_path_file}': {io_err}") + + if output_csv_file and filtered_rows: + try: + with open(output_csv_file, 'w', encoding='utf-8', newline='') as f: + # Use the keys from the first row as headers + writer = csv.DictWriter(f, fieldnames=filtered_rows[0].keys()) + writer.writeheader() + writer.writerows(filtered_rows) + print(f"[+] Filtered CSV written to '{output_csv_file}'") + except Exception as csv_err: + print(f"[-] Error writing filtered CSV: {csv_err}") + + if output_format == "json": + # Generates a JSON format compatible with D3.js (node-link data) + output_file = f"{input_file}.json" + data = nx.node_link_data(graph) + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=4) + print(f"\n[+] JSON (D3 format) written to '{output_file}'") + else: + output_file = f"{input_file}.graphml" + nx.write_graphml(graph, output_file) + print(f"\n[+] GraphML written to '{output_file}'") + + print("\n[*] Color Legend:") + print(" Green = Within Forest") + print(" Red = External / SID Filtering") + print(" Blue = Forest Transitive") + print(" Black = MIT") + print(" Violet = Unrecognized\n") + + print("\n[*] Trust Type Summary:") + if trust_type_counts: + for trust_type, count in sorted(trust_type_counts.items()): + print(f" - {trust_type}: {count}") + else: + print(" No trusts processed matching the criteria.") + + except Exception as e: + print(f"[-] An error occurred during processing: {e}") + sys.exit(1) + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description="Transform PowerView's Get-DomainTrustMapping CSV into GraphML.") + parser.add_argument("trust_file", help="Path to the trust.csv file generated by PowerView.") + parser.add_argument("--domains", nargs='+', help="Filter by specific domain names (case-insensitive).") + parser.add_argument("--exclude-domains", nargs='+', help="Exclude specific domain names from the graph (case-insensitive).") + parser.add_argument("--trust-types", nargs='+', help="Filter by specific trust types (e.g., MIT, ParentChild).") + parser.add_argument("--trust-attributes", nargs='+', help="Filter by specific trust attributes (e.g., WITHIN_FOREST, FOREST_TRANSITIVE).") + parser.add_argument("--directions", nargs='+', help="Filter by trust direction (Inbound, Outbound, Bidirectional).") + parser.add_argument("--format", choices=["graphml", "json"], default="graphml", help="Output format (default: graphml). 'json' is D3.js compatible.") + parser.add_argument("--after", help="Only include trusts created after this date (e.g., '2023-01-01' or '12/31/2022').") + parser.add_argument("--source", nargs='+', help="Source domain(s) for shortest path analysis.") + parser.add_argument("--target", nargs='+', help="Target domain(s) for shortest path analysis.") + parser.add_argument("--output-path-file", help="Path to a file to save the shortest path analysis results.") + parser.add_argument("--output-csv", help="Path to export the filtered trust data back into a CSV file.") + parser.add_argument("--path-color", default="#FFD700", help="Custom hex color for highlighted paths (default: #FFD700).") + parser.add_argument("--blacklist-domains", nargs='+', help="Exclude specific domains from pathfinding (bypasses them).") + parser.add_argument("--max-hops", type=int, help="Maximum path length (hops) to search for pathfinding.") + + args = parser.parse_args() + + process_trusts(args.trust_file, filter_domains=args.domains, filter_types=args.trust_types, filter_directions=args.directions, filter_attributes=args.trust_attributes, output_format=args.format, after_date=args.after, exclude_domains=args.exclude_domains, source_domains=args.source, target_domains=args.target, output_path_file=args.output_path_file, output_csv_file=args.output_csv, path_color=args.path_color, blacklist_domains=args.blacklist_domains, max_hops=args.max_hops)