
feat: add support for AddToCell in Data Client #1147

Merged: 12 commits merged on Jul 28, 2025
77 changes: 77 additions & 0 deletions google/cloud/bigtable/data/mutations.py
@@ -123,6 +123,14 @@ def _from_dict(cls, input_dict: dict[str, Any]) -> Mutation:
instance = DeleteAllFromFamily(details["family_name"])
elif "delete_from_row" in input_dict:
instance = DeleteAllFromRow()
elif "add_to_cell" in input_dict:
details = input_dict["add_to_cell"]
instance = AddToCell(
details["family_name"],
details["column_qualifier"]["raw_value"],
details["input"]["int_value"],
details["timestamp"]["raw_timestamp_micros"],
)
except KeyError as e:
raise ValueError("Invalid mutation dictionary") from e
if instance is None:
@@ -276,6 +284,75 @@ def _to_dict(self) -> dict[str, Any]:
}


@dataclass
class AddToCell(Mutation):
"""
Adds an int64 value to an aggregate cell. The column family must be an
aggregate family and have an "int64" input type or this mutation will be
rejected.

Note: The timestamp values are in microseconds but must match the
granularity of the table (defaults to `MILLIS`). Therefore, the given value
must be a multiple of 1000 (millisecond granularity). For example:
`1571902339435000`.

Args:
family: The name of the column family to which the cell belongs.
qualifier: The column qualifier of the cell.
value: The value to be accumulated into the cell.
timestamp_micros: The timestamp of the cell. Must be provided for
cell aggregation to work correctly.


Raises:
TypeError: If `qualifier` is not `bytes` or `str`.
TypeError: If `value` is not `int`.
TypeError: If `timestamp_micros` is not `int`.
ValueError: If `value` is out of bounds for a 64-bit signed int.
ValueError: If `timestamp_micros` is less than 0.
"""

def __init__(
self,
family: str,
qualifier: bytes | str,
value: int,
timestamp_micros: int,
):
qualifier = qualifier.encode() if isinstance(qualifier, str) else qualifier
if not isinstance(qualifier, bytes):
raise TypeError("qualifier must be bytes or str")
if not isinstance(value, int):
raise TypeError("value must be int")
if not isinstance(timestamp_micros, int):
raise TypeError("timestamp_micros must be int")
if abs(value) > _MAX_INCREMENT_VALUE:
raise ValueError(
"int values must be between -2**63 and 2**63 (64-bit signed int)"
)

if timestamp_micros < 0:
raise ValueError("timestamp must be non-negative")

self.family = family
self.qualifier = qualifier
self.value = value
self.timestamp = timestamp_micros

def _to_dict(self) -> dict[str, Any]:
return {
"add_to_cell": {
"family_name": self.family,
"column_qualifier": {"raw_value": self.qualifier},
"timestamp": {"raw_timestamp_micros": self.timestamp},
"input": {"int_value": self.value},
}
}

def is_idempotent(self) -> bool:
return False


class RowMutationEntry:
"""
A single entry in a `MutateRows` request.
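For reference, a minimal usage sketch of the new mutation against the async data client (not part of this PR). The project, instance, and table IDs are placeholders, and the target column family is assumed to already be configured as an int64 SUM aggregate family; see the admin sketch after the system tests below.

import asyncio

from google.cloud.bigtable.data import BigtableDataClientAsync
from google.cloud.bigtable.data.mutations import AddToCell


async def main():
    # Placeholder IDs; "aggregate-family" must be an int64 SUM aggregate
    # family or the server rejects the mutation.
    async with BigtableDataClientAsync(project="my-project") as client:
        async with client.get_table("my-instance", "my-table") as table:
            # Timestamps are in microseconds and must match the table's
            # granularity (multiples of 1000, or 0 as in the tests below).
            await table.mutate_row(
                b"row-key",
                AddToCell("aggregate-family", b"q", 5, timestamp_micros=0),
            )
            # A second call with the same qualifier and timestamp accumulates
            # into the same cell rather than overwriting it.
            await table.mutate_row(
                b"row-key",
                AddToCell("aggregate-family", b"q", 5, timestamp_micros=0),
            )


asyncio.run(main())

Because `is_idempotent()` returns False, requests containing AddToCell are not retried automatically by the client.
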
1 change: 1 addition & 0 deletions tests/system/data/__init__.py
@@ -16,3 +16,4 @@

TEST_FAMILY = "test-family"
TEST_FAMILY_2 = "test-family-2"
TEST_AGGREGATE_FAMILY = "test-aggregate-family"
3 changes: 2 additions & 1 deletion tests/system/data/setup_fixtures.py
@@ -20,7 +20,7 @@
import os
import uuid

from . import TEST_FAMILY, TEST_FAMILY_2
from . import TEST_FAMILY, TEST_FAMILY_2, TEST_AGGREGATE_FAMILY

# authorized view subset to allow all qualifiers
ALLOW_ALL = ""
@@ -183,6 +183,7 @@ def authorized_view_id(
"family_subsets": {
TEST_FAMILY: ALL_QUALIFIERS,
TEST_FAMILY_2: ALL_QUALIFIERS,
TEST_AGGREGATE_FAMILY: ALL_QUALIFIERS,
},
},
},
84 changes: 78 additions & 6 deletions tests/system/data/test_system_async.py
@@ -27,7 +27,7 @@

from google.cloud.bigtable.data._cross_sync import CrossSync

from . import TEST_FAMILY, TEST_FAMILY_2
from . import TEST_FAMILY, TEST_FAMILY_2, TEST_AGGREGATE_FAMILY


__CROSS_SYNC_OUTPUT__ = "tests.system.data.test_system_autogen"
@@ -76,6 +76,27 @@ async def add_row(
await self.target.client._gapic_client.mutate_row(request)
self.rows.append(row_key)

@CrossSync.convert
async def add_aggregate_row(
self, row_key, *, family=TEST_AGGREGATE_FAMILY, qualifier=b"q", input=0
):
request = {
"table_name": self.target.table_name,
"row_key": row_key,
"mutations": [
{
"add_to_cell": {
"family_name": family,
"column_qualifier": {"raw_value": qualifier},
"timestamp": {"raw_timestamp_micros": 0},
"input": {"int_value": input},
}
}
],
}
await self.target.client._gapic_client.mutate_row(request)
self.rows.append(row_key)

@CrossSync.convert
async def delete_rows(self):
if self.rows:
@@ -132,7 +153,17 @@ def column_family_config(self):
"""
from google.cloud.bigtable_admin_v2 import types

return {TEST_FAMILY: types.ColumnFamily(), TEST_FAMILY_2: types.ColumnFamily()}
int_aggregate_type = types.Type.Aggregate(
input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}),
sum={},
)
return {
TEST_FAMILY: types.ColumnFamily(),
TEST_FAMILY_2: types.ColumnFamily(),
TEST_AGGREGATE_FAMILY: types.ColumnFamily(
value_type=types.Type(aggregate_type=int_aggregate_type)
),
}

@pytest.fixture(scope="session")
def init_table_id(self):
@@ -281,6 +312,37 @@ async def test_mutation_set_cell(self, target, temp_rows):
# ensure cell is updated
assert (await self._retrieve_cell_value(target, row_key)) == new_value

@CrossSync.pytest
@pytest.mark.usefixtures("target")
@CrossSync.Retry(
predicate=retry.if_exception_type(ClientError), initial=1, maximum=5
)
async def test_mutation_add_to_cell(self, target, temp_rows):
"""
Test add to cell mutation
"""
from google.cloud.bigtable.data.mutations import AddToCell

row_key = b"add_to_cell"
family = TEST_AGGREGATE_FAMILY
qualifier = b"test-qualifier"
# add row to temp_rows, for future deletion
await temp_rows.add_aggregate_row(row_key, family=family, qualifier=qualifier)
# set and check cell value
await target.mutate_row(
row_key, AddToCell(family, qualifier, 1, timestamp_micros=0)
)
encoded_result = await self._retrieve_cell_value(target, row_key)
int_result = int.from_bytes(encoded_result, byteorder="big")
assert int_result == 1
# update again
await target.mutate_row(
row_key, AddToCell(family, qualifier, 9, timestamp_micros=0)
)
encoded_result = await self._retrieve_cell_value(target, row_key)
int_result = int.from_bytes(encoded_result, byteorder="big")
assert int_result == 10

@pytest.mark.skipif(
bool(os.environ.get(BIGTABLE_EMULATOR)), reason="emulator doesn't use splits"
)
@@ -1123,7 +1185,7 @@ async def test_execute_query_simple(self, client, table_id, instance_id):
predicate=retry.if_exception_type(ClientError), initial=1, maximum=5
)
async def test_execute_against_target(
self, client, instance_id, table_id, temp_rows
self, client, instance_id, table_id, temp_rows, column_family_config
):
await temp_rows.add_row(b"row_key_1")
result = await client.execute_query(
@@ -1138,14 +1200,19 @@ async def test_execute_against_target(
assert family_map[b"q"] == b"test-value"
assert len(rows[0][TEST_FAMILY_2]) == 0
md = result.metadata
assert len(md) == 3
# we expect it to fetch each column family, plus _key
# add additional families here if column_family_config changes
assert len(md) == len(column_family_config) + 1
assert md["_key"].column_type == SqlType.Bytes()
assert md[TEST_FAMILY].column_type == SqlType.Map(
SqlType.Bytes(), SqlType.Bytes()
)
assert md[TEST_FAMILY_2].column_type == SqlType.Map(
SqlType.Bytes(), SqlType.Bytes()
)
assert md[TEST_AGGREGATE_FAMILY].column_type == SqlType.Map(
SqlType.Bytes(), SqlType.Int64()
)

@pytest.mark.skipif(
bool(os.environ.get(BIGTABLE_EMULATOR)),
@@ -1248,7 +1315,7 @@ async def test_execute_query_params(self, client, table_id, instance_id):
predicate=retry.if_exception_type(ClientError), initial=1, maximum=5
)
async def test_execute_metadata_on_empty_response(
self, client, instance_id, table_id, temp_rows
self, client, instance_id, table_id, temp_rows, column_family_config
):
await temp_rows.add_row(b"row_key_1")
result = await client.execute_query(
Expand All @@ -1258,11 +1325,16 @@ async def test_execute_metadata_on_empty_response(

assert len(rows) == 0
md = result.metadata
assert len(md) == 3
# we expect it to fetch each column family, plus _key
# add additional families here if column_family_config changes
assert len(md) == len(column_family_config) + 1
assert md["_key"].column_type == SqlType.Bytes()
assert md[TEST_FAMILY].column_type == SqlType.Map(
SqlType.Bytes(), SqlType.Bytes()
)
assert md[TEST_FAMILY_2].column_type == SqlType.Map(
SqlType.Bytes(), SqlType.Bytes()
)
assert md[TEST_AGGREGATE_FAMILY].column_type == SqlType.Map(
SqlType.Bytes(), SqlType.Int64()
)
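
The `column_family_config` fixture above is what the new system test relies on: AddToCell is only accepted by column families created as aggregate families. As a rough sketch (not part of this PR, and with placeholder IDs), the same int64 SUM aggregate definition could be applied when creating a table through the standard admin client:

from google.cloud import bigtable_admin_v2
from google.cloud.bigtable_admin_v2 import types

admin_client = bigtable_admin_v2.BigtableTableAdminClient()

# Same aggregate definition as column_family_config: big-endian int64
# inputs accumulated with SUM.
int_aggregate_type = types.Type.Aggregate(
    input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}),
    sum={},
)

admin_client.create_table(
    parent="projects/my-project/instances/my-instance",  # placeholder path
    table_id="my-table",  # placeholder table ID
    table=types.Table(
        column_families={
            "regular-family": types.ColumnFamily(),
            "aggregate-family": types.ColumnFamily(
                value_type=types.Type(aggregate_type=int_aggregate_type)
            ),
        }
    ),
)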