13
13
# See the License for the specific language governing permissions and
14
14
# limitations under the License.
15
15
#
16
+ import os
16
17
import base64
17
18
import json
18
19
import logging
30
31
from confluent_kafka import Producer
31
32
32
33
logger = logging .getLogger (__name__ )
33
- logger .setLevel (logging .INFO )
34
+ log_level = os .environ .get ('LOG_LEVEL' , 'INFO' )
35
+ logger .setLevel (log_level )
34
36
logger .addHandler (logging .StreamHandler ())
35
37
36
38
with open ("conf/api.yaml" , "r" ) as file :
68
70
else :
69
71
EVENT_BUS_ARN = ""
70
72
71
- logger .info ("Loaded configs" )
73
+ logger .debug ("Loaded configs" )
72
74
73
75
token_public_key_encoded = requests .get (CONFIG ["token_public_key_url" ], verify = False ).json ()["key" ]
74
76
TOKEN_PUBLIC_KEY = serialization .load_der_public_key (base64 .b64decode (token_public_key_encoded ))
75
- logger .info ("Loaded token public key" )
77
+ logger .debug ("Loaded token public key" )
76
78
77
79
producer_config = {"bootstrap.servers" : CONFIG ["kafka_bootstrap_server" ]}
78
80
if "kafka_sasl_kerberos_principal" in CONFIG and "kafka_ssl_key_path" in CONFIG :
87
89
"ssl.key.location" : CONFIG ["kafka_ssl_key_path" ],
88
90
"ssl.key.password" : CONFIG ["kafka_ssl_key_password" ]
89
91
})
90
- logger .info ("producer will use SASL_SSL" )
92
+ logger .debug ("producer will use SASL_SSL" )
91
93
92
94
kafka_producer = Producer (producer_config )
93
- logger .info ("Initialized kafka producer" )
95
+ logger .debug ("Initialized kafka producer" )
94
96
95
97
def kafka_write (topicName , message ):
96
- logger .info (f"Sending to kafka { topicName } " )
98
+ logger .debug (f"Sending to kafka { topicName } " )
97
99
error = []
98
100
kafka_producer .produce (topicName ,
99
101
key = "" ,
@@ -105,10 +107,10 @@ def kafka_write(topicName, message):
105
107
106
108
def event_bridge_write (topicName , message ):
107
109
if not EVENT_BUS_ARN :
108
- logger .info ("No EventBus Arn - skipping" )
110
+ logger .debug ("No EventBus Arn - skipping" )
109
111
return
110
112
111
- logger .info (f"Sending to eventBridge { topicName } " )
113
+ logger .debug (f"Sending to eventBridge { topicName } " )
112
114
response = aws_eventbridge .put_events (
113
115
Entries = [
114
116
{
@@ -129,22 +131,22 @@ def get_api():
129
131
}
130
132
131
133
def get_token ():
132
- logger .info ("Handling GET Token" )
134
+ logger .debug ("Handling GET Token" )
133
135
return {
134
136
"statusCode" : 303 ,
135
137
"headers" : {"Location" : TOKEN_PROVIDER_URL }
136
138
}
137
139
138
140
def get_topics ():
139
- logger .info ("Handling GET Topics" )
141
+ logger .debug ("Handling GET Topics" )
140
142
return {
141
143
"statusCode" : 200 ,
142
144
"headers" : {"Content-Type" : "application/json" },
143
145
"body" : json .dumps ([topicName for topicName in TOPICS ])
144
146
}
145
147
146
148
def get_topic_schema (topicName ):
147
- logger .info (f"Handling GET TopicSchema({ topicName } )" )
149
+ logger .debug (f"Handling GET TopicSchema({ topicName } )" )
148
150
if topicName not in TOPICS :
149
151
return { "statusCode" : 404 }
150
152
@@ -155,7 +157,7 @@ def get_topic_schema(topicName):
155
157
}
156
158
157
159
def post_topic_message (topicName , topicMessage , tokenEncoded ):
158
- logger .info (f"Handling POST { topicName } " )
160
+ logger .debug (f"Handling POST { topicName } " )
159
161
try :
160
162
token = jwt .decode (tokenEncoded , TOKEN_PUBLIC_KEY , algorithms = ["RS256" ])
161
163
except Exception as e :
0 commit comments