@@ -107,15 +107,23 @@ def _athena_to_qs_type(col, athena_type):
107107 # take the first one and let's hope it is fine
108108 root_lt = next (iter (dataset ['LogicalTableMap' ].values ()))
109109
110- projected_cols = next ( # get the first DataTrasform with ProjectOperation
110+ renames = {}
111+ for lt in dataset ['LogicalTableMap' ].values ():
112+ for dt in lt .get ('DataTransforms' , []):
113+ if "RenameColumnOperation" in dt :
114+ key = lt ['Source' ].get ('PhysicalTableId' , '' ) + '.' + dt ["RenameColumnOperation" ]['ColumnName' ]
115+ renames [key ] = dt ["RenameColumnOperation" ]['NewColumnName' ]
116+ logger .trace (f'renames = { renames } ' )
117+
118+ projected_cols = next ( # get the first DataTransform with ProjectOperation
111119 ds ['ProjectOperation' ]["ProjectedColumns" ]
112120 for ds in root_lt ['DataTransforms' ]
113121 if 'ProjectOperation' in ds
114122 )
115123
116124 # Update each PhysicalTableMap with all columns from athena views
117125 all_columns = []
118- for pt in dataset ['PhysicalTableMap' ].values ():
126+ for pt_id , pt in dataset ['PhysicalTableMap' ].items ():
119127 table_name = pt ['RelationalTable' ]['Name' ]
120128 database = pt ['RelationalTable' ]['Schema' ]
121129 columns = _get_athena_columns (table_name , database )
@@ -137,15 +145,20 @@ def _athena_to_qs_type(col, athena_type):
137145 if col ['Name' ].lower () not in dataset_columns_names
138146 ] # BTW what if col is there but another type?
139147
140- for col in new_columns : # alter names for columns that already exist
141- if col ['Name' ].lower () in projected_cols :
142- col ['Name' ] = f" { col [ 'Name' ] } [ { table_name } ]" # What if it is alrady there?
148+ # FIXME: need to add RenameColumnOperation! col['Name'] => f"{col['Name']}[{table_name}]"
149+ new_columns = [ col for col in new_columns if col ['Name' ].lower () not in projected_cols ] # avoid things that are already there (probably need to take renames into account)
150+ new_columns = [ col for col in new_columns if col ['Name' ] not in all_columns ] # avoid adding 2nd time
143151
144152 logger .trace (f'dataset_columns_to_keep = { dataset_columns_to_keep } ' )
145153 if new_columns :
146- logger .trace (f'new_columns = { new_columns } ' )
154+ logger .trace (f'new_columns = { new_columns } from { pt_id } ' )
147155 pt ['RelationalTable' ]['InputColumns' ] = dataset_columns_to_keep + new_columns
148- all_columns += [col ['Name' ] for col in pt ['RelationalTable' ]['InputColumns' ]]
156+
157+ for col in new_columns :
158+ col_name = col ['Name' ]
159+ if f'{ pt_id } .{ col_name } ' in renames :
160+ col_name = renames [f'{ pt_id } .{ col_name } ' ]
161+ all_columns .append (col_name )
149162
150163 # Add all needed calc fields
151164 existing_create_columns = [dt .get ("CreateColumnsOperation" , {}).get ('Columns' , [None ])[0 ] for dt in root_lt .get ('DataTransforms' , []) if dt .get ("CreateColumnsOperation" )]
@@ -172,12 +185,12 @@ def _athena_to_qs_type(col, athena_type):
172185 # Add all new cols to projected columns
173186 for col in set (all_columns ):
174187 if col .lower () not in [c .lower () for c in projected_cols ]:
188+ logger .trace (f'adding { col } ' )
175189 projected_cols .append (col )
176190
177191 # filter out all columns that cannot be used for dataset creation
178192 update_ = {key : value for key , value in dataset .items () if key in 'DataSetId, Name, PhysicalTableMap, LogicalTableMap, ImportMode, ColumnGroups, FieldFolders, RowLevelPermissionDataSet, RowLevelPermissionTagConfiguration, ColumnLevelPermissionRules, DataSetUsageConfiguration, DatasetParameters' .split (', ' )}
179193 logger .trace (f'update_ = { update_ } ' )
180-
181194 return update_
182195
183196
0 commit comments