Skip to content

Commit 7f9e8ab

Browse files
authored
Merge pull request #58 from telefonicasc/fix/sql_replaceid
added replace_id attrib to sqlFileStore
2 parents 5755ac7 + c8a6a95 commit 7f9e8ab

File tree

3 files changed

+136
-12
lines changed

3 files changed

+136
-12
lines changed

python-lib/tc_etl_lib/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,9 @@ La librería además proporciona [context managers](https://docs.python.org/3/re
341341
- si `table_name[entityType]` existe y es *falsy* (`None`, `""`, etc), las entidades de ese tipo no se escriben al fichero SQL.
342342
- :param: `chunk_size` opcional: máximo número de líneas a incluir en un solo `INSERT`. Default=10000
343343
- :param: `append` opcional: en caso de que el fichero exista, `append=True` añade los INSERT mientras que `append=False` sobrescribe el fichero. Default False.
344+
- :param `replace_id` opcional: diccionario `tipo de entidad` => `lista de atributos replace_id`.
345+
Reemplaza el ID de las entidades del tipo o tipos especificados, por un valor construido a partir de la lista de atributos indicados en este parámetro, separados por `_`.
346+
Imita el comportamiento del atributo `replaceId` de los FLOW_HISTORIC de URBO DEPLOYER, para poder usar este *store* en ETLs que alimenten *singletons*.
344347
- :return: un `callable` que recibe una lista de entidades y las escribe como instrucciones sql `INSERT` en el fichero especificado.
345348

346349
El modo de uso de cualquiera de los context managers es idéntico:
@@ -439,6 +442,8 @@ TOTAL 403 221 45%
439442

440443
## Changelog
441444

445+
- Add: new optional parameter called `replace_id` in sqlFileStore context manager ([#58](https://github.com/telefonicasc/etl-framework/pull/58))
446+
442447
0.7.0 (December 233rd, 2022)
443448

444449
- Add: new stores for saving entity batches, `orionStore` and `sqlFileStore` ([#46](https://github.com/telefonicasc/etl-framework/pull/46))

python-lib/tc_etl_lib/tc_etl_lib/store.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
- orionStore: saves batches to Orion environment
2424
- sqlFileStore: saves batches to SQL File
2525
'''
26-
from typing import Callable, Dict, Any, Iterable, Sequence, Optional
26+
from typing import Callable, Dict, Any, Iterable, Sequence, List, Optional
2727
from contextlib import contextmanager
2828
from pathlib import Path
2929
from collections import defaultdict
@@ -48,7 +48,7 @@ def send_batch(entities: Iterable[Any]):
4848
yield send_batch
4949

5050
@contextmanager
51-
def sqlFileStore(path: Path, *, subservice:str, schema:str=":target_schema", namespace:str="", table_names:Optional[Dict[str, str]]=None, chunk_size:int=10000, append:bool=False):
51+
def sqlFileStore(path: Path, *, subservice:str, schema:str=":target_schema", namespace:str="", table_names:Optional[Dict[str, str]]=None, chunk_size:int=10000, append:bool=False, replace_id:Optional[Dict[str, Sequence[str]]]=None):
5252
'''
5353
Context manager that creates a store to save entities to an SQL File.
5454
SQL syntax used is postgresql.
@@ -62,14 +62,20 @@ def sqlFileStore(path: Path, *, subservice:str, schema:str=":target_schema", nam
6262
- if table_name[entityType] exists and is empty, entities with the given type are not saved.
6363
chunk_size: maximum lines in a single insert statement. Default=10000
6464
append: append to the file instead of overwriting.
65+
replace_id: dictionary of `entity type` => `list of replace_id attributes`.
66+
For any entity of the given type, the entity id will be replaced by
67+
the list of values of the specified attributes, concatenated with "_".
68+
This mimics the behaviour of `replaceId` param in historic URBO-DEPLOYER flows.
6569
'''
6670
mode = "a+" if append else "w+"
6771
handler = path.open(mode=mode, encoding="utf-8")
6872
some_table_names = table_names or {} # make sure it is not None
73+
replace_id = replace_id or {} # make sure it is not None
6974
try:
7075
def send_batch(entities: Iterable[Any]):
76+
"""Send a batch of entities to the database"""
7177
for chunk in iter_chunk(entities, chunk_size):
72-
handler.write(sqlfile_batch(schema=schema, namespace=namespace, table_names=some_table_names, subservice=subservice, entities=chunk))
78+
handler.write(sqlfile_batch(schema=schema, namespace=namespace, table_names=some_table_names, subservice=subservice, replace_id=replace_id, entities=chunk))
7379
handler.write("\n")
7480
yield send_batch
7581
finally:
@@ -100,15 +106,26 @@ def sql_escape(obj: Any) -> str:
100106
adaptor.encoding = 'utf-8'
101107
return adaptor.getquoted().decode('utf-8')
102108

103-
def sqlfile_values(subservice: str, entity: Dict[str, Any], fields: Iterable[str]) -> str:
109+
def sqlfile_values(subservice: str, entity: Dict[str, Any], fields: Iterable[str], replace_id:Optional[Sequence[str]]=None) -> str:
104110
'''
105111
Generates a string suitable for SQL insert, with all values of the entity
106112
107113
subservice: subservice name
108114
entity: ngsi entity
109-
fields: list of fields to save in the entity (omitting id, type)'''
115+
fields: list of fields to save in the entity (omitting id, type)
116+
replace_id: optional list of attributes to replace the entity's id
117+
'''
118+
entityid = entity['id']
119+
if replace_id is not None and len(replace_id) > 0:
120+
values = []
121+
for attr in replace_id:
122+
if attr == 'id':
123+
values.append(entityid)
124+
else:
125+
values.append(str(entity[attr]['value']))
126+
entityid = "_".join(values)
110127
sql = [
111-
sql_escape(entity['id']),
128+
sql_escape(entityid),
112129
sql_escape(entity['type']),
113130
sql_escape(subservice),
114131
"NOW()"
@@ -128,19 +145,20 @@ def sqlfile_values(subservice: str, entity: Dict[str, Any], fields: Iterable[str
128145
sql.append(value)
129146
return f"({','.join(sql)})"
130147

131-
def sqlfile_insert(subservice: str, table_name: str, fields: Sequence[str], entities: Iterable[Any]) -> str:
148+
def sqlfile_insert(subservice: str, table_name: str, fields: Sequence[str], entities: Iterable[Any], replace_id:Optional[Sequence[str]]=None) -> str:
132149
'''
133150
Generate SQL INSERT lines from sequence of entities
134151
135152
subservice: subservice name
136153
table_name: SQL table name to use
137154
fields: sequence of attribute names
138155
entities: iterable of entities
156+
replace_id: optional list of alues to replace the entity's id
139157
'''
140158
return "\n".join((
141159
f"INSERT INTO {table_name} (entityid,entitytype,fiwareservicepath,recvtime,{','.join(fields)}) VALUES",
142160
",\n".join(
143-
sqlfile_values(subservice, entity, fields)
161+
sqlfile_values(subservice, entity, fields, replace_id)
144162
for entity in entities
145163
) + ";"
146164
))
@@ -160,14 +178,15 @@ def sql_table_name(schema: str, namespace: str, entity_type: str, table_names: D
160178
# Tables mapped to some name are prefixed with schema name
161179
return f"{schema}.{mapped_name}"
162180

163-
def sqlfile_batch(schema: str, namespace: str, table_names: Dict[str, str], subservice: str, entities: Iterable[Any]) -> str:
181+
def sqlfile_batch(schema: str, namespace: str, table_names: Dict[str, str], subservice: str, replace_id: Dict[str, Sequence[str]], entities: Iterable[Any]) -> str:
164182
'''
165183
Generate a single SQL insert batch statement
166184
167185
schema: schema name
168186
namespace: namespace for table names
169187
table_names: overrides entytyType to table name defaults
170188
subservice: subservice name
189+
replace_id: map of entity type to list of replace_id attributes
171190
entities: Iterable of entities
172191
'''
173192
# Group entities by type
@@ -187,6 +206,6 @@ def sqlfile_batch(schema: str, namespace: str, table_names: Dict[str, str], subs
187206
table_name = sql_table_name(schema, namespace, entity_type, table_names)
188207
if table_name:
189208
table_cols = sorted(fields_by_type[entity_type])
190-
sql.append(sqlfile_insert(subservice, table_name, table_cols, typed_entities))
209+
sql.append(sqlfile_insert(subservice, table_name, table_cols, typed_entities, replace_id.get(entity_type, None)))
191210
# and return SQL insert code
192211
return "\n".join(sql)

python-lib/tc_etl_lib/tc_etl_lib/test_store.py

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def dedent(text: str) -> str:
100100
class TestSQLFileStore(unittest.TestCase):
101101
'''Tests for sqlFileStore'''
102102

103-
def do_test(self, expect: str, append_text: str, **params):
103+
def do_test(self, expect: str, append_text: str, entities=TestEntities, **params):
104104
'''test sqlFileStore with given parameters and expectations'''
105105
with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8', delete=False) as tmpFile:
106106
try:
@@ -109,7 +109,7 @@ def do_test(self, expect: str, append_text: str, **params):
109109
with open(tmpFile.name, "w+", encoding="utf-8") as outfile:
110110
outfile.write(dedent(append_text))
111111
with sqlFileStore(Path(tmpFile.name), **params) as store:
112-
store(TestEntities)
112+
store(entities)
113113
with open(tmpFile.name, "r", encoding="utf-8") as infile:
114114
data = infile.read()
115115
self.maxDiff = None
@@ -213,3 +213,103 @@ def test_append(self):
213213
('id_4','type_B','/testsrv',NOW(),'2022-12-15T18:01:00Z',21);
214214
"""
215215
self.do_test(expected, create, subservice="/testsrv", schema="myschema", chunk_size=3, append=True)
216+
217+
def test_singleton_one_id(self):
218+
'''Test replace_id with a single attribute'''
219+
entities = [
220+
{
221+
"id": "id_singleton",
222+
"type": "type_A",
223+
"ref": {
224+
"type": "Text",
225+
"value": "id_A"
226+
},
227+
"municipality": {
228+
"type": "Text",
229+
"value": "NA"
230+
},
231+
"location": {
232+
"type": "geo:json",
233+
"value": { "type": "Point", "coordinates": [1, 2] }
234+
}
235+
},
236+
{
237+
"id": "id_no_singleton",
238+
"type": "type_B",
239+
"ref": {
240+
"type": "Text",
241+
"value": "id_B"
242+
},
243+
"municipality": {
244+
"type": "Text",
245+
"value": "NA"
246+
},
247+
"location": {
248+
"type": "geo:json",
249+
"value": { "type": "Point", "coordinates": [1, 2] }
250+
}
251+
}
252+
]
253+
expected = """
254+
INSERT INTO myschema.type_a (entityid,entitytype,fiwareservicepath,recvtime,location,municipality,ref) VALUES
255+
('id_A','type_A','/testsrv',NOW(),ST_GeomFromGeoJSON('{"type": "Point", "coordinates": [1, 2]}'),'NA','id_A');
256+
INSERT INTO myschema.type_b (entityid,entitytype,fiwareservicepath,recvtime,location,municipality,ref) VALUES
257+
('id_no_singleton','type_B','/testsrv',NOW(),ST_GeomFromGeoJSON('{"type": "Point", "coordinates": [1, 2]}'),'NA','id_B');
258+
"""
259+
self.do_test(expected, "", entities=entities, replace_id={'type_A':['ref']}, subservice="/testsrv", schema="myschema")
260+
261+
def test_singleton_two_ids(self):
262+
'''Test replace_id with two attributes'''
263+
entities = [
264+
{
265+
"id": "id_singleton",
266+
"type": "type_A",
267+
"primary": {
268+
"type": "Text",
269+
"value": "id_primary"
270+
},
271+
"secondary": {
272+
"type": "Number",
273+
"value": 5
274+
},
275+
"municipality": {
276+
"type": "Text",
277+
"value": "NA"
278+
},
279+
"location": {
280+
"type": "geo:json",
281+
"value": { "type": "Point", "coordinates": [1, 2] }
282+
}
283+
}
284+
]
285+
expected = """
286+
INSERT INTO myschema.type_a (entityid,entitytype,fiwareservicepath,recvtime,location,municipality,primary,secondary) VALUES
287+
('id_primary_5','type_A','/testsrv',NOW(),ST_GeomFromGeoJSON('{"type": "Point", "coordinates": [1, 2]}'),'NA','id_primary',5);
288+
"""
289+
self.do_test(expected, "", entities=entities, replace_id={'type_A':['primary', 'secondary']}, subservice="/testsrv", schema="myschema")
290+
291+
def test_singleton_original_id(self):
292+
'''Test replace_id with the original id, plus some attrib'''
293+
entities = [
294+
{
295+
"id": "id_singleton",
296+
"type": "type_A",
297+
"ref": {
298+
"type": "Number",
299+
"value": 5
300+
},
301+
"municipality": {
302+
"type": "Text",
303+
"value": "NA"
304+
},
305+
"location": {
306+
"type": "geo:json",
307+
"value": { "type": "Point", "coordinates": [1, 2] }
308+
}
309+
}
310+
]
311+
expected = """
312+
INSERT INTO myschema.type_a (entityid,entitytype,fiwareservicepath,recvtime,location,municipality,ref) VALUES
313+
('id_singleton_5','type_A','/testsrv',NOW(),ST_GeomFromGeoJSON('{"type": "Point", "coordinates": [1, 2]}'),'NA',5);
314+
"""
315+
self.do_test(expected, "", entities=entities, replace_id={'type_A':['id', 'ref']}, subservice="/testsrv", schema="myschema")

0 commit comments

Comments
 (0)