Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/200.canada.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Adds the capability to return the inserted/updated records from the DataStore via the `include_records` DataDict key, and the deleted records via the `include_deleted_records` DataDict key.
110 changes: 78 additions & 32 deletions ckanext/datastore/backend/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,8 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):
records = data_dict['records']
sql_columns = ", ".join(
identifier(name) for name in field_names)
# (canada fork only): https://github.com/ckan/ckan/pull/8684
return_columns = "_id, " + sql_columns
num = -1

if method == _INSERT:
Expand All @@ -1505,16 +1507,23 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):
rows.append(row)

sql_string = '''INSERT INTO {res_id} ({columns})
VALUES ({values});'''.format(
VALUES ({values}) {return_statement};'''.format(
res_id=identifier(data_dict['resource_id']),
columns=sql_columns,
# (canada fork only): https://github.com/ckan/ckan/pull/8684
return_statement='RETURNING {return_columns}'.format(
return_columns=return_columns) if data_dict[
'include_records'] else '',
values=', '.join([
f":val_{idx}" for idx in range(0, len(field_names))
])
)

try:
context['connection'].execute(sa.text(sql_string), rows)
results = context['connection'].execute(sa.text(sql_string), rows)
# (canada fork only): https://github.com/ckan/ckan/pull/8684
if data_dict['include_records']:
data_dict['records'] = [dict(r) for r in results.mappings().all()]
except (DatabaseError, DataError) as err:
# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
Expand All @@ -1535,7 +1544,8 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):

elif method in [_UPDATE, _UPSERT]:
unique_keys = _get_unique_key(context, data_dict)

# (canada fork only): https://github.com/ckan/ckan/pull/8684
updated_records = {}
for num, record in enumerate(records):
if not unique_keys and '_id' not in record:
raise ValidationError({
Expand Down Expand Up @@ -1606,12 +1616,17 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):
sql_string = u'''
UPDATE {res_id}
SET ({columns}, "_full_text") = ({values}, NULL)
WHERE ({primary_key}) = ({primary_value});
WHERE ({primary_key}) = ({primary_value})
{return_statement};
'''.format(
res_id=identifier(data_dict['resource_id']),
columns=u', '.join(
[identifier(field)
for field in used_field_names]),
# (canada fork only): https://github.com/ckan/ckan/pull/8684
return_statement='RETURNING {return_columns}'.format(
return_columns=return_columns) if data_dict[
'include_records'] else '',
values=u', '.join(values),
primary_key=pk_sql,
primary_value=pk_values_sql,
Expand All @@ -1620,6 +1635,10 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):
results = context['connection'].execute(
sa.text(sql_string),
{**used_values, **unique_values})
# (canada fork only): https://github.com/ckan/ckan/pull/8684
if data_dict['include_records']:
for r in results.mappings().all():
updated_records[str(r._id)] = dict(r)
except DatabaseError as err:
# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
Expand Down Expand Up @@ -1647,37 +1666,39 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):
elif method == _UPSERT:
format_params = dict(
res_id=identifier(data_dict['resource_id']),
columns=u', '.join(
columns=('_id, ' if pk_sql == '"_id"' else '') + ', '.join(
[identifier(field)
for field in used_field_names]),
values=u', '.join([
f'cast(:{p} as nested)'
if field['type'] == 'nested' else ":" + p
for p, field in zip(value_placeholders, used_fields)
]),
primary_key=pk_sql,
primary_value=pk_values_sql,
# (canada fork only): https://github.com/ckan/ckan/pull/8684
set_statement=', '.join(
['{col}=EXCLUDED.{col}'.format(col=identifier(field))
for field in used_field_names]),
return_statement='RETURNING {return_columns}'.format(
return_columns=return_columns) if data_dict[
'include_records'] else '',
values=('%s, ' % pk_values_sql if pk_sql == '"_id"' else '') +
', '.join([f'cast(:{p} as nested)' if field['type'] == 'nested'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This nested type has haunted the datastore code base since before Postgres had a built-in JSON type 👻

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we fixing it upstream eventually?

else ":" + p
for p, field in zip(value_placeholders, used_fields)]),
primary_key=pk_sql
)

update_string = """
UPDATE {res_id}
SET ({columns}, "_full_text") = ({values}, NULL)
WHERE ({primary_key}) = ({primary_value})
""".format(**format_params)

insert_string = """
INSERT INTO {res_id} ({columns})
SELECT {values}
WHERE NOT EXISTS (SELECT 1 FROM {res_id}
WHERE ({primary_key}) = ({primary_value}))
# (canada fork only): https://github.com/ckan/ckan/pull/8684
sql_string = """
INSERT INTO {res_id} ({columns}) VALUES ({values})
ON CONFLICT ({primary_key}) DO UPDATE
SET {set_statement}
{return_statement}
""".format(**format_params)

values = {**used_values, **unique_values}
try:
context['connection'].execute(
sa.text(update_string), values)
context['connection'].execute(
sa.text(insert_string), values)
# (canada fork only): https://github.com/ckan/ckan/pull/8684
results = context['connection'].execute(
sa.text(sql_string), values)
if data_dict['include_records']:
for r in results.mappings().all():
updated_records[str(r._id)] = dict(r)
except DatabaseError as err:
# (canada fork only): parse constraint sql errors
# TODO: upstream contrib!!
Expand All @@ -1695,6 +1716,9 @@ def upsert_data(context: Context, data_dict: dict[str, Any]):
'records': [errmsg],
'records_row': num,
})
# (canada fork only): https://github.com/ckan/ckan/pull/8684
if updated_records:
data_dict['records'] = list(updated_records.values())


def validate(context: Context, data_dict: dict[str, Any]):
Expand Down Expand Up @@ -1724,6 +1748,8 @@ def validate(context: Context, data_dict: dict[str, Any]):
data_dict_copy.pop('include_total', None)
data_dict_copy.pop('total_estimation_threshold', None)
data_dict_copy.pop('records_format', None)
# (canada fork only): https://github.com/ckan/ckan/pull/8684
data_dict_copy.pop('include_records', None)
data_dict_copy.pop('calculate_record_count', None)

for key, values in data_dict_copy.items():
Expand Down Expand Up @@ -1959,6 +1985,11 @@ def delete_data(context: Context, data_dict: dict[str, Any]):
validate(context, data_dict)
fields_types = _get_fields_types(
context['connection'], data_dict['resource_id'])
# (canada fork only): https://github.com/ckan/ckan/pull/8684
fields = _get_fields(context['connection'], data_dict['resource_id'])
sql_columns = ", ".join(
identifier(f['id']) for f in fields)
return_columns = "_id, " + sql_columns

query_dict: dict[str, Any] = {
'where': []
Expand All @@ -1969,13 +2000,28 @@ def delete_data(context: Context, data_dict: dict[str, Any]):
fields_types, query_dict)

where_clause, where_values = _where(query_dict['where'])
sql_string = u'DELETE FROM "{0}" {1}'.format(
data_dict['resource_id'],
where_clause
)
# (canada fork only): https://github.com/ckan/ckan/pull/8684
if data_dict['include_deleted_records']:
rows_max = config.get('ckan.datastore.search.rows_max')
sql_string = '''WITH deleted AS (
DELETE FROM {0} {1} RETURNING {2}
) SELECT d.* FROM deleted as d LIMIT {3}
'''.format(
identifier(data_dict['resource_id']),
where_clause,
return_columns,
rows_max
)
else:
sql_string = u'DELETE FROM {0} {1}'.format(
identifier(data_dict['resource_id']),
where_clause)

try:
_execute_single_statement(context, sql_string, where_values)
# (canada fork only): https://github.com/ckan/ckan/pull/8684
results =_execute_single_statement(context, sql_string, where_values)
if data_dict['include_deleted_records']:
data_dict['deleted_records'] = [dict(r) for r in results.mappings().all()]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could see this causing a MemoryError when deleting all the rows in a large table.

For dsaudit we only need the first N rows to populate the activity. Can we apply a LIMIT to a DELETE ... RETURNING?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to do something like this, I guess: https://stackoverflow.com/a/37043809

That would require some bigger SQL statement changes, which could be done. This code is for my datastore Solr index plugin, so I think I would need to have all of the deleted records?

except ProgrammingError as pe:
raise ValidationError({'filters': [_programming_error_summary(pe)]})

Expand Down
25 changes: 23 additions & 2 deletions ckanext/datastore/logic/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def datastore_create(context: Context, data_dict: dict[str, Any]):
:param records: the data, eg: [{"dob": "2005", "some_stuff": ["a", "b"]}]
(optional)
:type records: list of dictionaries
:param include_records: return the full values of inserted records
(optional, default: False)
:type include_records: bool
:param primary_key: fields that represent a unique key (optional)
:type primary_key: list or comma separated string
:param foreign_keys: tables and fields that represent foreign keys (optional)
Expand Down Expand Up @@ -184,7 +187,9 @@ def datastore_create(context: Context, data_dict: dict[str, Any]):

result.pop('id', None)
result.pop('connection_url', None)
result.pop('records', None)
# (canada fork only): https://github.com/ckan/ckan/pull/8684
if not data_dict.pop('include_records', False):
result.pop('records', None)
return result


Expand Down Expand Up @@ -251,6 +256,9 @@ def datastore_upsert(context: Context, data_dict: dict[str, Any]):
:param records: the data, eg: [{"dob": "2005", "some_stuff": ["a","b"]}]
(optional)
:type records: list of dictionaries
:param include_records: return the full values of inserted/updated records
(optional, default: False)
:type include_records: bool
:param method: the method to use to put the data into the datastore.
Possible options are: upsert, insert, update
(optional, default: upsert)
Expand Down Expand Up @@ -299,6 +307,10 @@ def datastore_upsert(context: Context, data_dict: dict[str, Any]):
result.pop('id', None)
result.pop('connection_url', None)

# (canada fork only): https://github.com/ckan/ckan/pull/8684
if not data_dict.pop('include_records', False):
result.pop('records', None)

if data_dict.get('calculate_record_count', False):
backend.calculate_record_count(data_dict['resource_id']) # type: ignore

Expand Down Expand Up @@ -388,6 +400,9 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]):
If missing delete whole table and all dependent views.
(optional)
:type filters: dictionary
:param include_deleted_records: return the full values of deleted records
(optional, default: False)
:type include_deleted_records: bool
:param calculate_record_count: updates the stored count of records, used to
optimize datastore_search in combination with the
`total_estimation_threshold` parameter. If doing a series of requests
Expand All @@ -397,7 +412,7 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]):

**Results:**

:returns: Original filters sent.
:returns: Original filters sent and list of deleted_records
:rtype: dictionary

'''
Expand Down Expand Up @@ -455,6 +470,9 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]):

result.pop('id', None)
result.pop('connection_url', None)
# (canada fork only): https://github.com/ckan/ckan/pull/8684
if not data_dict.pop('include_deleted_records', False):
result.pop('deleted_records', None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all of the other endpoints, "include_records" adds a "records" value to the return, but datastore_delete is different?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's just semantic: it makes it clear to API users that the records were deleted.

return result


Expand All @@ -470,6 +488,9 @@ def datastore_records_delete(context: Context, data_dict: dict[str, Any]):
If {} delete all records.
(required)
:type filters: dictionary
:param include_deleted_records: return the full values of deleted records
(optional, default: False)
:type include_deleted_records: bool
:param calculate_record_count: updates the stored count of records, used to
optimize datastore_search in combination with the
`total_estimation_threshold` parameter. If doing a series of requests
Expand Down
8 changes: 8 additions & 0 deletions ckanext/datastore/logic/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ def datastore_create_schema() -> Schema:
one_of([u'row'])],
'function': [not_empty, unicode_only],
},
# (canada fork only): https://github.com/ckan/ckan/pull/8684
'include_records': [default(False), boolean_validator],
'calculate_record_count': [ignore_missing, default(False),
boolean_validator],
'__junk': [empty],
Expand All @@ -155,6 +157,8 @@ def datastore_upsert_schema() -> Schema:
'id': [ignore_missing],
'method': [ignore_missing, unicode_safe, one_of(
['upsert', 'insert', 'update'])],
# (canada fork only): https://github.com/ckan/ckan/pull/8684
'include_records': [default(False), boolean_validator],
'calculate_record_count': [ignore_missing, default(False),
boolean_validator],
'dry_run': [ignore_missing, boolean_validator],
Expand All @@ -169,6 +173,8 @@ def datastore_delete_schema() -> Schema:
'resource_id': [not_missing, not_empty, unicode_safe],
'force': [ignore_missing, boolean_validator],
'id': [ignore_missing],
# (canada fork only): https://github.com/ckan/ckan/pull/8684
'include_deleted_records': [default(False), boolean_validator],
'calculate_record_count': [ignore_missing, default(False),
boolean_validator],
'__junk': [empty],
Expand All @@ -183,6 +189,8 @@ def datastore_records_delete_schema() -> Schema:
'force': [ignore_missing, boolean_validator],
'filters': [not_missing, dict_only],
'id': [ignore_missing],
# (canada fork only): https://github.com/ckan/ckan/pull/8684
'include_deleted_records': [default(False), boolean_validator],
'calculate_record_count': [ignore_missing, default(False),
boolean_validator],
'__junk': [empty],
Expand Down