diff --git a/src/AtomicIncrementRequest.java b/src/AtomicIncrementRequest.java index e556a950..96ff425d 100644 --- a/src/AtomicIncrementRequest.java +++ b/src/AtomicIncrementRequest.java @@ -173,6 +173,11 @@ public byte[] family() { return family; } + @Override + public byte[][] getFamilies() { + return new byte[][] { family }; + } + @Override public byte[] qualifier() { return qualifier; diff --git a/src/BatchableRpc.java b/src/BatchableRpc.java index 64157a91..6bafe8fb 100644 --- a/src/BatchableRpc.java +++ b/src/BatchableRpc.java @@ -44,8 +44,8 @@ abstract class BatchableRpc extends HBaseRpc // So instead we make them package-private so that subclasses can still // access them directly. - /** Family affected by this RPC. */ - /*protected*/ final byte[] family; + /** Families affected by this RPC. */ + /*protected*/ final byte[][] families; /** The timestamp to use for {@link KeyValue}s of this RPC. */ /*protected*/ final long timestamp; @@ -72,16 +72,16 @@ abstract class BatchableRpc extends HBaseRpc * Package private constructor. * @param table The name of the table this RPC is for. * @param row The name of the row this RPC is for. - * @param family The column family to edit in that table. Subclass must - * validate, this class doesn't perform any validation on the family. + * @param families The column families to edit in that table. Subclass must + * validate, this class doesn't perform any validation on the families. * @param timestamp The timestamp to use for {@link KeyValue}s of this RPC. * @param lockid Explicit row lock to use, or {@link RowLock#NO_LOCK}. */ BatchableRpc(final byte[] table, - final byte[] key, final byte[] family, + final byte[] key, final byte[][] families, final long timestamp, final long lockid) { super(table, key); - this.family = family; + this.families = families; this.timestamp = timestamp; this.lockid = lockid; } @@ -116,7 +116,12 @@ public final void setDurable(final boolean durable) { @Override public final byte[] family() { - return family; + return families == null ? null : families[0]; + } + + @Override + public final byte[][] getFamilies() { + return families; } @Override @@ -156,18 +161,34 @@ final boolean canBuffer() { /** * How many {@link KeyValue}s will be serialized by {@link #serializePayload}. + * Used with RPCs with single column families. */ abstract int numKeyValues(); /** * An estimate of the number of bytes needed for {@link #serializePayload}. * The estimate is conservative. + * Used with RPCs with single column families. */ abstract int payloadSize(); /** * Serialize the part of this RPC for a {@link MultiAction}. + * Used with RPCs with single column families. */ abstract void serializePayload(final ChannelBuffer buf); + /** + * An estimate of the number of bytes needed for {@link #serializePayloads}. + * The estimate is conservative. + * Used with RPCs with multiple column families. + */ + abstract int payloadsSize(); + + /** + * Serialize the part of this RPC for a {@link MultiAction}. + * Used with RPCs with multiple column families. + */ + abstract void serializePayloads(final ChannelBuffer buf); + } diff --git a/src/CompareAndSetRequest.java b/src/CompareAndSetRequest.java index def883a3..3e599f60 100644 --- a/src/CompareAndSetRequest.java +++ b/src/CompareAndSetRequest.java @@ -102,6 +102,11 @@ public byte[] family() { return put.family(); } + @Override + public byte[][] getFamilies() { + return put.getFamilies(); + } + @Override public byte[] qualifier() { return put.qualifier(); diff --git a/src/DeleteRequest.java b/src/DeleteRequest.java index 5db050f8..488f5ef8 100644 --- a/src/DeleteRequest.java +++ b/src/DeleteRequest.java @@ -63,10 +63,12 @@ public final class DeleteRequest extends BatchableRpc private static final byte[][] DELETE_FAMILY_MARKER = new byte[][] { HBaseClient.EMPTY_ARRAY }; - /** Special value for {@link #family} when deleting a whole row. */ - static final byte[] WHOLE_ROW = new byte[0]; + /** Special value for {@link #families} when deleting a whole row. */ + static final byte[][] WHOLE_ROW = + new byte[][] { HBaseClient.EMPTY_ARRAY }; - private final byte[][] qualifiers; + private final byte[][][] qualifiers; + private final long[][] timestamps; /** Whether to delete the value only at the specified timestamp. */ private boolean at_timestamp_only = false; @@ -79,7 +81,8 @@ public final class DeleteRequest extends BatchableRpc * @throws IllegalArgumentException if any argument is malformed. */ public DeleteRequest(final byte[] table, final byte[] key) { - this(table, key, null, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, null, null, null, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -93,7 +96,7 @@ public DeleteRequest(final byte[] table, final byte[] key) { */ public DeleteRequest(final byte[] table, final byte[] key, final long timestamp) { - this(table, key, null, null, timestamp, RowLock.NO_LOCK); + this(table, key, null, null, null, timestamp, RowLock.NO_LOCK); } /** @@ -108,7 +111,24 @@ public DeleteRequest(final byte[] table, final byte[] key, public DeleteRequest(final byte[] table, final byte[] key, final byte[] family) { - this(table, key, family, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, null, + null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific set of families. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families) { + this(table, key, families, null, null, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -125,7 +145,8 @@ public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, final long timestamp) { - this(table, key, family, null, timestamp, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, null, + null, timestamp, RowLock.NO_LOCK); } /** @@ -142,9 +163,9 @@ public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier) { - this(table, key, family, - qualifier == null ? null : new byte[][] { qualifier }, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, + qualifier == null ? null : new byte[][][] { { qualifier } }, + null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -164,9 +185,9 @@ public DeleteRequest(final byte[] table, final byte[] family, final byte[] qualifier, final long timestamp) { - this(table, key, family, - qualifier == null ? null : new byte[][] { qualifier }, - timestamp, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, + qualifier == null ? null : new byte[][][] { { qualifier } }, + null, timestamp, RowLock.NO_LOCK); } /** @@ -183,10 +204,94 @@ public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, final byte[][] qualifiers) { - this(table, key, family, qualifiers, + this(table, key, new byte[][] { family }, + new byte[][][] { qualifiers }, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } + /** + * Constructor to delete a specific number of cells in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param family The column family to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[] family, + final byte[][] qualifiers, + final long[] timestamps) { + this(table, key, new byte[][] { family }, + new byte[][][] { qualifiers }, new long[][] {timestamps}, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific number of cells of a set of column + * families in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers) { + this(table, key, families, qualifiers, null, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific number of cells of a set of column + * families in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @param timestamps The corresponding timestamps to use in delete. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final long[][] timestamps) { + this(table, key, families, qualifiers, timestamps, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific number of cells of a set of column + * families in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @param timestamps The corresponding timestamps to use in delete. + * @param timestamp The row timestamp to set on this edit. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final long[][] timestamps, + final long timestamp) { + this(table, key, families, qualifiers, timestamps, + timestamp, RowLock.NO_LOCK); + } + /** * Constructor to delete a specific number of cells in a row. * These byte arrays will NOT be copied. @@ -203,7 +308,29 @@ public DeleteRequest(final byte[] table, final byte[] family, final byte[][] qualifiers, final long timestamp) { - this(table, key, family, qualifiers, timestamp, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, + new byte[][][] { qualifiers }, null, + timestamp, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific number of cells in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @param timestamp The timestamp to set on this edit. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.2 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final long timestamp) { + this(table, key, families, qualifiers, + null, timestamp, RowLock.NO_LOCK); } /** @@ -221,9 +348,9 @@ public DeleteRequest(final byte[] table, final byte[] family, final byte[] qualifier, final RowLock lock) { - this(table, key, family, - qualifier == null ? null : new byte[][] { qualifier }, - KeyValue.TIMESTAMP_NOW, lock.id()); + this(table, key, new byte[][] { family }, + qualifier == null ? null : new byte[][][] { { qualifier } }, + null, KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -244,9 +371,9 @@ public DeleteRequest(final byte[] table, final byte[] qualifier, final long timestamp, final RowLock lock) { - this(table, key, family, - qualifier == null ? null : new byte[][] { qualifier }, - timestamp, lock.id()); + this(table, key, new byte[][] { family }, + qualifier == null ? null : new byte[][][] { { qualifier } }, + null, timestamp, lock.id()); } /** @@ -266,7 +393,30 @@ public DeleteRequest(final byte[] table, final byte[] family, final byte[][] qualifiers, final RowLock lock) { - this(table, key, family, qualifiers, KeyValue.TIMESTAMP_NOW, lock.id()); + this(table, key, new byte[][] { family }, + new byte[][][] { qualifiers }, null, + KeyValue.TIMESTAMP_NOW, lock.id()); + } + + /** + * Constructor to delete a specific number of cells in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * Can be {@code null}. + * @param lock An explicit row lock to use with this request. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final RowLock lock) { + this(table, key, families, qualifiers, null, + KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -288,7 +438,30 @@ public DeleteRequest(final byte[] table, final byte[][] qualifiers, final long timestamp, final RowLock lock) { - this(table, key, family, qualifiers, timestamp, lock.id()); + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + null, timestamp, lock.id()); + } + + /** + * Constructor to delete a specific number of cells in a row with a row lock. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * Can be {@code null}. + * @param timestamp The timestamp to set on this edit. + * @param lock An explicit row lock to use with this request. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.2 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final long timestamp, + final RowLock lock) { + this(table, key, families, qualifiers, null, timestamp, lock.id()); } /** @@ -299,7 +472,7 @@ public DeleteRequest(final byte[] table, */ public DeleteRequest(final String table, final String key) { this(table.getBytes(), key.getBytes(), null, null, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -313,8 +486,8 @@ public DeleteRequest(final String table, final String key) { public DeleteRequest(final String table, final String key, final String family) { - this(table.getBytes(), key.getBytes(), family.getBytes(), null, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table.getBytes(), key.getBytes(), new byte[][] { family.getBytes() }, + null, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -330,9 +503,9 @@ public DeleteRequest(final String table, final String key, final String family, final String qualifier) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier == null ? null : new byte[][] { qualifier.getBytes() }, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table.getBytes(), key.getBytes(), new byte[][] { family.getBytes() }, + qualifier == null ? null : new byte[][][] { { qualifier.getBytes() } }, + null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -350,9 +523,9 @@ public DeleteRequest(final String table, final String family, final String qualifier, final RowLock lock) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier == null ? null : new byte[][] { qualifier.getBytes() }, - KeyValue.TIMESTAMP_NOW, lock.id()); + this(table.getBytes(), key.getBytes(), new byte[][] { family.getBytes() }, + qualifier == null ? null : new byte[][][] { { qualifier.getBytes() } }, + null, KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -364,8 +537,9 @@ public DeleteRequest(final String table, * @since 1.2 */ public DeleteRequest(final byte[] table, final KeyValue kv) { - this(table, kv.key(), kv.family(), new byte[][] { kv.qualifier() }, - kv.timestamp(), RowLock.NO_LOCK); + this(table, kv.key(), new byte[][] { kv.family() }, + new byte[][][] { { kv.qualifier() } }, + null, kv.timestamp(), RowLock.NO_LOCK); } /** @@ -380,37 +554,71 @@ public DeleteRequest(final byte[] table, final KeyValue kv) { public DeleteRequest(final byte[] table, final KeyValue kv, final RowLock lock) { - this(table, kv.key(), kv.family(), new byte[][] { kv.qualifier() }, - kv.timestamp(), lock.id()); + this(table, kv.key(), new byte[][] { kv.family() }, + new byte[][][] { { kv.qualifier() } }, + null, kv.timestamp(), lock.id()); } /** Private constructor. */ private DeleteRequest(final byte[] table, final byte[] key, - final byte[] family, - final byte[][] qualifiers, - final long timestamp, + final byte[][] families, + final byte[][][] qualifiers, + final long[][] timestamps, + final long row_timestamp, final long lockid) { - super(table, key, family == null ? WHOLE_ROW : family, timestamp, lockid); - if (family != null) { - KeyValue.checkFamily(family); + super(table, key, families == null ? WHOLE_ROW : families, + row_timestamp, lockid); + checkParams(families, qualifiers, timestamps); + this.qualifiers = qualifiers; + this.timestamps = timestamps; + } + + private void checkParams(final byte[][] families, + final byte[][][] qualifiers, + final long[][] timestamps) { + if (families != null) { + for (byte[] family : families) { + KeyValue.checkFamily(family); + } } if (qualifiers != null) { - if (family == null) { + if (families == null) { throw new IllegalArgumentException("You can't delete specific qualifiers" + " without specifying which family they belong to." - + " table=" + Bytes.pretty(table) + + " table=" + Bytes.pretty(table) + ", key=" + Bytes.pretty(key)); + } else if (families.length != qualifiers.length) { + throw new IllegalArgumentException("Length of the qualifier array does" + + " not match that of the family." + + " table=" + Bytes.pretty(table) + + ", key=" + Bytes.pretty(key)); + } else if (timestamps != null && families.length != timestamps.length) { + throw new IllegalArgumentException(String.format( + "Mismatch in number of families(%d) and timestamps(%d) array size.", + families.length, timestamps.length)); } - for (final byte[] qualifier : qualifiers) { - KeyValue.checkQualifier(qualifier); + + for (int idx = 0; idx < families.length; idx++) { + if (qualifiers[idx] == null) { + continue; + } + if (timestamps != null) { + if (qualifiers[idx].length != timestamps[idx].length) { + throw new IllegalArgumentException("Found " + + qualifiers[idx].length + " qualifiers and " + + timestamps[idx].length + " timestamps for family " + + families[idx] + " at index " + idx + ". Should be equal."); + } + } + for (final byte[] qualifier : qualifiers[idx]) { + KeyValue.checkQualifier(qualifier); + } } - this.qualifiers = qualifiers; - } else { - // No specific qualifier to delete: delete the entire family. Not that - // if `family == null', we'll delete the whole row anyway. - this.qualifiers = DELETE_FAMILY_MARKER; + } else if (timestamps != null) { + throw new IllegalArgumentException("Timestamps have been specified " + + "without specifying qualifiers."); } } @@ -452,11 +660,19 @@ public byte[] key() { @Override public byte[][] qualifiers() { + return qualifiers == null ? null : qualifiers[0]; + } + + /** + * {@inheritDoc} + */ + @Override + public byte[][][] getQualifiers() { return qualifiers; } public String toString() { - return super.toStringWithQualifiers("DeleteRequest", family, qualifiers); + return super.toStringWithQualifiers("DeleteRequest", families, qualifiers); } // ---------------------- // @@ -479,25 +695,39 @@ byte code() { @Override int numKeyValues() { - return qualifiers.length; + return (qualifiers != null && qualifiers[0] != null) + ? qualifiers[0].length + : 1; } @Override void serializePayload(final ChannelBuffer buf) { - if (family == null) { + serializePayload(buf, 0); + } + + private void serializePayload(final ChannelBuffer buf, int family_idx) { + if (families == WHOLE_ROW) { return; // No payload when deleting whole rows. } + final boolean has_qualifiers = + (qualifiers != null && qualifiers[family_idx] != null); + final boolean has_timestamps = + (timestamps != null && timestamps[family_idx] != null); // Are we deleting a whole family at once or just a bunch of columns? - final byte type = (qualifiers == DELETE_FAMILY_MARKER + final byte type = (!has_qualifiers ? KeyValue.DELETE_FAMILY : (at_timestamp_only - ? KeyValue.DELETE - : KeyValue.DELETE_COLUMN)); - + ? KeyValue.DELETE + : KeyValue.DELETE_COLUMN)); + final byte[][] family_qualifiers = !has_qualifiers + ? DELETE_FAMILY_MARKER + : qualifiers[family_idx]; // Write the KeyValues - for (final byte[] qualifier : qualifiers) { - KeyValue.serialize(buf, type, timestamp, - key, family, qualifier, null); + for (int i = 0; i < family_qualifiers.length; i++) { + final byte[] qualifier = family_qualifiers[i]; + KeyValue.serialize(buf, type, + (has_timestamps ? timestamps[family_idx][i] : timestamp), + key, families[family_idx], qualifier, null); } } @@ -523,19 +753,34 @@ private int predictSerializedSize() { size += 8; // long: Timestamp. size += 8; // long: Lock ID. size += 4; // int: Number of families. - size += 1; // vint: Family length (guaranteed on 1 byte). - if (family == null) { - return size; + + size += payloadsSize(); + + return size; + } + + @Override + int payloadsSize() { + int size = 0; + if (families != WHOLE_ROW) { + for (int i = 0; i < families.length; i++) { + size += 1; // vint: Family length (guaranteed on 1 byte). + size += families[i].length; // The column family. + size += 4; // int: Number of KeyValues for this family. + size += payloadSize(i); + } } - size += family.length; // The column family. - size += 4; // int: Number of KeyValues for this family. - return size + payloadSize(); + return size; } /** Returns the serialized size of all the {@link KeyValue}s in this RPC. */ @Override int payloadSize() { - if (family == WHOLE_ROW) { + return payloadSize(0); + } + + private int payloadSize(int family_idx) { + if (families == WHOLE_ROW) { return 0; // No payload when deleting whole rows. } int size = 0; @@ -545,11 +790,15 @@ int payloadSize() { size += 2; // short:Length of the key. size += key.length; // The row key (again!). size += 1; // byte: Family length (again!). - size += family.length; // The column family (again!). + size += families[family_idx].length; // The column family (again!). size += 8; // long: The timestamp (again!). size += 1; // byte: The type of KeyValue. - size *= qualifiers.length; - for (final byte[] qualifier : qualifiers) { + final byte[][] family_qualifiers = + (qualifiers == null || qualifiers[family_idx] == null) + ? DELETE_FAMILY_MARKER + : qualifiers[family_idx]; + size *= family_qualifiers.length; + for (final byte[] qualifier : family_qualifiers) { size += qualifier.length; // The column qualifier. } return size; @@ -561,29 +810,38 @@ MutationProto toMutationProto() { .setRow(Bytes.wrap(key)) .setMutateType(MutationProto.MutationType.DELETE); - if (family != WHOLE_ROW) { + if (families != WHOLE_ROW) { final MutationProto.ColumnValue.Builder columns = // All columns ... - MutationProto.ColumnValue.newBuilder() - .setFamily(Bytes.wrap(family)); // ... for this family. - - final MutationProto.DeleteType type = - (qualifiers == DELETE_FAMILY_MARKER - ? MutationProto.DeleteType.DELETE_FAMILY - : (at_timestamp_only - ? MutationProto.DeleteType.DELETE_ONE_VERSION - : MutationProto.DeleteType.DELETE_MULTIPLE_VERSIONS)); - - // Now add all the qualifiers to delete. - for (int i = 0; i < qualifiers.length; i++) { - final MutationProto.ColumnValue.QualifierValue column = - MutationProto.ColumnValue.QualifierValue.newBuilder() - .setQualifier(Bytes.wrap(qualifiers[i])) - .setTimestamp(timestamp) - .setDeleteType(type) - .build(); - columns.addQualifierValue(column); + MutationProto.ColumnValue.newBuilder(); + for (int i = 0; i < families.length; i++) { + byte[] family = families[i]; + columns.clear(); + columns.setFamily(Bytes.wrap(family)); // ... for this family. + + if (qualifiers != null) { + final boolean has_timestamps = + (timestamps != null && timestamps[i] != null); + final MutationProto.DeleteType type = + (qualifiers[i] == null + ? MutationProto.DeleteType.DELETE_FAMILY + : (at_timestamp_only + ? MutationProto.DeleteType.DELETE_ONE_VERSION + : MutationProto.DeleteType.DELETE_MULTIPLE_VERSIONS)); + // Now add all the qualifiers to delete. + if (qualifiers[i] != null) { + for (int j = 0; j < qualifiers[i].length; j++) { + final MutationProto.ColumnValue.QualifierValue column = + MutationProto.ColumnValue.QualifierValue.newBuilder() + .setQualifier(Bytes.wrap(qualifiers[i][j])) + .setTimestamp((has_timestamps ? timestamps[i][j] : timestamp)) + .setDeleteType(type) + .build(); + columns.addQualifierValue(column); + } + } + } + del.addColumnValue(columns); } - del.addColumnValue(columns); } if (!durable) { @@ -623,19 +881,31 @@ private ChannelBuffer serializeOld(final byte server_version) { buf.writeLong(lockid); // Lock ID. // Families. - if (family == WHOLE_ROW) { + if (families == WHOLE_ROW) { buf.writeInt(0); // Number of families that follow. return buf; } - buf.writeInt(1); // Number of families that follow. - // Each family is then written like so: - writeByteArray(buf, family); // Column family name. - buf.writeInt(qualifiers.length); // How many KeyValues for this family? - serializePayload(buf); + buf.writeInt(families.length); // Number of families that follow. + serializePayloads(buf); // All families + return buf; } + @Override + void serializePayloads(ChannelBuffer buf) { + for (int i = 0; i < families.length; i++) { + // Each family is then written like so: + writeByteArray(buf, families[i]); // Column family name. + final byte[][] family_qualifiers = + (qualifiers != null && qualifiers[i] != null) + ? qualifiers[i] + : DELETE_FAMILY_MARKER; + buf.writeInt(family_qualifiers.length); // How many KeyValues for this family? + serializePayload(buf, i); + } + } + @Override Object deserialize(final ChannelBuffer buf, int cell_size) { HBaseRpc.ensureNoCell(cell_size); diff --git a/src/GetRequest.java b/src/GetRequest.java index 240ebbc6..aeb0d88b 100644 --- a/src/GetRequest.java +++ b/src/GetRequest.java @@ -50,8 +50,8 @@ public final class GetRequest extends HBaseRpc private static final byte[] EXISTS = new byte[] { 'e', 'x', 'i', 's', 't', 's' }; - private byte[] family; // TODO(tsuna): Handle multiple families? - private byte[][] qualifiers; + private byte[][] families; + private byte[][][] qualifiers; private long lockid = RowLock.NO_LOCK; /** @@ -142,6 +142,24 @@ public GetRequest(final byte[] table, this.qualifier(qualifier); } + /** + * Constructor. + * These byte arrays will NOT be copied. + * @param table The non-empty name of the table to use. + * @param key The row key to get in that table. + * @param families The column families. + * @param qualifiers The column qualifiers. + * @since 1.5 + */ + public GetRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers) { + super(table, key); + this.families(families); + this.qualifiers(qualifiers); + } + /** * Constructor. * @param table The non-empty name of the table to use. @@ -211,7 +229,7 @@ private boolean isGetRequest() { */ public GetRequest family(final byte[] family) { KeyValue.checkFamily(family); - this.family = family; + this.families = new byte[][] { family }; return this; } @@ -220,6 +238,33 @@ public GetRequest family(final String family) { return family(family.getBytes()); } + /** + * Specifies the set of column families to get. + * @param families The column families. + * This byte array will NOT be copied. + * @return {@code this}, always. + */ + public GetRequest families(final byte[][] families) { + for (byte[] family : families) { + KeyValue.checkFamily(family); + } + this.families = families; + return this; + } + + /** Specifies the set of column families to get. */ + public GetRequest families(final String[] families) { + for (String family : families) { + KeyValue.checkFamily(family.getBytes()); + } + this.families = new byte[families.length][]; + int i = 0; + for (String family : families) { + this.families[i++] = family.getBytes(); + } + return this; + } + /** * Specifies a particular column qualifier to get. * @param qualifier The column qualifier. @@ -231,7 +276,7 @@ public GetRequest qualifier(final byte[] qualifier) { throw new NullPointerException("qualifier"); } KeyValue.checkQualifier(qualifier); - this.qualifiers = new byte[][] { qualifier }; + this.qualifiers = new byte[][][] { { qualifier } }; return this; } @@ -249,6 +294,27 @@ public GetRequest qualifiers(final byte[][] qualifiers) { for (final byte[] qualifier : qualifiers) { KeyValue.checkQualifier(qualifier); } + this.qualifiers = new byte[][][] { qualifiers }; + return this; + } + + /** + * Specifies a particular set of column qualifiers to get. + * @param qualifiers The column qualifiers. + * This byte array will NOT be copied. + * @return {@code this}, always. + * @since 1.1 + */ + public GetRequest qualifiers(final byte[][][] qualifiers) { + if (qualifiers == null) { + throw new NullPointerException("qualifiers"); + } + for (final byte[][] family_qualifiers : qualifiers) { + if (family_qualifiers == null) continue; + for (final byte[] qualifier : family_qualifiers) { + KeyValue.checkQualifier(qualifier); + } + } this.qualifiers = qualifiers; return this; } @@ -313,17 +379,27 @@ public byte[] key() { @Override public byte[] family() { - return family; + return families[0]; + } + + @Override + public byte[][] getFamilies() { + return families; } @Override public byte[][] qualifiers() { + return qualifiers[0]; + } + + @Override + public byte[][][] getQualifiers() { return qualifiers; } public String toString() { final String klass = isGetRequest() ? "GetRequest" : "Exists"; - return super.toStringWithQualifiers(klass, family, qualifiers); + return super.toStringWithQualifiers(klass, families, qualifiers); } // ---------------------- // @@ -359,16 +435,20 @@ private int predictSerializedSize(final byte server_version) { size += 8; // long: Maximum timestamp. size += 1; // byte: Boolean: "all time". size += 4; // int: Number of families. - if (family != null) { - size += 1; // vint: Family length (guaranteed on 1 byte). - size += family.length; // The family. - size += 1; // byte: Boolean: do we want specific qualifiers? - if (qualifiers != null) { - size += 4; // int: How many qualifiers follow? - for (final byte[] qualifier : qualifiers) { - size += 3; // vint: Qualifier length. - size += qualifier.length; // The qualifier. + if (families != null) { + int family_idx = 0; + for (byte family[] : families) { + size += 1; // vint: Family length (guaranteed on 1 byte). + size += family.length; // The family. + size += 1; // byte: Boolean: do we want specific qualifiers? + if (qualifiers != null && qualifiers[family_idx] != null) { + size += 4; // int: How many qualifiers follow? + for (final byte[] qualifier : qualifiers[family_idx]) { + size += 3; // vint: Qualifier length. + size += qualifier.length; // The qualifier. + } } + ++family_idx; } } if (server_version >= RegionClient.SERVER_VERSION_092_OR_ABOVE) { @@ -385,15 +465,18 @@ ChannelBuffer serialize(final byte server_version) { final ClientPB.Get.Builder getpb = ClientPB.Get.newBuilder() .setRow(Bytes.wrap(key)); - if (family != null) { - final ClientPB.Column.Builder column = ClientPB.Column.newBuilder(); - column.setFamily(Bytes.wrap(family)); - if (qualifiers != null) { - for (final byte[] qualifier : qualifiers) { - column.addQualifier(Bytes.wrap(qualifier)); - } + if (families != null) { + for (int family_idx = 0; family_idx < families.length; ++family_idx) { + byte[] family = families[family_idx]; + final ClientPB.Column.Builder column = ClientPB.Column.newBuilder(); + column.setFamily(Bytes.wrap(family)); + if (qualifiers != null && qualifiers[family_idx] != null) { + for (final byte[] qualifier : qualifiers[family_idx]) { + column.addQualifier(Bytes.wrap(qualifier)); + } + } + getpb.addColumn(column.build()); } - getpb.addColumn(column.build()); } // TODO: Filters. @@ -446,19 +529,23 @@ private ChannelBuffer serializeOld(final byte server_version) { // all possible times. Not sure why it's part of the serialized RPC... // Families. - buf.writeInt(family != null ? 1 : 0); // Number of families that follow. - - if (family != null) { - // Each family is then written like so: - writeByteArray(buf, family); // Column family name. - if (qualifiers != null) { - buf.writeByte(0x01); // Boolean: We want specific qualifiers. - buf.writeInt(qualifiers.length); // How many qualifiers do we want? - for (final byte[] qualifier : qualifiers) { - writeByteArray(buf, qualifier); // Column qualifier name. + buf.writeInt(families != null ? families.length : 0); // Number of families that follow. + + if (families != null) { + for (int family_idx = 0; family_idx < families.length; ++family_idx) { + byte[] family = families[family_idx]; + + // Each family is then written like so: + writeByteArray(buf, family); // Column family name. + if (qualifiers != null && qualifiers[family_idx] != null) { + buf.writeByte(0x01); // Boolean: We want specific qualifiers. + buf.writeInt(qualifiers[family_idx].length); // How many qualifiers do we want? + for (final byte[] qualifier : qualifiers[family_idx]) { + writeByteArray(buf, qualifier); // Column qualifier name. + } + } else { + buf.writeByte(0x00); // Boolean: we don't want specific qualifiers. } - } else { - buf.writeByte(0x00); // Boolean: we don't want specific qualifiers. } } if (server_version >= RegionClient.SERVER_VERSION_092_OR_ABOVE) { diff --git a/src/HBaseRpc.java b/src/HBaseRpc.java index 5d8ab93b..e9bfa1e5 100644 --- a/src/HBaseRpc.java +++ b/src/HBaseRpc.java @@ -27,6 +27,7 @@ package org.hbase.async; import java.io.IOException; +import java.util.Arrays; import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.CodedOutputStream; @@ -88,11 +89,18 @@ public interface HasKey { */ public interface HasFamily { /** - * Returns the family this RPC is for. + * Returns the first family of this RPC. *
* DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. */ public byte[] family(); + /** + * Returns all families of this RPC. + *
+ * DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. + * @since 1.5 + */ + public byte[][] getFamilies(); } /** @@ -114,11 +122,18 @@ public interface HasQualifier { */ public interface HasQualifiers { /** - * Returns the column qualifiers this RPC is for. + * Returns the column qualifiers for the first column family of this RPC. *
* DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. */ public byte[][] qualifiers(); + /** + * Returns the column qualifiers all column families of this RPC. + *
+ * DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. + * @since 1.5 + */ + public byte[][][] getQualifiers(); } /** @@ -145,6 +160,12 @@ public interface HasValues { * DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. */ public byte[][] values(); + /** + * Returns the values all column families of this RPC. + *
+ * DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. + */ + public byte[][][] getValues(); } /** @@ -568,19 +589,95 @@ final String toStringWithQualifiers(final String classname, final byte[][] qualifiers, final byte[][] values, final String fields) { + return toStringWithQualifiers(classname, new byte[][] { family }, + new byte[][][] { qualifiers }, new byte[][][] { values }, fields); + } + + /** + * Helper for subclass's {@link #toString} implementations. + *
+ * This is used by subclasses such as {@link DeleteRequest} + * or {@link GetRequest}, to avoid code duplication. + * @param classname The name of the class of the caller. + * @param families A non-empty list of families or null. + * @param qualifiers A non-empty list of qualifiers or null. + */ + final String toStringWithQualifiers(final String classname, + final byte[][] families, + final byte[][][] qualifiers) { + return toStringWithQualifiers(classname, families, qualifiers, null, ""); + } + + /** + * Helper for subclass's {@link #toString} implementations. + *
+ * This is used by subclasses such as {@link DeleteRequest} + * or {@link GetRequest}, to avoid code duplication. + * @param classname The name of the class of the caller. + * @param families A non-empty list of families or null. + * @param qualifiers A non-empty list of qualifiers or null. + * @param values A non-empty list of values or null. + * @param fields Additional fields to include in the output. + */ + final String toStringWithQualifiers(final String classname, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final String fields) { + return toStringWithQualifiers(classname, families, qualifiers, null, null, ""); + } + + /** + * Helper for subclass's {@link #toString} implementations. + *
+ * This is used by subclasses such as {@link DeleteRequest} + * or {@link GetRequest}, to avoid code duplication. + * @param classname The name of the class of the caller. + * @param families A non-empty list of families or null. + * @param qualifiers A non-empty list of qualifiers or null. + * @param values A non-empty list of values or null. + * @param timestamps A non-empty list of timestamps or null. + * @param fields Additional fields to include in the output. + */ + final String toStringWithQualifiers(final String classname, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long[][] timestamps, + final String fields) { final StringBuilder buf = new StringBuilder(256 // min=182 + fields.length()); buf.append(classname).append("(table="); Bytes.pretty(buf, table); buf.append(", key="); Bytes.pretty(buf, key); - buf.append(", family="); - Bytes.pretty(buf, family); - buf.append(", qualifiers="); - Bytes.pretty(buf, qualifiers); - if (values != null) { - buf.append(", values="); - Bytes.pretty(buf, values); + buf.append(", families="); + if (families == null + || families.length == 0 + || families == DeleteRequest.WHOLE_ROW) { + buf.append("null"); + } else { + buf.append("["); + for (int family_idx = 0; family_idx < families.length; ++family_idx) { + buf.append("{name="); + byte[] family = families[family_idx]; + Bytes.pretty(buf, family); + if (qualifiers != null && qualifiers[family_idx] != null) { + buf.append(", qualifiers="); + Bytes.pretty(buf, qualifiers[family_idx]); + } + if (values != null && values[family_idx] != null) { + buf.append(", values="); + Bytes.pretty(buf, values[family_idx]); + } + if (timestamps != null && timestamps[family_idx] != null) { + buf.append(", timestamps="); + buf.append(Arrays.toString(timestamps[family_idx])); + } + buf.append("}, "); + } + buf.setLength(buf.length()-2); + buf.append("]"); } buf.append(fields); buf.append(", attempt=").append(attempt) diff --git a/src/MultiAction.java b/src/MultiAction.java index 6dae3d0d..b23afba2 100644 --- a/src/MultiAction.java +++ b/src/MultiAction.java @@ -151,13 +151,15 @@ private int predictSerializedSize(final byte server_version) { final byte[] region_name = rpc.getRegion().name(); final boolean new_region = !Bytes.equals(prev.getRegion().name(), region_name); - final byte[] family = rpc.family(); + final byte[][] families = rpc.getFamilies(); final boolean new_key = (new_region || prev.code() != rpc.code() || !Bytes.equals(prev.key, rpc.key) - || family == DeleteRequest.WHOLE_ROW); + // do not coalesce RPCs with multi CFs + || families.length > 1 || prev.getFamilies().length > 1 + || families == DeleteRequest.WHOLE_ROW); final boolean new_family = new_key || !Bytes.equals(prev.family(), - family); + families[0]); if (new_region) { size += 3; // vint: region name length (3 bytes => max length = 32768). @@ -190,8 +192,13 @@ private int predictSerializedSize(final byte server_version) { } if (new_family) { + if (families.length > 1) { + size += rpc.payloadsSize(); + prev = rpc; + continue; + } size += 1; // vint: Family length (guaranteed on 1 byte). - size += family.length; // The family. + size += families[0].length; // The family. size += 4; // int: Number of KeyValues that follow. if (rpc.code() == PutRequest.CODE) { size += 4; // int: Total number of bytes for all those KeyValues. @@ -281,13 +288,15 @@ private ChannelBuffer serializeOld(final byte server_version) { final byte[] region_name = rpc.getRegion().name(); final boolean new_region = !Bytes.equals(prev.getRegion().name(), region_name); - final byte[] family = rpc.family(); + final byte[][] families = rpc.getFamilies(); final boolean new_key = (new_region || prev.code() != rpc.code() || !Bytes.equals(prev.key, rpc.key) - || family == DeleteRequest.WHOLE_ROW); + // do not coalesce RPCs with multi CFs + || families.length > 1 || prev.getFamilies().length > 1 + || families == DeleteRequest.WHOLE_ROW); final boolean new_family = new_key || !Bytes.equals(prev.family(), - family); + families[0]); if (new_key && use_multi && nkeys_index > 0) { buf.writeInt(0); // Number of "attributes" for the last key (none). @@ -374,13 +383,20 @@ private ChannelBuffer serializeOld(final byte server_version) { nbytes_per_family = 0; } - if (family == DeleteRequest.WHOLE_ROW) { + if (families == DeleteRequest.WHOLE_ROW) { prev = rpc; // Short circuit. We have no KeyValue to write. continue; // So loop again directly. + } else if (families.length > 1) { + // do not coalesces RPCs with multiple families + nfamilies = families.length; + rpc.serializePayloads(buf); + nrpcs_per_key++; + prev = rpc; + continue; } nfamilies++; - writeByteArray(buf, family); // The column family. + writeByteArray(buf, families[0]); // The column family. nkeys_per_family_index = buf.writerIndex(); // Number of "KeyValues" that follow. @@ -408,7 +424,7 @@ private ChannelBuffer serializeOld(final byte server_version) { // Note: the only case where nkeys_per_family_index remained -1 throughout // this whole ordeal is where we didn't have any KV to serialize because - // every RPC was a `DeleteRequest.WHOLE_ROW'. + // every RPC was a `DeleteRequest.WHOLE_ROW' or a multi-CF RPC. if (nkeys_per_family_index > 0) { // Monkey-patch everything for the last set of edits. buf.setInt(nkeys_per_family_index, nkeys_per_family); @@ -495,10 +511,14 @@ public int compare(final BatchableRpc a, final BatchableRpc b) { return d; } else if ((d = Bytes.memcmp(a.key, b.key)) != 0) { return d; + } else if (a.getFamilies().length == 1 && b.getFamilies().length == 1) { + // within a row, group all actions with single CF + // in the front so that they can possibly coalesce + return Bytes.memcmp(a.family(), b.family()); } - return Bytes.memcmp(a.family(), b.family()); - } + return (a.getFamilies().length - b.getFamilies().length); + } } /** @@ -683,7 +703,7 @@ Response deserializeMultiResponse(final ChannelBuffer buf) { // cloning "e", so instead we just abuse its `make' factory method // slightly to duplicate it. This mangles the message a bit, but // that's mostly harmless. - resps[n + k] = e.make(e.getMessage(), batch.get(n + k)); + resps[n + k] = e.make(e, batch.get(n + k)); } } else { for (int k = 0; k < nrpcs_per_key; k++) { @@ -783,6 +803,10 @@ public HBaseRpc getFailedRpc() { @Override MultiPutFailedException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof MultiPutFailedException) { + final MultiPutFailedException e = (MultiPutFailedException) msg; + return new MultiPutFailedException(e.getMessage(), rpc); + } return new MultiPutFailedException(msg.toString(), rpc); } diff --git a/src/NoSuchColumnFamilyException.java b/src/NoSuchColumnFamilyException.java index 988f5e09..9d1feae8 100644 --- a/src/NoSuchColumnFamilyException.java +++ b/src/NoSuchColumnFamilyException.java @@ -53,6 +53,10 @@ public HBaseRpc getFailedRpc() { @Override NoSuchColumnFamilyException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof NoSuchColumnFamilyException) { + final NoSuchColumnFamilyException e = (NoSuchColumnFamilyException) msg; + return new NoSuchColumnFamilyException(e.getMessage(), rpc); + } return new NoSuchColumnFamilyException(msg.toString(), rpc); } diff --git a/src/NotServingRegionException.java b/src/NotServingRegionException.java index dde348ae..00847a27 100644 --- a/src/NotServingRegionException.java +++ b/src/NotServingRegionException.java @@ -65,6 +65,10 @@ public HBaseRpc getFailedRpc() { @Override NotServingRegionException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof NotServingRegionException) { + final NotServingRegionException e = (NotServingRegionException) msg; + return new NotServingRegionException(e.getMessage(), rpc); + } return new NotServingRegionException(msg.toString(), rpc); } diff --git a/src/PutRequest.java b/src/PutRequest.java index 186340a8..ae0b97e5 100644 --- a/src/PutRequest.java +++ b/src/PutRequest.java @@ -89,8 +89,9 @@ public final class PutRequest extends BatchableRpc * - qualifiers.length == values.length * - qualifiers.length > 0 */ - private final byte[][] qualifiers; - private final byte[][] values; + private final byte[][][] qualifiers; + private final byte[][][] values; + private final long[][] timestamps; /** * Constructor using current time. @@ -139,8 +140,65 @@ public PutRequest(final byte[] table, final byte[] family, final byte[][] qualifiers, final byte[][] values) { - this(table, key, family, qualifiers, values, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + new byte[][][] { values }, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor for multiple columns using current time. + * These byte arrays will NOT be copied. + *
+ * Note: If you want to set your own timestamp, use + * {@link #PutRequest(byte[], byte[], byte[], byte[][], byte[][], long)} + * instead. This constructor will let the RegionServer assign the timestamp + * to this write at the time using {@link System#currentTimeMillis} right + * before the write is persisted to the WAL. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param family The column family to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamps The corresponding timestamps to store. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[] family, + final byte[][] qualifiers, + final byte[][] values, + final long[] timestamps) { + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + new byte[][][] { values }, new long[][] { timestamps }, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor for multiple columns in multiple families using current time. + * These byte arrays will NOT be copied. + *
+ * Note: If you want to set your own timestamp, use + * {@link #PutRequest(byte[], byte[], byte[], byte[][], byte[][], long)} + * instead. This constructor will let the RegionServer assign the timestamp + * to this write at the time using {@link System#currentTimeMillis} right + * before the write is persisted to the WAL. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values) { + this(table, key, families , qualifiers, values, null, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -182,7 +240,54 @@ public PutRequest(final byte[] table, final byte[][] qualifiers, final byte[][] values, final long timestamp) { - this(table, key, family, qualifiers, values, timestamp, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + new byte[][][] { values }, null, timestamp, RowLock.NO_LOCK); + } + + /** + * Constructor for multiple columns with a specific timestamp. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamp The timestamp to set on this edit. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long timestamp) { + this(table, key, families, qualifiers, values, null, + timestamp, RowLock.NO_LOCK); + } + + /** + * Constructor for multiple columns with a specific timestamp. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamps The corresponding timestamps to store. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long[][] timestamps) { + this(table, key, families, qualifiers, values, timestamps, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -254,7 +359,58 @@ public PutRequest(final byte[] table, final byte[][] values, final long timestamp, final RowLock lock) { - this(table, key, family, qualifiers, values, timestamp, lock.id()); + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + new byte[][][] { values }, null, timestamp, lock.id()); + } + + /** + * Constructor for multiple columns with current time and explicit row lock. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamp The timestamp to set on this edit. + * @param lock An explicit row lock to use with this request. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long timestamp, + final RowLock lock) { + this(table, key, families, qualifiers, values, null, + timestamp, lock.id()); + } + + /** + * Constructor for multiple columns with current time and explicit row lock. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamps The corresponding timestamps to store. + * @param lock An explicit row lock to use with this request. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long[][] timestamps, + final RowLock lock) { + this(table, key, families, qualifiers, values, timestamps, + KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -335,9 +491,8 @@ public PutRequest(final byte[] table, private PutRequest(final byte[] table, final KeyValue kv, final long lockid) { - super(table, kv.key(), kv.family(), kv.timestamp(), lockid); - this.qualifiers = new byte[][] { kv.qualifier() }; - this.values = new byte[][] { kv.value() }; + this(table, kv.key(), kv.family(), + kv.qualifier(), kv.value(), kv.timestamp(), lockid); } /** Private constructor. */ @@ -348,32 +503,72 @@ private PutRequest(final byte[] table, final byte[] value, final long timestamp, final long lockid) { - this(table, key, family, new byte[][] { qualifier }, new byte[][] { value }, - timestamp, lockid); + this(table, key, new byte[][] { family }, new byte[][][] { { qualifier } }, + new byte[][][] { { value } }, null, timestamp, lockid); } /** Private constructor. */ private PutRequest(final byte[] table, final byte[] key, - final byte[] family, - final byte[][] qualifiers, - final byte[][] values, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long[][] timestamps, final long timestamp, final long lockid) { - super(table, key, family, timestamp, lockid); - KeyValue.checkFamily(family); - if (qualifiers.length != values.length) { - throw new IllegalArgumentException("Have " + qualifiers.length - + " qualifiers and " + values.length + " values. Should be equal."); - } else if (qualifiers.length == 0) { - throw new IllegalArgumentException("Need at least one qualifier/value."); - } - for (int i = 0; i < qualifiers.length; i++) { - KeyValue.checkQualifier(qualifiers[i]); - KeyValue.checkValue(values[i]); - } + super(table, key, families, timestamp, lockid); + checkParams(families, qualifiers, values, timestamps); this.qualifiers = qualifiers; this.values = values; + this.timestamps = timestamps; + } + + private void checkParams(final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long[][] timestamps) { + if (families.length != qualifiers.length) { + throw new IllegalArgumentException(String.format( + "Mismatch in number of families(%d) and qualifiers(%d) array size.", + families.length, qualifiers.length)); + } else if (families.length != values.length) { + throw new IllegalArgumentException(String.format( + "Mismatch in number of families(%d) and values(%d) array size.", + families.length, values.length)); + } else if (timestamps != null && families.length != timestamps.length) { + throw new IllegalArgumentException(String.format( + "Mismatch in number of families(%d) and timestamps(%d) array size.", + families.length, timestamps.length)); + } + + for (int idx = 0; idx < families.length; idx++) { + KeyValue.checkFamily(families[idx]); + if (qualifiers[idx] == null || qualifiers[idx].length == 0) { + throw new IllegalArgumentException( + "No qualifiers are specifed for family " + + families[idx] + " at index " + idx); + } else if (values[idx] == null || values[idx].length == 0) { + throw new IllegalArgumentException( + "No values are specifed for family " + + families[idx] + " at index " + idx); + } else if (qualifiers[idx].length != values[idx].length) { + throw new IllegalArgumentException("Found " + + qualifiers[idx].length + " qualifiers and " + + values[idx].length + " values for family " + + families[idx] + " at index " + idx + ". Should be equal."); + } else if (timestamps != null) { // check timestamps if specified + if (qualifiers[idx].length != timestamps[idx].length) { + throw new IllegalArgumentException("Found " + + qualifiers[idx].length + " qualifiers and " + + timestamps[idx].length + " timestamps for family " + + families[idx] + " at index " + idx + ". Should be equal."); + } + } + for (int i = 0; i < qualifiers[idx].length; i++) { + KeyValue.checkQualifier(qualifiers[idx][i]); + KeyValue.checkValue(values[idx][i]); + } + } } @Override @@ -400,15 +595,25 @@ public byte[] key() { */ @Override public byte[] qualifier() { - return qualifiers[0]; + return qualifiers[0][0]; } /** - * {@inheritDoc} + * Returns the qualifiers of first column family in the set of + * edits in this RPC. * @since 1.3 */ @Override public byte[][] qualifiers() { + return qualifiers[0]; + } + + /** + * {@inheritDoc} + * @since 1.5 + */ + @Override + public byte[][][] getQualifiers() { return qualifiers; } @@ -418,7 +623,7 @@ public byte[][] qualifiers() { */ @Override public byte[] value() { - return values[0]; + return values[0][0]; } /** @@ -427,12 +632,21 @@ public byte[] value() { */ @Override public byte[][] values() { + return values[0]; + } + + /** + * {@inheritDoc} + * @since 1.5 + */ + @Override + public byte[][][] getValues() { return values; } public String toString() { return super.toStringWithQualifiers("PutRequest", - family, qualifiers, values, + families, qualifiers, values, timestamps, ", timestamp=" + timestamp + ", lockid=" + lockid + ", durable=" + durable @@ -462,23 +676,33 @@ byte code() { @Override int numKeyValues() { - return qualifiers.length; + return qualifiers[0].length; } @Override int payloadSize() { + return payloadSize(0); + } + + int payloadSize(int idx) { int size = 0; - for (int i = 0; i < qualifiers.length; i++) { - size += KeyValue.predictSerializedSize(key, family, qualifiers[i], values[i]); + for (int i = 0; i < qualifiers[idx].length; i++) { + size += KeyValue.predictSerializedSize( + key, families[idx], qualifiers[idx][i], values[idx][i]); } return size; } @Override void serializePayload(final ChannelBuffer buf) { - for (int i = 0; i < qualifiers.length; i++) { - KeyValue.serialize(buf, KeyValue.PUT, timestamp, key, family, - qualifiers[i], values[i]); + serializePayload(buf, 0); + } + + private void serializePayload(final ChannelBuffer buf, int idx) { + for (int i = 0; i < qualifiers[idx].length; i++) { + KeyValue.serialize(buf, KeyValue.PUT, + timestamps == null ? timestamp : timestamps[idx][i], + key, families[idx], qualifiers[idx][i], values[idx][i]); } } @@ -514,40 +738,52 @@ int predictPutSize() { size += 1; // bool: Whether or not to write to the WAL. size += 4; // int: Number of families for which we have edits. - size += 1; // vint: Family length (guaranteed on 1 byte). - size += family.length; // The family. - size += 4; // int: Number of KeyValues that follow. - size += 4; // int: Total number of bytes for all those KeyValues. - - size += payloadSize(); + size += payloadsSize(); return size; } @Override - MutationProto toMutationProto() { - final MutationProto.ColumnValue.Builder columns = // All columns ... - MutationProto.ColumnValue.newBuilder() - .setFamily(Bytes.wrap(family)); // ... for this family. - - // Now add all the qualifier-value pairs. - for (int i = 0; i < qualifiers.length; i++) { - final MutationProto.ColumnValue.QualifierValue column = - MutationProto.ColumnValue.QualifierValue.newBuilder() - .setQualifier(Bytes.wrap(qualifiers[i])) - .setValue(Bytes.wrap(values[i])) - .setTimestamp(timestamp) - .build(); - columns.addQualifierValue(column); + int payloadsSize() { + int size = 0; + for (int i = 0; i < families.length; i++) { + size += 1; // vint: Family length (guaranteed on 1 byte). + size += families[i].length; // The family. + size += 4; // int: Number of KeyValues that follow. + size += 4; // int: Total number of bytes for all those KeyValues. + size += payloadSize(i); } + return size; + } + @Override + MutationProto toMutationProto() { final MutationProto.Builder put = MutationProto.newBuilder() - .setRow(Bytes.wrap(key)) - .setMutateType(MutationProto.MutationType.PUT) - .addColumnValue(columns); + .setRow(Bytes.wrap(key)) + .setMutateType(MutationProto.MutationType.PUT); if (!durable) { put.setDurability(MutationProto.Durability.SKIP_WAL); } + + final MutationProto.ColumnValue.Builder columns = + MutationProto.ColumnValue.newBuilder(); + for (int family_idx = 0; family_idx < families.length; family_idx++) { + columns.clear(); + columns.setFamily(Bytes.wrap(families[family_idx])); // ... for this family. + + // Now add all the qualifier-value pairs. + for (int i = 0; i < qualifiers[family_idx].length; i++) { + final MutationProto.ColumnValue.QualifierValue column = + MutationProto.ColumnValue.QualifierValue.newBuilder() + .setQualifier(Bytes.wrap(qualifiers[family_idx][i])) + .setValue(Bytes.wrap(values[family_idx][i])) + .setTimestamp(timestamps == null ? timestamp : timestamps[family_idx][i]) + .build(); + columns.addQualifierValue(column); + } + put.addColumnValue(columns); + } + return put.build(); } @@ -599,12 +835,19 @@ void serializeInto(final ChannelBuffer buf) { buf.writeLong(lockid); // Lock ID. buf.writeByte(durable ? 0x01 : 0x00); // Whether or not to use the WAL. - buf.writeInt(1); // Number of families that follow. - writeByteArray(buf, family); // The column family. + buf.writeInt(families.length); // Number of families that follow. + serializePayloads(buf); + } + + @Override + void serializePayloads(ChannelBuffer buf) { + for (int i = 0; i < families.length; i++) { + writeByteArray(buf, families[i]); // The column family. - buf.writeInt(qualifiers.length); // Number of "KeyValues" that follow. - buf.writeInt(payloadSize()); // Size of the KV that follows. - serializePayload(buf); + buf.writeInt(qualifiers[i].length); // Number of "KeyValues" that follow. + buf.writeInt(payloadSize(i)); // Size of the KV that follows. + serializePayload(buf, i); + } } } diff --git a/src/RemoteException.java b/src/RemoteException.java index 06d43c47..5160749f 100644 --- a/src/RemoteException.java +++ b/src/RemoteException.java @@ -53,7 +53,7 @@ public String getType() { @Override RemoteException make(final Object msg, final HBaseRpc rpc) { - if (msg instanceof RemoteException) { + if (msg == this || msg instanceof RemoteException) { final RemoteException e = (RemoteException) msg; return new RemoteException(e.getType(), e.getMessage()); } diff --git a/src/UnknownRowLockException.java b/src/UnknownRowLockException.java index a0acd4bc..aa0a569e 100644 --- a/src/UnknownRowLockException.java +++ b/src/UnknownRowLockException.java @@ -53,6 +53,10 @@ public HBaseRpc getFailedRpc() { @Override UnknownRowLockException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof UnknownRowLockException) { + final UnknownRowLockException e = (UnknownRowLockException) msg; + return new UnknownRowLockException(e.getMessage(), rpc); + } return new UnknownRowLockException(msg.toString(), rpc); } diff --git a/src/UnknownScannerException.java b/src/UnknownScannerException.java index e5150492..a983ac98 100644 --- a/src/UnknownScannerException.java +++ b/src/UnknownScannerException.java @@ -53,6 +53,10 @@ public HBaseRpc getFailedRpc() { @Override UnknownScannerException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof UnknownScannerException) { + final UnknownScannerException e = (UnknownScannerException) msg; + return new UnknownScannerException(e.getMessage(), rpc); + } return new UnknownScannerException(msg.toString(), rpc); } diff --git a/src/VersionMismatchException.java b/src/VersionMismatchException.java index eedca0c5..75d94a1d 100644 --- a/src/VersionMismatchException.java +++ b/src/VersionMismatchException.java @@ -55,6 +55,10 @@ public HBaseRpc getFailedRpc() { @Override VersionMismatchException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof VersionMismatchException) { + final VersionMismatchException e = (VersionMismatchException) msg; + return new VersionMismatchException(e.getMessage(), rpc); + } return new VersionMismatchException(msg.toString(), rpc); }