13
13
import re
14
14
import time
15
15
import warnings
16
+ from collections .abc import Callable
16
17
from datetime import timedelta
18
+ from typing import Any , Optional , Union
17
19
18
20
import jmespath
19
21
20
22
21
- def deprecated (message = "" ):
23
+ def deprecated (message : str = "" ) -> Callable [[ Callable [..., Any ]], Callable [..., Any ]] :
22
24
"""
23
25
Define a deprecated decorator without dependencies on 3rd party libraries
24
26
@@ -27,9 +29,9 @@ def deprecated(message=""):
27
29
"""
28
30
29
31
# create closure around function that follows use of the decorator
30
- def deprecated_decorator (func ) :
32
+ def deprecated_decorator (func : Callable [..., Any ]) -> Callable [..., Any ] :
31
33
@functools .wraps (func )
32
- def deprecated_func (* args , ** kwargs ) :
34
+ def deprecated_func (* args : object , ** kwargs : object ) -> object :
33
35
warnings .warn (f"`{ func .__name__ } ` is a deprecated function or method. \n { message } " ,
34
36
category = DeprecationWarning , stacklevel = 1 )
35
37
warnings .simplefilter ("default" , DeprecationWarning )
@@ -47,21 +49,21 @@ class DataGenError(Exception):
47
49
:param baseException: underlying exception, if any that caused the issue
48
50
"""
49
51
50
- def __init__ (self , msg , baseException = None ):
52
+ def __init__ (self : "DataGenError" , msg : str , baseException : Optional [ Exception ] = None ) -> None :
51
53
""" constructor
52
54
"""
53
55
super ().__init__ (msg )
54
- self ._underlyingException = baseException
55
- self ._msg = msg
56
+ self ._underlyingException : Optional [ Exception ] = baseException
57
+ self ._msg : str = msg
56
58
57
- def __repr__ (self ) :
59
+ def __repr__ (self : "DataGenError" ) -> str :
58
60
return f"DataGenError(msg='{ self ._msg } ', baseException={ self ._underlyingException } )"
59
61
60
- def __str__ (self ) :
62
+ def __str__ (self : "DataGenError" ) -> str :
61
63
return f"DataGenError(msg='{ self ._msg } ', baseException={ self ._underlyingException } )"
62
64
63
65
64
- def coalesce_values (* args ) :
66
+ def coalesce_values (* args : object ) -> Optional [ object ] :
65
67
"""For a supplied list of arguments, returns the first argument that does not have the value `None`
66
68
67
69
:param args: variable list of arguments which are evaluated
@@ -73,7 +75,7 @@ def coalesce_values(*args):
73
75
return None
74
76
75
77
76
- def ensure (cond , msg = "condition does not hold true" ):
78
+ def ensure (cond : bool , msg : str = "condition does not hold true" ) -> None :
77
79
"""ensure(cond, s) => throws Exception(s) if c is not true
78
80
79
81
:param cond: condition to test
@@ -82,14 +84,14 @@ def ensure(cond, msg="condition does not hold true"):
82
84
:returns: Does not return anything but raises exception if condition does not hold
83
85
"""
84
86
85
- def strip_margin (text ) :
87
+ def strip_margin (text : str ) -> str :
86
88
return re .sub (r"\n[ \t]*\|" , "\n " , text )
87
89
88
90
if not cond :
89
91
raise DataGenError (strip_margin (msg ))
90
92
91
93
92
- def mkBoundsList (x , default ) :
94
+ def mkBoundsList (x : Optional [ Union [ int , list [ int ]]], default : Union [ int , list [ int ]]) -> tuple [ bool , list [ int ]] :
93
95
""" make a bounds list from supplied parameter - otherwise use default
94
96
95
97
:param x: integer or list of 2 values that define bounds list
@@ -100,16 +102,20 @@ def mkBoundsList(x, default):
100
102
retval = (True , [default , default ]) if isinstance (default , int ) else (True , list (default ))
101
103
return retval
102
104
elif isinstance (x , int ):
103
- bounds_list = [x , x ]
105
+ bounds_list : list [ int ] = [x , x ]
104
106
assert len (bounds_list ) == 2 , "bounds list must be of length 2"
105
107
return False , bounds_list
106
108
else :
107
- bounds_list = list (x )
109
+ bounds_list : list [ int ] = list (x )
108
110
assert len (bounds_list ) == 2 , "bounds list must be of length 2"
109
111
return False , bounds_list
110
112
111
113
112
- def topologicalSort (sources , initial_columns = None , flatten = True ):
114
+ def topologicalSort (
115
+ sources : list [tuple [str , set [str ]]],
116
+ initial_columns : Optional [list [str ]] = None ,
117
+ flatten : bool = True
118
+ ) -> Union [list [str ], list [list [str ]]]:
113
119
""" Perform a topological sort over sources
114
120
115
121
Used to compute the column test data generation order of the column generation dependencies.
@@ -129,16 +135,16 @@ def topologicalSort(sources, initial_columns=None, flatten=True):
129
135
Overall the effect is that the input build order should be retained unless there are forward references
130
136
"""
131
137
# generate a copy so that we can modify in place
132
- pending = [(name , set (deps )) for name , deps in sources ]
133
- provided = [] if initial_columns is None else initial_columns [:]
134
- build_orders = [] if initial_columns is None else [initial_columns ]
138
+ pending : list [ tuple [ str , set [ str ]]] = [(name , set (deps )) for name , deps in sources ]
139
+ provided : list [ str ] = [] if initial_columns is None else initial_columns [:]
140
+ build_orders : list [ list [ str ]] = [] if initial_columns is None else [initial_columns ]
135
141
136
142
while pending :
137
- next_pending = []
138
- gen = []
139
- value_emitted = False
140
- defer_emitted = False
141
- gen_provided = []
143
+ next_pending : list [ tuple [ str , set [ str ]]] = []
144
+ gen : list [ str ] = []
145
+ value_emitted : bool = False
146
+ defer_emitted : bool = False
147
+ gen_provided : list [ str ] = []
142
148
for entry in pending :
143
149
name , deps = entry
144
150
deps .difference_update (provided )
@@ -165,7 +171,7 @@ def topologicalSort(sources, initial_columns=None, flatten=True):
165
171
pending = next_pending
166
172
167
173
if flatten :
168
- flattened_list = [item for sublist in build_orders for item in sublist ]
174
+ flattened_list : list [ str ] = [item for sublist in build_orders for item in sublist ]
169
175
return flattened_list
170
176
else :
171
177
return build_orders
@@ -176,31 +182,31 @@ def topologicalSort(sources, initial_columns=None, flatten=True):
176
182
_WEEKS_PER_YEAR = 52
177
183
178
184
179
- def parse_time_interval (spec ) :
185
+ def parse_time_interval (spec : str ) -> timedelta :
180
186
"""parse time interval from string"""
181
- hours = 0
182
- minutes = 0
183
- weeks = 0
184
- microseconds = 0
185
- milliseconds = 0
186
- seconds = 0
187
- years = 0
188
- days = 0
187
+ hours : int = 0
188
+ minutes : int = 0
189
+ weeks : int = 0
190
+ microseconds : int = 0
191
+ milliseconds : int = 0
192
+ seconds : int = 0
193
+ years : int = 0
194
+ days : int = 0
189
195
190
196
assert spec is not None , "Must have valid time interval specification"
191
197
192
198
# get time specs such as 12 days, etc. Supported timespans are years, days, hours, minutes, seconds
193
- timespecs = [x .strip () for x in spec .strip ().split ("," )]
199
+ timespecs : list [ str ] = [x .strip () for x in spec .strip ().split ("," )]
194
200
195
201
for ts in timespecs :
196
202
# allow both 'days=1' and '1 day' syntax
197
- timespec_parts = re .findall (PATTERN_NAME_EQUALS_VALUE , ts )
203
+ timespec_parts : list [ tuple [ str , str ]] = re .findall (PATTERN_NAME_EQUALS_VALUE , ts )
198
204
# findall returns list of tuples
199
205
if timespec_parts is not None and len (timespec_parts ) > 0 :
200
- num_parts = len (timespec_parts [0 ])
206
+ num_parts : int = len (timespec_parts [0 ])
201
207
assert num_parts >= 1 , "must have numeric specification and time element such as `12 hours` or `hours=12`"
202
- time_value = int (timespec_parts [0 ][num_parts - 1 ])
203
- time_type = timespec_parts [0 ][0 ].lower ()
208
+ time_value : int = int (timespec_parts [0 ][num_parts - 1 ])
209
+ time_type : str = timespec_parts [0 ][0 ].lower ()
204
210
else :
205
211
timespec_parts = re .findall (PATTERN_VALUE_SPACE_NAME , ts )
206
212
num_parts = len (timespec_parts [0 ])
@@ -225,7 +231,7 @@ def parse_time_interval(spec):
225
231
elif time_type in ["milliseconds" , "millisecond" ]:
226
232
milliseconds = time_value
227
233
228
- delta = timedelta (
234
+ delta : timedelta = timedelta (
229
235
days = days ,
230
236
seconds = seconds ,
231
237
microseconds = microseconds ,
@@ -238,7 +244,7 @@ def parse_time_interval(spec):
238
244
return delta
239
245
240
246
241
- def strip_margins (s , marginChar ) :
247
+ def strip_margins (s : str , marginChar : str ) -> str :
242
248
"""
243
249
Python equivalent of Scala stripMargins method
244
250
Takes a string (potentially multiline) and strips all chars up and including the first occurrence of `marginChar`.
@@ -258,20 +264,20 @@ def strip_margins(s, marginChar):
258
264
assert s is not None and isinstance (s , str )
259
265
assert marginChar is not None and isinstance (marginChar , str )
260
266
261
- lines = s .split ("\n " )
262
- revised_lines = []
267
+ lines : list [ str ] = s .split ("\n " )
268
+ revised_lines : list [ str ] = []
263
269
264
270
for line in lines :
265
271
if marginChar in line :
266
- revised_line = line [line .index (marginChar ) + 1 :]
272
+ revised_line : str = line [line .index (marginChar ) + 1 :]
267
273
revised_lines .append (revised_line )
268
274
else :
269
275
revised_lines .append (line )
270
276
271
277
return "\n " .join (revised_lines )
272
278
273
279
274
- def split_list_matching_condition (lst , cond ) :
280
+ def split_list_matching_condition (lst : list [ Any ] , cond : Callable [[ Any ], bool ]) -> list [ list [ Any ]] :
275
281
"""
276
282
Split a list on elements that match a condition
277
283
@@ -293,9 +299,9 @@ def split_list_matching_condition(lst, cond):
293
299
:arg cond: lambda function or function taking single argument and returning True or False
294
300
:returns: list of sublists
295
301
"""
296
- retval = []
302
+ retval : list [ list [ Any ]] = []
297
303
298
- def match_condition (matchList , matchFn ) :
304
+ def match_condition (matchList : list [ Any ] , matchFn : Callable [[ Any ], bool ]) -> int :
299
305
"""Return first index of element of list matching condition"""
300
306
if matchList is None or len (matchList ) == 0 :
301
307
return - 1
@@ -311,7 +317,7 @@ def match_condition(matchList, matchFn):
311
317
elif len (lst ) == 1 :
312
318
retval = [lst ]
313
319
else :
314
- ix = match_condition (lst , cond )
320
+ ix : int = match_condition (lst , cond )
315
321
if ix != - 1 :
316
322
retval .extend (split_list_matching_condition (lst [0 :ix ], cond ))
317
323
retval .append (lst [ix :ix + 1 ])
@@ -323,7 +329,7 @@ def match_condition(matchList, matchFn):
323
329
return [el for el in retval if el != []]
324
330
325
331
326
- def json_value_from_path (searchPath , jsonData , defaultValue ) :
332
+ def json_value_from_path (searchPath : str , jsonData : str , defaultValue : object ) -> object :
327
333
""" Get JSON value from JSON data referenced by searchPath
328
334
329
335
searchPath should be a JSON path as supported by the `jmespath` package
@@ -337,20 +343,20 @@ def json_value_from_path(searchPath, jsonData, defaultValue):
337
343
assert searchPath is not None and len (searchPath ) > 0 , "search path cannot be empty"
338
344
assert jsonData is not None and len (jsonData ) > 0 , "JSON data cannot be empty"
339
345
340
- jsonDict = json .loads (jsonData )
346
+ jsonDict : dict = json .loads (jsonData )
341
347
342
- jsonValue = jmespath .search (searchPath , jsonDict )
348
+ jsonValue : Any = jmespath .search (searchPath , jsonDict )
343
349
344
350
if jsonValue is not None :
345
351
return jsonValue
346
352
347
353
return defaultValue
348
354
349
355
350
- def system_time_millis ():
356
+ def system_time_millis () -> int :
351
357
""" return system time as milliseconds since start of epoch
352
358
353
359
:return: system time millis as long
354
360
"""
355
- curr_time = round (time .time () / 1000 )
361
+ curr_time : int = round (time .time () / 1000 )
356
362
return curr_time
0 commit comments