13
13
See the License for the specific language governing permissions and
14
14
limitations under the License.
15
15
"""
16
+
16
17
import logging
17
18
import os
18
19
from typing import Dict , List , Optional
24
25
logger = logging .getLogger (__name__ )
25
26
26
27
28
+ class RadasConfig (object ):
29
+ def __init__ (self , data : Dict ):
30
+ self .__umb_host : str = data .get ("umb_host" , None )
31
+ self .__umb_host_port : str = data .get ("umb_host_port" , "5671" )
32
+ self .__result_queue : str = data .get ("result_queue" , None )
33
+ self .__request_chan : str = data .get ("request_channel" , None )
34
+ self .__client_ca : str = data .get ("client_ca" , None )
35
+ self .__client_key : str = data .get ("client_key" , None )
36
+ self .__client_key_pass_file : str = data .get ("client_key_pass_file" , None )
37
+ self .__root_ca : str = data .get ("root_ca" , "/etc/pki/tls/certs/ca-bundle.crt" )
38
+ self .__quay_radas_registry_config : Optional [str ] = data .get (
39
+ "quay_radas_registry_config" , None
40
+ )
41
+ self .__radas_sign_timeout_retry_count : int = data .get ("radas_sign_timeout_retry_count" , 10 )
42
+ self .__radas_sign_timeout_retry_interval : int = data .get (
43
+ "radas_sign_timeout_retry_interval" , 60
44
+ )
45
+ self .__radas_receiver_timeout : int = int (data .get ("radas_receiver_timeout" , 1800 ))
46
+
47
+ def validate (self ) -> bool :
48
+ if not self .__umb_host :
49
+ logger .error ("Missing host name setting for UMB!" )
50
+ return False
51
+ if not self .__result_queue :
52
+ logger .error ("Missing the queue setting to receive signing result in UMB!" )
53
+ return False
54
+ if not self .__request_chan :
55
+ logger .error ("Missing the queue setting to send signing request in UMB!" )
56
+ return False
57
+ if self .__client_ca and not os .access (self .__client_ca , os .R_OK ):
58
+ logger .error ("The client CA file is not valid!" )
59
+ return False
60
+ if self .__client_key and not os .access (self .__client_key , os .R_OK ):
61
+ logger .error ("The client key file is not valid!" )
62
+ return False
63
+ if self .__client_key_pass_file and not os .access (self .__client_key_pass_file , os .R_OK ):
64
+ logger .error ("The client key password file is not valid!" )
65
+ return False
66
+ if self .__root_ca and not os .access (self .__root_ca , os .R_OK ):
67
+ logger .error ("The root ca file is not valid!" )
68
+ return False
69
+ if self .__quay_radas_registry_config and not os .access (
70
+ self .__quay_radas_registry_config , os .R_OK
71
+ ):
72
+ self .__quay_radas_registry_config = None
73
+ logger .warning (
74
+ "The quay registry config for oras is not valid, will ignore the registry config!"
75
+ )
76
+ return True
77
+
78
+ def umb_target (self ) -> str :
79
+ if self .ssl_enabled ():
80
+ return f"amqps://{ self .__umb_host .strip ()} :{ self .__umb_host_port } "
81
+ else :
82
+ return f"amqp://{ self .__umb_host .strip ()} :{ self .__umb_host_port } "
83
+
84
+ def result_queue (self ) -> str :
85
+ return self .__result_queue .strip ()
86
+
87
+ def request_channel (self ) -> str :
88
+ return self .__request_chan .strip ()
89
+
90
+ def client_ca (self ) -> str :
91
+ return self .__client_ca .strip ()
92
+
93
+ def client_key (self ) -> str :
94
+ return self .__client_key .strip ()
95
+
96
+ def client_key_password (self ) -> str :
97
+ pass_file = self .__client_key_pass_file
98
+ if os .access (pass_file , os .R_OK ):
99
+ with open (pass_file , "r" ) as f :
100
+ return f .read ().strip ()
101
+ elif pass_file :
102
+ logger .warning ("The key password file is not accessible. Will ignore the password." )
103
+ return ""
104
+
105
+ def root_ca (self ) -> str :
106
+ return self .__root_ca .strip ()
107
+
108
+ def ssl_enabled (self ) -> bool :
109
+ return bool (self .__client_ca and self .__client_key and self .__root_ca )
110
+
111
+ def quay_radas_registry_config (self ) -> Optional [str ]:
112
+ if self .__quay_radas_registry_config :
113
+ return self .__quay_radas_registry_config .strip ()
114
+ return None
115
+
116
+ def radas_sign_timeout_retry_count (self ) -> int :
117
+ return self .__radas_sign_timeout_retry_count
118
+
119
+ def radas_sign_timeout_retry_interval (self ) -> int :
120
+ return self .__radas_sign_timeout_retry_interval
121
+
122
+ def receiver_timeout (self ) -> int :
123
+ return self .__radas_receiver_timeout
124
+
125
+
27
126
class CharonConfig (object ):
28
127
"""CharonConfig is used to store all configurations for charon
29
128
tools.
@@ -39,6 +138,13 @@ def __init__(self, data: Dict):
39
138
self .__ignore_signature_suffix : Dict = data .get ("ignore_signature_suffix" , None )
40
139
self .__signature_command : str = data .get ("detach_signature_command" , None )
41
140
self .__aws_cf_enable : bool = data .get ("aws_cf_enable" , False )
141
+ radas_config : Dict = data .get ("radas" , None )
142
+ self .__radas_config : Optional [RadasConfig ] = None
143
+ if radas_config :
144
+ self .__radas_config = RadasConfig (radas_config )
145
+ self .__radas_enabled = bool (self .__radas_config and self .__radas_config .validate ())
146
+ else :
147
+ self .__radas_enabled = False
42
148
43
149
def get_ignore_patterns (self ) -> List [str ]:
44
150
return self .__ignore_patterns
@@ -67,19 +173,23 @@ def get_detach_signature_command(self) -> str:
67
173
def is_aws_cf_enable (self ) -> bool :
68
174
return self .__aws_cf_enable
69
175
176
+ def is_radas_enabled (self ) -> bool :
177
+ return self .__radas_enabled
178
+
179
+ def get_radas_config (self ) -> Optional [RadasConfig ]:
180
+ return self .__radas_config
181
+
70
182
71
183
def get_config (cfgPath = None ) -> CharonConfig :
72
184
config_file_path = cfgPath
73
185
if not config_file_path or not os .path .isfile (config_file_path ):
74
186
config_file_path = os .path .join (os .getenv ("HOME" , "" ), ".charon" , CONFIG_FILE )
75
- data = read_yaml_from_file_path (config_file_path , ' schemas/charon.json' )
187
+ data = read_yaml_from_file_path (config_file_path , " schemas/charon.json" )
76
188
return CharonConfig (data )
77
189
78
190
79
191
def get_template (template_file : str ) -> str :
80
- template = os .path .join (
81
- os .getenv ("HOME" , '' ), ".charon/template" , template_file
82
- )
192
+ template = os .path .join (os .getenv ("HOME" , "" ), ".charon/template" , template_file )
83
193
if os .path .isfile (template ):
84
194
with open (template , encoding = "utf-8" ) as file_ :
85
195
return file_ .read ()
0 commit comments