forked from mhrono/jamf-sofa-processor
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmacos-update-processor.py
More file actions
2529 lines (2100 loc) · 93.3 KB
/
macos-update-processor.py
File metadata and controls
2529 lines (2100 loc) · 93.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
macOS Update Processor
Keep your macOS devices up to date using Declarative Device Management, Jamf Pro, and a SOFA feed.
Author: Matt Hrono @ Chime | MacAdmins: @matt_h | mattonmacs.dev
----
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
----
REQUIREMENTS:
- Jamf Pro v11.9.1 or later
- API Credentials for your jamf tenant with the following permissions:
- Read Computers
- Create/Read/Update/Delete Managed Software Updates
- Read Smart Computer Groups
- Read Static Computer Groups
- Send Computer Remote Command to Download and Install OS X Update
- Python v3.12 or later (3.10+ may work, but only tested on 3.12)
- Additional modules detailed in requirements.txt -- I suggest the "recommended" flavor of MacAdmins Python: https://github.com/macadmins/python
- This recommended Python package also includes any other modules this script requires ***(EXCEPT for jamf-pro-sdk)***
- Be sure to update the shebang to point to your managed installation
ACKNOWLEDGEMENTS:
Big thanks to the creators and maintainers of SOFA (https://sofa.macadmins.io/), without whom this project would not be possible
Huge thanks as well to the creators and maintainers of jamf-pro-sdk (https://github.com/macadmins/jamf-pro-sdk-python), also without whom this project would not be possible
OVERVIEW:
Thanks to GitHub Copilot for this overview and the docstrings in each function
KNOWN ISSUES/DEFICIENCIES:
- This script is currently hardcoded for macOS updates only. It could be modified to also support updating iOS devices. I may do this in the future, but it is not currently planned.
- The sendNotifications function is planned but not currently implemented.
- The --check and --retry arguments have not been tested and may not be functional. Use at your own risk.
This script automates the deployment of macOS updates using Jamf Pro's Declarative Device Management (DDM) capabilities. It fetches the latest macOS version data from a SOFA feed, determines the appropriate installation deadlines based on CVE impact scores, and sends update plans to eligible devices or groups.
Functions:
- check_positive(value): Validates if the provided value is a positive integer.
- check_version_arg(version): Validates the version argument to ensure it matches the expected format.
- check_path(datafile): Checks and resolves the path for the data file.
- endRun(exitCode, logLevel, message): Exits the script with a specified exit code, logging level, and final message.
- loadJson(jsonPath): Loads and returns data from a JSON file.
- dumpJson(jsonData, jsonPath): Dumps data into a JSON file.
- sendNotifications(): Placeholder for sending notifications.
- checkModelSupported(device): Checks if a device is supported for the targeted macOS version.
- getCVEDetails(vulnSource, cveID, requestHeaders): Queries NVD or VulnCheck for details about a specific CVE.
- parseVulns(cveList): Determines whether the update deployment should be accelerated based on CVE impact scores.
- convertJamfTimestamp(timestamp): Converts a millisecond epoch timestamp to an epoch timestamp in whole seconds.
- calculateDeadlineString(deadlineDays): Calculates an installation deadline and returns it in a format acceptable to the Jamf API.
- checkExistingDevicePlans(deviceId, objectType): Checks if there are any active DDM update plans for a given device.
- sendDeclaration(objectType, objectIds, installDeadlineString, osVersion): Sends DDM update plans to a device, a list of devices, or a group.
- getComputerGroupData(groupID, groupName): Returns data about a computer group given its ID or name.
- getVersionData(): Parses the provided SOFA feed for the latest macOS version data.
- determineDeadline(cveList, exploitedCVEs): Determines the installation deadline based on runtime arguments and/or CVE data.
- checkDeploymentAvailable(productVersion): Checks if the target version is available via DDM from Jamf.
- checkDeviceDDMEligible(deviceRecord): Verifies if a device is eligible to receive and process a DDM update.
- getPlanData(planUUID): Retrieves and returns data about an update plan and its status/history.
- deduplicatePlans(planList): Filters a list of update plans to include only the most recently created plan per device.
- retryPlan(plan): Retries a failed update plan.
- run(): Main function to execute the script.
Usage:
- The script can be run with various command-line arguments to specify Jamf Pro credentials, target macOS version, update deadlines, and other options.
- It supports checking and retrying existing update plans, filtering devices by group membership, and performing dry runs without executing any changes.
"""
import argparse
import json
import logging
import os
import re
import time
import sys
import requests
from packaging.version import Version
from pathlib import Path
from datetime import datetime, timedelta, timezone
from tempfile import NamedTemporaryFile
from jamf_pro_sdk import JamfProClient, SessionConfig
from jamf_pro_sdk.models.classic import computer_groups
from jamf_pro_sdk.models.pro import computers
from jamf_pro_sdk.clients.pro_api.pagination import FilterField, filter_group
from jamf_pro_sdk.clients.auth import ApiClientCredentialsProvider
## Version
scriptVersion = "0.5.1"
## Arguments
## Validate integer inputs for deadlines
def check_positive(value):
    """
    Check if the provided value is a positive integer.

    Args:
        value (str | int): The value to be checked, normally the string form of an
            integer as handed over by argparse.

    Returns:
        int: The integer value if it is positive.

    Raises:
        argparse.ArgumentTypeError: If the value cannot be converted to an integer
            or is not greater than zero.
    """
    try:
        ivalue = int(value)
    except (TypeError, ValueError):
        # Report non-numeric input with the same exception type as non-positive
        # input so callers only have one failure mode to deal with
        raise argparse.ArgumentTypeError(
            f"{value} is an invalid positive int value"
        ) from None
    if ivalue <= 0:
        raise argparse.ArgumentTypeError(f"{value} is an invalid positive int value")
    return ivalue
## Validate input for target version
def check_version_arg(version):
    """
    Validates the version argument to ensure it matches the expected format.

    Args:
        version (str): The version string to validate. It can be one of the following
            (keywords are matched case-insensitively):
            - "ANY"
            - "MAJOR"
            - "MINOR"
            - A specific macOS version (e.g., "14.6.1" or "15.0")

    Returns:
        str: The validated version string in uppercase.

    Raises:
        argparse.ArgumentTypeError: If the version string does not match the expected format.
    """
    versionString = str(version)

    # fullmatch() anchors both ends strictly; match() with "$" would also accept a
    # trailing newline and hand it on to the rest of the script
    pattern = (
        r"any"  # Matches "any"
        r"|major"  # Matches "major"
        r"|minor"  # Matches "minor"
        r"|\d+\.\d+(?:\.\d+)?"  # Matches specific macOS version (e.g., "14.6.1" or "15.0")
    )

    if re.fullmatch(pattern, versionString, re.IGNORECASE):
        return versionString.upper()

    raise argparse.ArgumentTypeError(
        "\nVersion definition must be one of the following:\n\n"
        " - ANY\n"
        " - MAJOR\n"
        " - MINOR\n"
        ' - Specific macOS Version (e.g. "14.6.1" or "15.0")'
    )
## Validate input for metadata file path
def check_path(datafile):
    """
    Checks and resolves the path for the data file.

    The supplied path is expanded ("~") and resolved once, and that resolved path is
    used for every check:
    - If it is a directory, "updatePlanData.json" inside that directory is used.
    - If it is an existing file, that file is used.
    - Otherwise, "updatePlanData.json" in the current working directory is used.
    The chosen file is created (empty) if it does not exist yet.

    Args:
        datafile (str): The path to the data file or directory.

    Returns:
        Path: The resolved path to the data file.

    Raises:
        argparse.ArgumentTypeError: If the path cannot be resolved or the file cannot be created.
    """
    candidate = Path(datafile).expanduser().resolve()

    if candidate.is_dir():
        filePath = candidate.joinpath("updatePlanData.json")
    elif candidate.is_file():
        # Use the expanded/resolved path here too, so "~/plans.json" and relative
        # paths are recognised the same way directories are
        filePath = candidate
    else:
        filePath = Path.cwd().joinpath("updatePlanData.json")

    try:
        filePath.touch(exist_ok=True)
    except OSError as err:
        raise argparse.ArgumentTypeError(
            "Unable to parse data file path. Please try again or leave blank to use the default location (current working directory)"
        ) from err

    if not filePath.exists():
        raise argparse.ArgumentTypeError(
            "Unable to parse data file path. Please try again or leave blank to use the default location (current working directory)"
        )
    return filePath
## Command-line interface definition
## RawTextHelpFormatter keeps the explicit newlines used in the help strings below.
## argument_default=argparse.SUPPRESS means an option that is not supplied (and has no explicit
## default=) is left out of the parsed namespace entirely, which is why the code that reads the
## arguments tests for presence with `"name" in args`.
parser = argparse.ArgumentParser(
    formatter_class=argparse.RawTextHelpFormatter,
    argument_default=argparse.SUPPRESS,
)
## Jamf Pro connection details
parser.add_argument(
    "--jamfurl",
    nargs="?",
    help="URL for the target jamf instance -- protocol prefix not required (ex: org.jamfcloud.com)",
)
parser.add_argument("--clientid", nargs="?", help="Jamf Pro API Client ID")
parser.add_argument("--clientsecret", nargs="?", help="Jamf Pro API Client Secret")
## Modes operating on previously stored plan data
parser.add_argument(
    "--check",
    action="store_true",
    help="Read existing plan data from file and update with the latest results",
)
parser.add_argument(
    "--retry",
    action="store_true",
    help="After checking existing plan data, retry any failed plans. Use of this option implies --check.\n\nCAUTION: Retries will re-use existing installation deadlines. This could result in devices restarting for updates with little to no warning.\nRetries for exceeded installation deadlines will receive a new deadline of 3 days.",
)
## Optional API tokens for CVE lookups
parser.add_argument(
    "--nvdtoken",
    nargs="?",
    metavar="token",
    help="API key for NIST NVD. Not required, but providing one will enable faster CVE processing due to higher rate limits. NOTE: Using VulnCheck is strongly recommended over NVD due to ongoing issues with NIST update timeliness.",
)
parser.add_argument(
    "--vulnchecktoken",
    nargs="?",
    metavar="token",
    help="API key for VulnCheck (https://vulncheck.com)",
)
## SOFA data sources
## default= and const= are identical, so omitting the option and passing it without a value
## both select the public SOFA URL
parser.add_argument(
    "--feedsource",
    default="https://sofafeed.macadmins.io/v1/macos_data_feed.json?",
    const="https://sofafeed.macadmins.io/v1/macos_data_feed.json?",
    nargs="?",
    metavar="URL or path",
    help="Full path or URL to a SOFA-generated macos_data_feed.json file. Defaults to https://sofafeed.macadmins.io/v1/macos_data_feed.json",
)
parser.add_argument(
    "--timestamp",
    default="https://sofafeed.macadmins.io/v1/timestamp.json?",
    const="https://sofafeed.macadmins.io/v1/timestamp.json?",
    nargs="?",
    metavar="URL or path",
    help="Full path or URL to a SOFA-generated timestamp.json file. Defaults to https://sofafeed.macadmins.io/v1/timestamp.json",
)
## Target version selection -- validated and upper-cased by check_version_arg
parser.add_argument(
    "--targetversion",
    default="ANY",
    const="ANY",
    nargs="?",
    type=check_version_arg,
    metavar="Version (string or type)",
    help="""Target macOS version for deployment. Can be any of the following:
- Specific Version -- A specific macOS version to target for ALL eligible devices (e.g. 14.7.1) | Use --overridegroup and/or --excludegroup to target subsets of devices
- "ANY" (default) -- The latest and greatest Cupertino has to offer for ALL eligible devices
- "MAJOR" -- Target ONLY devices running the latest major version of macOS (e.g. updates devices on macOS 15 to the latest release of macOS 15)
- "MINOR" -- Target devices running the 2 latest major versions of macOS for their respective latest releases (e.g. 14.x to latest 14 and 15.x to latest 15)""",
)
## Group scoping
## nargs="+" collects a group name containing spaces as a list of words
parser.add_argument(
    "--excludegroup",
    nargs="+",
    metavar="Excluded Group Name",
    help="Name of a Smart/Static Computer Group containing devices to EXCLUDE from automated updates (such as conference room devices)",
)
parser.add_argument(
    "--overridegroup",
    nargs="+",
    metavar="Override Group Name",
    help="Name of a Smart/Static Computer Group to target for updates (overrides default outdated group)",
)
## Canary deployment options
parser.add_argument(
    "--canarygroup",
    nargs="+",
    metavar="Canary Group Name",
    help='Name of a Smart/Static Computer Group containing devices to always receive a 2-day installation deadline.\n\nNOTE: Canary deployments are NOT currently compatible with --targetversion "MINOR".',
)
parser.add_argument(
    "--canaryversion",
    nargs="?",
    metavar="macOS Version",
    help="macOS ProductVersion deployed to canary group. Used to ensure the same version is deployed fleetwide.",
)
parser.add_argument(
    "--canaryok",
    action="store_true",
    help="Deploy macOS update fleetwide, assuming successful canary deployment",
)
## Deadlines, in days -- each value is validated by check_positive
parser.add_argument(
    "--canarydeadline",
    default=2,
    const=2,
    nargs="?",
    type=check_positive,
    metavar="Days until deadline",
    help="Number of days before deadline for the canary group",
)
parser.add_argument(
    "--urgentdeadline",
    default=7,
    const=7,
    nargs="?",
    type=check_positive,
    metavar="Days until deadline",
    help="Force the update to all outdated devices with the specified deadline (in days), if the aggregate CVE scores warrant accelerated deployment",
)
parser.add_argument(
    "--deadline",
    default=14,
    const=14,
    nargs="?",
    type=check_positive,
    metavar="Days until deadline",
    help="Force the update to all outdated devices with the specified deadline (in days)",
)
## --force has no default or const: it is absent when omitted and None when passed without a value
parser.add_argument(
    "--force",
    nargs="?",
    type=check_positive,
    metavar="Days until deadline",
    help="Force the update to all outdated devices with the specified deadline (in days), overriding any configured canary data",
)
## Runtime behaviour switches
parser.add_argument(
    "--debug",
    action="store_true",
    help="Enable debug logging for this script",
)
parser.add_argument(
    "--dryrun",
    action="store_true",
    help="Output proposed actions without executing any changes",
)
parser.add_argument(
    "--toggleddm",
    action="store_true",
    help="Toggle DDM functionality off and on for the specified jamf tenant, clearing ALL existing DDM data",
)
## Plan data storage location -- check_path resolves the value to a file path and creates the file
parser.add_argument(
    "--datafile",
    nargs="?",
    type=check_path,
    metavar="Path or filename",
    help="Full path or filename for storing plan data",
)
## action="version" prints scriptVersion and exits
parser.add_argument(
    "--version",
    action="version",
    version=f"{scriptVersion}",
    help="Show script version and exit",
)
args = parser.parse_args()

## Unpack the parsed arguments into module-level settings.
## The parser uses argument_default=SUPPRESS, so an omitted option is missing from the namespace,
## while an option declared with nargs="?" and no const= is present with the value None when it is
## passed without a value. getattr(args, name, None) treats both cases the same way, so a bare
## flag can no longer crash the script or mask the environment-variable fallback.

## Jamf credentials: command line first, then environment
jamfURL = getattr(args, "jamfurl", None) or os.environ.get("jamfURL", None)
jamfClientID = getattr(args, "clientid", None) or os.environ.get("jamfClientID", None)
jamfClientSecret = getattr(args, "clientsecret", None) or os.environ.get(
    "jamfClientSecret", None
)

## Existing-plan handling
checkPlans = getattr(args, "check", None)
retryPlans = getattr(args, "retry", None)

## CVE data source tokens: command line first, then environment
nvdToken = getattr(args, "nvdtoken", None) or os.environ.get("nvdToken", None)
vulncheckToken = getattr(args, "vulnchecktoken", None) or os.environ.get(
    "vulncheckToken", None
)

## Group names arrive as lists of words (nargs="+") and are re-joined with single spaces
excludedGroupName = " ".join(args.excludegroup) if "excludegroup" in args else None
overrideGroupName = " ".join(args.overridegroup) if "overridegroup" in args else None
targetVersionType = args.targetversion.upper()
canaryGroupName = " ".join(args.canarygroup) if "canarygroup" in args else None

## Strip any literal double quotes from the canary version; an empty result counts as "not set"
canaryVersion = (getattr(args, "canaryversion", None) or "").replace('"', "") or None
canaryOK = getattr(args, "canaryok", False)

## Deadlines (days); these three options always carry a value because they declare defaults
canaryDays = args.canarydeadline
urgentDays = args.urgentdeadline
standardDays = args.deadline
## --deadline always has a value, so the only signal of a custom deadline is a non-default number
customDeadline = standardDays != 14
forceDays = getattr(args, "force", None)

## Plan data file: fall back to the working directory when --datafile is omitted or given bare
dataFilePath = Path(
    getattr(args, "datafile", None) or Path.cwd().joinpath("updatePlanData.json")
)

debug = getattr(args, "debug", None)
dryrun = getattr(args, "dryrun", False)
toggleDDM = getattr(args, "toggleddm", False)
###############################
#### Logging configuration ####
###############################
## Local log file
## Reserve a uniquely named log file in the current working directory. The context manager closes
## the temporary handle immediately (delete=False keeps the file on disk), so only the logging
## FileHandlers created below hold the file open.
with NamedTemporaryFile(
    prefix="jamf-ddm-deploy_",
    suffix=f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.log",
    delete=False,
    dir=Path.cwd(),
) as logFileHandle:
    logFile = logFileHandle.name

## Configure root logger
logger = logging.getLogger()
logger.handlers = []

## Create handlers
logToFile = logging.FileHandler(str(logFile))
jamfLogToFile = logging.FileHandler(str(logFile))
logToConsole = logging.StreamHandler(sys.stdout)
jamfLogToConsole = logging.StreamHandler(sys.stdout)

## Configure logging level and format
logLevel = logging.DEBUG if debug else logging.INFO
logFormat = logging.Formatter(
    "[%(asctime)s %(filename)s->%(funcName)s():%(lineno)s]%(levelname)s: %(message)s"
    if debug
    else "%(asctime)s [%(levelname)s] %(message)s"
)

## Set root and handler logging levels
logger.setLevel(logLevel)
logToFile.setLevel(logLevel)
logToConsole.setLevel(logLevel)

## Set log format
logToFile.setFormatter(logFormat)
jamfLogToFile.setFormatter(logFormat)
logToConsole.setFormatter(logFormat)
jamfLogToConsole.setFormatter(logFormat)

## Configure jamf SDK logging
jamfLogger = logging.getLogger("jamf_pro_sdk")
jamfLogLevel = logging.DEBUG if debug else logging.WARNING
jamfLogger.setLevel(jamfLogLevel)

## The SDK logger has its own file/console handlers; stop its records from also travelling up to
## the root logger's handlers, which would write every SDK message twice
jamfLogger.propagate = False

## Add handlers to jamf logger
jamfLogger.addHandler(jamfLogToFile)
jamfLogger.addHandler(jamfLogToConsole)

## Add handlers to root logger
logger.addHandler(logToFile)
logger.addHandler(logToConsole)
###############################
## Exit with a specified exit code, logging level, and final message
def endRun(exitCode=None, logLevel="info", message=None):
    """
    Terminates the program with a specified exit code and logs a message.

    Args:
        exitCode (int, optional): The exit code to terminate the program with. Defaults to None.
        logLevel (str, optional): Name of the logging level for the message, matched
            case-insensitively. Unknown names fall back to "info". Defaults to "info".
        message (str, optional): The message to log. Nothing is logged when empty. Defaults to None.

    Raises:
        SystemExit: Exits the program with the specified exit code.
    """
    # Resolve the level name through an explicit table: getattr(logging, "INFO") would
    # return the integer constant rather than a callable, and arbitrary names could
    # resolve to unrelated logging module attributes
    logFunctions = {
        "debug": logging.debug,
        "info": logging.info,
        "warning": logging.warning,
        "warn": logging.warning,
        "error": logging.error,
        "exception": logging.exception,
        "critical": logging.critical,
        "fatal": logging.critical,
    }
    logCmd = logFunctions.get(str(logLevel).lower(), logging.info)

    if message:
        logCmd(message)

    sys.exit(exitCode)
## Load feed file
## feedSource is either an http(s) URL or a path to a local SOFA macos_data_feed.json file
if feedSource := args.feedsource:
    logging.debug(f"Attempting to fetch macOS data feed from {feedSource}...")
    try:
        if feedSource.startswith(("http://", "https://")):
            ## Bound the wait and treat HTTP error statuses as failures instead of
            ## trying to parse an error page as feed data
            feedResponse = requests.get(feedSource, timeout=30)
            feedResponse.raise_for_status()
            feedData = json.loads(feedResponse.content)
        elif Path(feedSource).exists():
            feedData = json.loads(Path(feedSource).read_text())
        else:
            ## Neither a URL nor an existing file: fail here rather than leaving
            ## feedData undefined for the rest of the script
            raise FileNotFoundError(feedSource)
    except json.JSONDecodeError as e:
        endRun(1, "critical", f"Failed to decode JSON from {feedSource}: {e}")
    except Exception as e:
        ## Top-level boundary: any fetch/read problem ends the run (SystemExit and
        ## KeyboardInterrupt are not swallowed)
        logging.debug(f"Feed retrieval error detail: {e}")
        endRun(1, "critical", f"Failed to fetch feed data from {feedSource}, exiting!")
else:
    endRun(1, "critical", "Unknown issue encountered fetching feed data, exiting...")
## Load timestamp data
## timestampSource is either an http(s) URL or a path to a local SOFA timestamp.json file
if timestampSource := args.timestamp:
    logging.debug(
        f"Attempting to fetch SOFA feed timestamp data from {timestampSource}..."
    )
    try:
        if timestampSource.startswith(("http://", "https://")):
            ## Bound the wait and treat HTTP error statuses as failures instead of
            ## trying to parse an error page as timestamp data
            timestampResponse = requests.get(timestampSource, timeout=30)
            timestampResponse.raise_for_status()
            timestampData = json.loads(timestampResponse.content)
        elif Path(timestampSource).exists():
            timestampData = json.loads(Path(timestampSource).read_text())
        else:
            ## Neither a URL nor an existing file: fail here rather than leaving
            ## timestampData undefined for the rest of the script
            raise FileNotFoundError(timestampSource)
        logging.debug("Successfully retrieved timestamp data")
    except Exception as e:
        ## Top-level boundary: any fetch/read/decode problem ends the run (SystemExit
        ## and KeyboardInterrupt are not swallowed)
        logging.debug(f"Timestamp retrieval error detail: {e}")
        endRun(
            1,
            "critical",
            f"Failed to fetch timestamp data from {timestampSource}, exiting!",
        )
else:
    endRun(
        1, "critical", "Unknown issue encountered fetching timestamp data, exiting..."
    )
## Load and return data from a json file
def loadJson(jsonPath):
    """
    Load and return data from a JSON file.

    Parameters:
        jsonPath (Path | str): The path to the JSON file to be loaded.

    Returns:
        The data decoded from the JSON file, or None when the file cannot be read,
        is empty, or does not contain valid JSON.
    """
    jsonPath = Path(jsonPath)
    logging.debug(f"Loading json data from {str(jsonPath)}")

    try:
        rawText = jsonPath.read_text()
    except OSError as e:
        # Missing or unreadable file: report it and let the caller handle "no data"
        logging.error(f"Unable to read JSON file {str(jsonPath)}: {e}")
        return None

    if not rawText.strip():
        # check_path() creates the data file with touch(), so an empty file simply
        # means nothing has been stored yet -- not a decoding problem
        logging.debug(f"{str(jsonPath)} is empty, no data to load")
        return None

    try:
        return json.loads(rawText)
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON from {str(jsonPath)}: {e}")
        return None
## Dump data into a json file
def dumpJson(jsonData, jsonPath):
    """
    Serialize data and store it in a JSON file.

    Parameters:
        jsonData (dict): The data to be written to the JSON file.
        jsonPath (Path): The path to the JSON file where the data will be written.
    """
    for debugLine in (
        f"Dumping json data to {str(jsonPath)}",
        f"json data sent: {jsonData}",
    ):
        logging.debug(debugLine)

    # Serialize completely before touching the file, then write the text in one go
    serialized = json.dumps(jsonData, indent=4, separators=(",", ": "))
    jsonPath.write_text(serialized)
## TODO: Notify a Slack channel, Okta Workflow, or some other webhook when a deployment happens
def sendNotifications():
    """
    Placeholder for deployment notifications; currently does nothing and returns None.
    """
## Make sure a device is supported for the targeted version before sending a declaration
def checkModelSupported(device):
    """
    Report whether a device's model can run the targeted macOS version.

    Args:
        device (object): Device record exposing `id` and
            `operatingSystem.softwareUpdateDeviceId`.

    Returns:
        bool: True when the device's software update device ID is listed in the
        module-level `targetVersionSupportedDevices`, False otherwise.

    Logs:
        A warning naming the device ID when the model is not supported.
    """
    modelIdentifier = device.operatingSystem.softwareUpdateDeviceId
    isSupported = modelIdentifier in targetVersionSupportedDevices

    if not isSupported:
        logging.warning(
            f"Device {device.id} does not support the target macOS version!"
        )

    return isSupported
## Query NVD or VulnCheck for details about a specific CVE
def getCVEDetails(vulnSource, cveID, requestHeaders):
    """
    Retrieve details for a given CVE ID from the specified vulnerability source.

    Args:
        vulnSource (str): The source to check for CVE details. "vulncheck" selects the
            VulnCheck API; any other value is treated as "NVD".
        cveID (str): The CVE ID to retrieve details for.
        requestHeaders (dict): Headers to include in the request.

    Returns:
        dict: A dictionary containing CVE details including 'id', 'description',
            'exploitabilityScore', and 'impactScore'. Scores default to 0 when the
            record carries no metrics; 'description' is None when no English
            description is present.
        None: If no CVE ID is provided, the CVE ID is invalid, the request fails, or
            no results are found.

    Logs:
        Various debug, error, and warning messages to indicate the progress and any issues encountered.
    """
    if not cveID:
        logging.error("No CVE ID provided, unable to get details")
        return None

    # fullmatch() so that nothing (not even a trailing newline) can follow the ID,
    # since the value is appended to the request URL below
    if not re.fullmatch(r"CVE-\d{4}-\d+", str(cveID), re.IGNORECASE):
        logging.error(f"{cveID} does not appear to be a valid CVE ID!")
        return None

    if vulnSource == "vulncheck":
        checkURL = "https://api.vulncheck.com/v3/index/nist-nvd2?cve="
    else:
        vulnSource = "NVD"
        checkURL = "https://services.nvd.nist.gov/rest/json/cves/2.0?cveId="

    logging.debug(f"Checking {vulnSource} for details on CVE {cveID}...")

    # A single unreachable/slow lookup must not abort the whole run; treat it like
    # any other failed lookup
    try:
        cveCheckResponse = requests.get(
            checkURL + cveID, headers=requestHeaders, timeout=30
        )
    except requests.RequestException as e:
        logging.error(f"Error occurred checking for CVE details: {e}")
        return None

    if not cveCheckResponse.ok:
        logging.error(
            f"Error occurred checking for CVE details. Received return code {cveCheckResponse.status_code}"
        )
        return None

    logging.debug("Successfully retrieved CVE data")

    try:
        cveResponseContent = cveCheckResponse.json()
    except ValueError as e:
        logging.error(f"Unable to decode CVE details response for {cveID}: {e}")
        return None

    # Locate the CVE record; an explicit zero count or an empty/missing result list
    # both mean "no results"
    if vulnSource == "vulncheck":
        reportedCount = (cveResponseContent.get("_meta") or {}).get("total_documents")
        cveRecords = cveResponseContent.get("data") or []
        cveData = cveRecords[0] if cveRecords else None
    else:
        reportedCount = cveResponseContent.get("totalResults")
        cveRecords = cveResponseContent.get("vulnerabilities") or []
        cveData = cveRecords[0].get("cve") if cveRecords else None

    if reportedCount == 0 or not cveData:
        logging.warning(f"No results found for CVE ID {cveID}")
        return None

    # Use the first entry of the first metrics group, if the record has any metrics
    cveMetrics = cveData.get("metrics") or {}
    firstMetricGroup = next(iter(cveMetrics.values()), None) or [{}]
    cveMetricsData = firstMetricGroup[0] or {}

    cveDetails = {
        "id": cveID,
        "description": next(
            (
                i.get("value")
                for i in cveData.get("descriptions") or []
                if i.get("lang") == "en"
            ),
            None,
        ),
        "exploitabilityScore": cveMetricsData.get("exploitabilityScore") or 0,
        "impactScore": cveMetricsData.get("impactScore") or 0,
    }

    logging.debug(f"CVE data: {cveDetails}")

    return cveDetails
## Given a list of CVEs patched in a macOS release, determine whether or not the update deployment should be accelerated based on their impact scores
def parseVulns(cveList):
    """
    Parses a list of CVEs and calculates the average exploitability and impact scores.

    CVE details are retrieved through getCVEDetails() from VulnCheck (when a VulnCheck
    token is configured) or NVD. CVEs for which no details are returned are excluded
    from the averages. The averages are then compared against fixed risk thresholds.

    Args:
        cveList (list): A list of CVE identifiers to be processed.

    Returns:
        bool: True if the average exploitability score is above 6 or the average impact
            score is above 8, indicating a need for a shorter installation deadline.
            False otherwise, including when no CVE could be scored.
    """
    EXPLOITABILITY_THRESHOLD = 6
    IMPACT_THRESHOLD = 8

    logging.info("Calculating average CVE impact score for this release...")

    requestHeaders = {"accept": "application/json"}
    vulnSource = "nvd"

    # Pick the data source and the pause between requests according to the available tokens
    if vulncheckToken:
        logging.debug("Using VulnCheck API token with no rate limits")
        vulnSource = "vulncheck"
        standoffTime = 0
        requestHeaders.update({"Authorization": f"Bearer {vulncheckToken}"})
    elif nvdToken:
        logging.debug("Using NVD API token for higher rate limits")
        standoffTime = 0.75
        requestHeaders.update({"apiKey": nvdToken})
    else:
        logging.debug(
            "No NVD or VulnCheck API tokens found, using NVD with public unauthenticated rate limits"
        )
        standoffTime = 7

    cveList = list(cveList or [])
    lastIndex = len(cveList) - 1
    cveCount = 0
    totalExploitabilityScore = 0
    totalImpactScore = 0

    for index, cve in enumerate(cveList):
        cveData = getCVEDetails(vulnSource, cve, requestHeaders)

        if not cveData:
            logging.warning(
                f"No CVE metrics found for {cve}, excluding from average calculation"
            )
        else:
            # Treat a missing/None score as 0 so the sums cannot fail
            exploitabilityScore = cveData.get("exploitabilityScore") or 0
            impactScore = cveData.get("impactScore") or 0
            logging.debug(
                f"CVE {cve}: Exploitability = {exploitabilityScore}, Impact = {impactScore}"
            )
            totalExploitabilityScore += exploitabilityScore
            totalImpactScore += impactScore
            cveCount += 1

        # Respect the rate limit between requests; nothing follows the last one
        if standoffTime > 0 and index < lastIndex:
            logging.debug(f"Waiting {standoffTime} seconds before next request...")
            time.sleep(standoffTime)

    # Guard against division by zero when the list is empty or nothing could be scored
    if cveCount > 0:
        averageExploitabilityScore = round(totalExploitabilityScore / cveCount, 1)
        averageImpactScore = round(totalImpactScore / cveCount, 1)
    else:
        averageExploitabilityScore = 0
        averageImpactScore = 0

    logging.debug(f"Average exploitability score: {averageExploitabilityScore}")
    logging.debug(f"Average impact score: {averageImpactScore}")

    if averageExploitabilityScore > EXPLOITABILITY_THRESHOLD:
        logging.info(
            f"Average exploitability score {averageExploitabilityScore} exceeds the threshold of {EXPLOITABILITY_THRESHOLD}--forcing shorter installation deadline!"
        )
        return True

    if averageImpactScore > IMPACT_THRESHOLD:
        logging.info(
            f"Average impact score {averageImpactScore} exceeds the threshold of {IMPACT_THRESHOLD}--forcing shorter installation deadline!"
        )
        return True

    logging.info(
        "Average exploitability and impact scores are within normal risk ranges. No accelerated deadline required."
    )
    return False
## Convert a millisecond epoch timestamp received from the jamf api to an epoch timestamp in whole seconds
def convertJamfTimestamp(timestamp):
    """
    Converts a Jamf timestamp (in milliseconds) to an epoch timestamp in whole seconds.

    Args:
        timestamp (int | str): The Jamf timestamp in milliseconds since the epoch.

    Returns:
        int: The epoch time in seconds, rounded to the nearest second.
        None: If the timestamp is missing, not numeric, or not positive (an error is logged).
    """
    # Validate locally so that a bad value produces the logged error and None described
    # above instead of an exception, and so numeric strings are converted before dividing
    try:
        timestampMilliseconds = int(timestamp)
    except (TypeError, ValueError):
        timestampMilliseconds = 0

    if timestampMilliseconds <= 0:
        logging.error("Timestamp missing or malformed--cannot convert!")
        return None

    timestampSeconds = round(timestampMilliseconds / 1000)

    logging.debug(f"Converted timestamp {timestamp} to epoch {timestampSeconds}")

    return timestampSeconds
## Given an integer, calculate an installation deadline and return it in a format acceptable to the jamf API
def calculateDeadlineString(deadlineDays):
    """
    Build the installation deadline string for a deadline N days from now.

    The deadline is the current local date/time plus ``deadlineDays`` days. A
    deadline landing on Saturday or Sunday is pushed to the following Monday.
    The date is then rendered with a fixed time component of 19:00:00.

    Args:
        deadlineDays (int): Number of days from now until the deadline. A missing
            (falsy) value, or one that does not pass check_positive(), is replaced
            by the default of 7 days.

    Returns:
        str: The deadline formatted as "YYYY-MM-DDT19:00:00".
    """
    # Falsy values never reach check_positive thanks to short-circuiting
    deadlineIsUsable = bool(deadlineDays) and check_positive(str(deadlineDays))
    if not deadlineIsUsable:
        logging.error("Deadline missing or malformed--defaulting to 7 days")
        deadlineDays = 7

    targetDate = datetime.now() + timedelta(days=deadlineDays)

    # isoweekday(): Monday is 1 ... Saturday is 6, Sunday is 7
    weekday = targetDate.isoweekday()
    if weekday >= 6:
        logging.info(
            "Configured deadline falls on a weekend--moving to the following Monday"
        )
        # 8 - weekday gives 2 days for Saturday and 1 day for Sunday
        targetDate = targetDate + timedelta(days=8 - weekday)

    return targetDate.strftime("%Y-%m-%dT19:00:00")
## Check if there are any active DDM update plans for a given device
## If one is found, skip sending a new declaration, because it will fail
def checkExistingDevicePlans(deviceId, objectType="COMPUTER"):
    """
    Check whether a device already has an active (non-failed) update plan.

    Requests the device's plans from the Jamf Pro
    ``v1/managed-software-updates/plans`` endpoint (up to 5 attempts), resolves each
    returned plan through getPlanData(), and treats the first record whose
    "planFailed" value is falsy as the device's active plan.

    Args:
        deviceId (str): The ID of the device to check for existing plans.
        objectType (str, optional): The object type placed in the returned
            declaration item. Defaults to "COMPUTER".

    Returns:
        dict or None:
            - None if no deviceId was provided.
            - {"isEligible": True, "item": {"deviceId": ..., "objectType": ...}}
              if no active plan was found. This is also the result when every
              request attempt failed, since no plan could be confirmed.
            - {"isEligible": False, "item": {"planId": ..., "device": ...}}
              if an active plan was found; "planId" and "device" are the plan
              record's "planUuid" and "deviceData" values.

    Side effects:
        When an active plan is found, increments the module-level
        ``existingPlanCount`` and appends the plan summary to the module-level
        ``existingPlans`` list.

    Logs:
        A warning if no deviceId is provided or if all request attempts fail.
        Debug information about each attempt and the result.
    """
    global existingPlanCount
    global existingPlans

    if not deviceId:
        logging.warning(
            "No device ID found when attempting to check for existing plans!"
        )
        return None

    maxAttempts = 5
    activePlanData = None

    for attempt in range(1, maxAttempts + 1):
        logging.debug(
            f"Checking for existing active (non-failed) plans for device {deviceId} (attempt {attempt} of {maxAttempts})..."
        )

        existingPlansResponse = jamfClient.pro_api_request(
            method="GET",
            resource_path=f"v1/managed-software-updates/plans?filter=device.deviceId=={deviceId}",
        )

        if not existingPlansResponse.ok:
            logging.debug(
                f"Received {existingPlansResponse.status_code} checking for plan events, trying again..."
            )
            continue

        # A response without a "results" key is treated as "no plans" rather than
        # crashing on iteration over None
        planSummaries = existingPlansResponse.json().get("results") or []
        devicePlanRecords = [getPlanData(x.get("planUuid")) for x in planSummaries]

        # Skip empty/missing records so they can neither raise on .get() nor hide
        # a later record that really is active
        activePlanData = next(
            (
                plan
                for plan in devicePlanRecords
                if plan and not plan.get("planFailed")
            ),
            None,
        )
        logging.debug(f"Existing active plan found: {bool(activePlanData)}")
        break
    else:
        # Loop finished without a successful response. The device is still
        # reported as eligible below, but make the uncertainty visible.
        logging.warning(
            f"Unable to retrieve plans for device {deviceId} after {maxAttempts} attempts--treating device as eligible"
        )

    if not activePlanData:
        declarationItem = {"deviceId": deviceId, "objectType": objectType}
        returnData = {"isEligible": True, "item": declarationItem}
        logging.debug(f"No existing plan found, returning device data: {returnData}")
        return returnData

    existingPlanCount += 1
    existingPlanSummary = {
        "planId": activePlanData.get("planUuid"),
        "device": activePlanData.get("deviceData"),
    }
    returnData = {"isEligible": False, "item": existingPlanSummary}
    logging.debug(f"Existing plan found, returning device data: {returnData}")
    existingPlans.append(existingPlanSummary)
    return returnData
## Send DDM update plans to a device, a list of devices, or a group
def sendDeclaration(objectType, objectIds, installDeadlineString, osVersion):
"""
Sends a declaration for macOS updates to specified devices or groups.
Parameters:
objectType (str): The type of object to target, either "computer" or "group".
objectIds (int or list): The ID(s) of the target devices or group.
installDeadlineString (str): The deadline for the installation in ISO 8601 format.
osVersion (str): The version of macOS to update to.
Returns:
list or None: A list of plans if the declaration is successful, None otherwise.
"""
global existingPlans
if not re.match(r"^computer$|^group$", objectType, re.IGNORECASE):
logging.error(
f'Expected object type of "computer" or "group", received {objectType}'
)
return None
objectType = objectType.upper()
endpoint = "v1/managed-software-updates/plans"
targetDeviceList = []