Skip to content

Commit f310a09

Browse files
committed
made entity list immutable
1 parent 65607dc commit f310a09

File tree

3 files changed

+52
-33
lines changed

3 files changed

+52
-33
lines changed

python-lib/tc_etl_lib/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,9 @@ La librería además proporciona [context managers](https://docs.python.org/3/re
341341
- si `table_name[entityType]` existe y es *falsy* (`None`, `""`, etc), las entidades de ese tipo no se escriben al fichero SQL.
342342
- :param: `chunk_size` opcional: máximo número de líneas a incluir en un solo `INSERT`. Default=10000
343343
- :param: `append` opcional: en caso de que el fichero exista, `append=True` añade los INSERT mientras que `append=False` sobrescribe el fichero. Default False.
344-
- :param `replace_id` opcional: reemplaza el ID de la entidad por un valor construido a partir de la lista de atributos indicados en este parámetro, separados por `_`. Imita el comportamiento del atributo `replaceId` de los flujos histórico y lastdata de URBO-DEPLOYER, para poder usar este *store* en ETLs que alimenten *singletons*.
344+
- :param: `replace_id` opcional: diccionario `tipo de entidad` => `lista de atributos replace_id`.
345+
Reemplaza el ID de las entidades del tipo o tipos especificados, por un valor construido a partir de la lista de atributos indicados en este parámetro, separados por `_`.
346+
Imita el comportamiento del atributo `replaceId` de los flujos históricos de URBO-DEPLOYER, para poder usar este *store* en ETLs que alimenten *singletons*.
345347
- :return: un `callable` que recibe una lista de entidades y las escribe como instrucciones sql `INSERT` en el fichero especificado.
346348

347349
El modo de uso de cualquiera de los context managers es idéntico:

python-lib/tc_etl_lib/tc_etl_lib/store.py

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def send_batch(entities: Iterable[Any]):
4848
yield send_batch
4949

5050
@contextmanager
51-
def sqlFileStore(path: Path, *, subservice:str, schema:str=":target_schema", namespace:str="", table_names:Optional[Dict[str, str]]=None, chunk_size:int=10000, append:bool=False, replace_id:Optional[Sequence[str]]=None):
51+
def sqlFileStore(path: Path, *, subservice:str, schema:str=":target_schema", namespace:str="", table_names:Optional[Dict[str, str]]=None, chunk_size:int=10000, append:bool=False, replace_id:Optional[Dict[str, Sequence[str]]]=None):
5252
'''
5353
Context manager that creates a store to save entities to an SQL File.
5454
SQL syntax used is postgresql.
@@ -62,34 +62,20 @@ def sqlFileStore(path: Path, *, subservice:str, schema:str=":target_schema", nam
6262
- if table_name[entityType] exists and is empty, entities with the given type are not saved.
6363
chunk_size: maximum lines in a single insert statement. Default=10000
6464
append: append to the file instead of overwriting.
65-
replace_id: list of attributes to use to replace the entity's id before saving to the db.
66-
These attributes must exist in the entity, otherwise it will raise a KeyError.
65+
replace_id: dictionary of `entity type` => `list of replace_id attributes`.
66+
For any entity of the given type, the entity id will be replaced by
67+
the list of values of the specified attributes, concatenated with "_".
68+
This mimics the behaviour of `replaceId` param in historic URBO-DEPLOYER flows.
6769
'''
6870
mode = "a+" if append else "w+"
6971
handler = path.open(mode=mode, encoding="utf-8")
7072
some_table_names = table_names or {} # make sure it is not None
73+
replace_id = replace_id or {} # make sure it is not None
7174
try:
7275
def send_batch(entities: Iterable[Any]):
7376
"""Send a batch of entities to the database"""
74-
if replace_id is not None and len(replace_id) > 0:
75-
def replace(entities):
76-
"""
77-
Replace each entity's id with the concatenated list of attribute values.
78-
Mimics the behaviour of replaceId parameter of historic and lastdata
79-
flows in urbo-deployer.
80-
"""
81-
for entity in entities:
82-
values: List[str] = []
83-
for attr in replace_id:
84-
if attr == 'id':
85-
values.append(entity['id'])
86-
else:
87-
values.append(str(entity[attr]['value']))
88-
entity['id'] = "_".join(values)
89-
yield entity
90-
entities = replace(entities)
9177
for chunk in iter_chunk(entities, chunk_size):
92-
handler.write(sqlfile_batch(schema=schema, namespace=namespace, table_names=some_table_names, subservice=subservice, entities=chunk))
78+
handler.write(sqlfile_batch(schema=schema, namespace=namespace, table_names=some_table_names, subservice=subservice, replace_id=replace_id, entities=chunk))
9379
handler.write("\n")
9480
yield send_batch
9581
finally:
@@ -120,15 +106,26 @@ def sql_escape(obj: Any) -> str:
120106
adaptor.encoding = 'utf-8'
121107
return adaptor.getquoted().decode('utf-8')
122108

123-
def sqlfile_values(subservice: str, entity: Dict[str, Any], fields: Iterable[str]) -> str:
109+
def sqlfile_values(subservice: str, entity: Dict[str, Any], fields: Iterable[str], replace_id:Optional[Sequence[str]]=None) -> str:
124110
'''
125111
Generates a string suitable for SQL insert, with all values of the entity
126112
127113
subservice: subservice name
128114
entity: ngsi entity
129-
fields: list of fields to save in the entity (omitting id, type)'''
115+
fields: list of fields to save in the entity (omitting id, type)
116+
replace_id: optional list of attributes to replace the entity's id
117+
'''
118+
entityid = entity['id']
119+
if replace_id is not None and len(replace_id) > 0:
120+
values = []
121+
for attr in replace_id:
122+
if attr == 'id':
123+
values.append(entityid)
124+
else:
125+
values.append(str(entity[attr]['value']))
126+
entityid = "_".join(values)
130127
sql = [
131-
sql_escape(entity['id']),
128+
sql_escape(entityid),
132129
sql_escape(entity['type']),
133130
sql_escape(subservice),
134131
"NOW()"
@@ -148,19 +145,20 @@ def sqlfile_values(subservice: str, entity: Dict[str, Any], fields: Iterable[str
148145
sql.append(value)
149146
return f"({','.join(sql)})"
150147

151-
def sqlfile_insert(subservice: str, table_name: str, fields: Sequence[str], entities: Iterable[Any]) -> str:
148+
def sqlfile_insert(subservice: str, table_name: str, fields: Sequence[str], entities: Iterable[Any], replace_id:Optional[Sequence[str]]=None) -> str:
152149
'''
153150
Generate SQL INSERT lines from sequence of entities
154151
155152
subservice: subservice name
156153
table_name: SQL table name to use
157154
fields: sequence of attribute names
158155
entities: iterable of entities
156+
replace_id: optional list of attributes whose values replace the entity's id
159157
'''
160158
return "\n".join((
161159
f"INSERT INTO {table_name} (entityid,entitytype,fiwareservicepath,recvtime,{','.join(fields)}) VALUES",
162160
",\n".join(
163-
sqlfile_values(subservice, entity, fields)
161+
sqlfile_values(subservice, entity, fields, replace_id)
164162
for entity in entities
165163
) + ";"
166164
))
@@ -180,14 +178,15 @@ def sql_table_name(schema: str, namespace: str, entity_type: str, table_names: D
180178
# Tables mapped to some name are prefixed with schema name
181179
return f"{schema}.{mapped_name}"
182180

183-
def sqlfile_batch(schema: str, namespace: str, table_names: Dict[str, str], subservice: str, entities: Iterable[Any]) -> str:
181+
def sqlfile_batch(schema: str, namespace: str, table_names: Dict[str, str], subservice: str, replace_id: Dict[str, Sequence[str]], entities: Iterable[Any]) -> str:
184182
'''
185183
Generate a single SQL insert batch statement
186184
187185
schema: schema name
188186
namespace: namespace for table names
189187
table_names: overrides entityType to table name defaults
190188
subservice: subservice name
189+
replace_id: map of entity type to list of replace_id attributes
191190
entities: Iterable of entities
192191
'''
193192
# Group entities by type
@@ -207,6 +206,6 @@ def sqlfile_batch(schema: str, namespace: str, table_names: Dict[str, str], subs
207206
table_name = sql_table_name(schema, namespace, entity_type, table_names)
208207
if table_name:
209208
table_cols = sorted(fields_by_type[entity_type])
210-
sql.append(sqlfile_insert(subservice, table_name, table_cols, typed_entities))
209+
sql.append(sqlfile_insert(subservice, table_name, table_cols, typed_entities, replace_id.get(entity_type, None)))
211210
# and return SQL insert code
212211
return "\n".join(sql)

python-lib/tc_etl_lib/tc_etl_lib/test_store.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,13 +232,31 @@ def test_singleton_one_id(self):
232232
"type": "geo:json",
233233
"value": { "type": "Point", "coordinates": [1, 2] }
234234
}
235+
},
236+
{
237+
"id": "id_no_singleton",
238+
"type": "type_B",
239+
"ref": {
240+
"type": "Text",
241+
"value": "id_B"
242+
},
243+
"municipality": {
244+
"type": "Text",
245+
"value": "NA"
246+
},
247+
"location": {
248+
"type": "geo:json",
249+
"value": { "type": "Point", "coordinates": [1, 2] }
250+
}
235251
}
236252
]
237253
expected = """
238254
INSERT INTO myschema.type_a (entityid,entitytype,fiwareservicepath,recvtime,location,municipality,ref) VALUES
239255
('id_A','type_A','/testsrv',NOW(),ST_GeomFromGeoJSON('{"type": "Point", "coordinates": [1, 2]}'),'NA','id_A');
256+
INSERT INTO myschema.type_b (entityid,entitytype,fiwareservicepath,recvtime,location,municipality,ref) VALUES
257+
('id_no_singleton','type_B','/testsrv',NOW(),ST_GeomFromGeoJSON('{"type": "Point", "coordinates": [1, 2]}'),'NA','id_B');
240258
"""
241-
self.do_test(expected, "", entities=entities, replace_id=['ref'], subservice="/testsrv", schema="myschema")
259+
self.do_test(expected, "", entities=entities, replace_id={'type_A':['ref']}, subservice="/testsrv", schema="myschema")
242260

243261
def test_singleton_two_ids(self):
244262
'''Test replace_id with two attributes'''
@@ -268,10 +286,10 @@ def test_singleton_two_ids(self):
268286
INSERT INTO myschema.type_a (entityid,entitytype,fiwareservicepath,recvtime,location,municipality,primary,secondary) VALUES
269287
('id_primary_5','type_A','/testsrv',NOW(),ST_GeomFromGeoJSON('{"type": "Point", "coordinates": [1, 2]}'),'NA','id_primary',5);
270288
"""
271-
self.do_test(expected, "", entities=entities, replace_id=['primary', 'secondary'], subservice="/testsrv", schema="myschema")
289+
self.do_test(expected, "", entities=entities, replace_id={'type_A':['primary', 'secondary']}, subservice="/testsrv", schema="myschema")
272290

273291
def test_singleton_original_id(self):
274-
'''Test replace_id with two attributes'''
292+
'''Test replace_id with the original id, plus some attrib'''
275293
entities = [
276294
{
277295
"id": "id_singleton",
@@ -294,4 +312,4 @@ def test_singleton_original_id(self):
294312
INSERT INTO myschema.type_a (entityid,entitytype,fiwareservicepath,recvtime,location,municipality,ref) VALUES
295313
('id_singleton_5','type_A','/testsrv',NOW(),ST_GeomFromGeoJSON('{"type": "Point", "coordinates": [1, 2]}'),'NA',5);
296314
"""
297-
self.do_test(expected, "", entities=entities, replace_id=['id', 'ref'], subservice="/testsrv", schema="myschema")
315+
self.do_test(expected, "", entities=entities, replace_id={'type_A':['id', 'ref']}, subservice="/testsrv", schema="myschema")

0 commit comments

Comments
 (0)