@@ -68,33 +68,12 @@ def patch(dataset, custom_fields={}, athena=None):
6868 '''
6969 def _get_athena_columns (table , database = None ):
7070 '''returns athena columns'''
71+ metadata = athena .get_table_metadata (table , database_name = database , no_cache = True )
7172 return [
72- line [0 ].split (None , 1 )
73- for line in athena .query (
74- f'SHOW COLUMNS FROM { table } ' ,
75- include_header = True , database = database
76- )
73+ (col ['Name' ], col ['Type' ])
74+ for col in metadata .get ('Columns' , [])
7775 ]
7876
79- def _replace_columns (existing_columns , new_columns ):
80- '''replace columns but keep the order, ignore case changes
81- assert _replace_columns(
82- [{'Name': 'a'}, {'Name': 'B'}, {'Name': 'c'}],
83- [{'Name': 'A'}, {'Name': 'b'}, {'Name': 'd'}]) \
84- == [{'Name': 'a'}, {'Name': 'B'}, {'Name': 'd'}]
85- Different types will be replaced
86- '''
87- existing_columns = [
88- existing_col
89- for existing_col in existing_columns
90- if str (existing_col ).lower () in [str (c ).lower () for c in new_columns ]
91- ] # filter out old
92- for col in new_columns : # add new
93- if col ['Name' ].lower () not in [c ['Name' ].lower () for c in existing_columns ]:
94- existing_columns .append (col )
95- #REFACTOR: what if col is there but another type?
96- return existing_columns
97-
9877 def _athena_to_qs_type (col , athena_type ):
9978 '''map athena type to QS type
10079 The following data types are supported in SPICE: Date, Decimal-fixed, Decimal-float, Integer, and String.
@@ -125,8 +104,14 @@ def _athena_to_qs_type(col, athena_type):
125104 root_lt = dataset ['LogicalTableMap' ][lt ]
126105 break
127106 else :
128- raise ValueError (f'Unable to find a root logical table in the dataset { dataset } ' )
129- projected_cols = next (ds ['ProjectOperation' ]["ProjectedColumns" ] for ds in root_lt ['DataTransforms' ] if 'ProjectOperation' in ds )
107+ # take the first one and let's hope it is fine
108+ root_lt = next (iter (dataset ['LogicalTableMap' ].values ()))
109+
110+ projected_cols = next ( # get the first DataTrasform with ProjectOperation
111+ ds ['ProjectOperation' ]["ProjectedColumns" ]
112+ for ds in root_lt ['DataTransforms' ]
113+ if 'ProjectOperation' in ds
114+ )
130115
131116 # Update each PhysicalTableMap with all columns from athena views
132117 all_columns = []
@@ -135,12 +120,31 @@ def _athena_to_qs_type(col, athena_type):
135120 database = pt ['RelationalTable' ]['Schema' ]
136121 columns = _get_athena_columns (table_name , database )
137122 logger .trace (f'columns = { columns } ' )
138-
139- new_columns = [_athena_to_qs_type (name , athena_type ) for name , athena_type in columns ]
140- #for col in new_columns:
141- # if col['Name'] in [existing_col['Name'] for existing_col in all_columns]: #FIXME not all_columns so far but must be all cols before modification
142- # col['Name'] = f'{col["Name"]}[{table_name}]'
143- pt ['RelationalTable' ]['InputColumns' ] = _replace_columns (pt ['RelationalTable' ]['InputColumns' ], new_columns )
123+ athena_columns = [
124+ _athena_to_qs_type (name , athena_type .lower ())
125+ for name , athena_type in columns
126+ ]
127+ logger .trace (f'athena_columns = { columns } ' )
128+ athena_columns_names = [c ['Name' ].lower () for c in athena_columns ]
129+ dataset_columns = pt ['RelationalTable' ]['InputColumns' ]
130+ dataset_columns_names = [col ['Name' ].lower () for col in dataset_columns ]
131+ dataset_columns_to_keep = [
132+ col for col in dataset_columns
133+ if col ['Name' ].lower () in athena_columns_names
134+ ]
135+ new_columns = [
136+ col for col in athena_columns
137+ if col ['Name' ].lower () not in dataset_columns_names
138+ ] # BTW what if col is there but another type?
139+
140+ for col in new_columns : # alter names for columns that already exist
141+ if col ['Name' ].lower () in projected_cols :
142+ col ['Name' ] = f"{ col ['Name' ]} [{ table_name } ]" # What if it is alrady there?
143+
144+ logger .trace (f'dataset_columns_to_keep = { dataset_columns_to_keep } ' )
145+ if new_columns :
146+ logger .trace (f'new_columns = { new_columns } ' )
147+ pt ['RelationalTable' ]['InputColumns' ] = dataset_columns_to_keep + new_columns
144148 all_columns += [col ['Name' ] for col in pt ['RelationalTable' ]['InputColumns' ]]
145149
146150 # Add all needed calc fields
0 commit comments