Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,7 @@ func (clnt *Client) QueryExecute(policy *QueryPolicy,
if len(statement.BinNames) > 0 {
return nil, ErrNoBinNamesAllowedInQueryExecute.err()
}
taskId := statement.prepareTaskId()

policy = clnt.getUsableQueryPolicy(policy)
writePolicy = clnt.getUsableWritePolicy(writePolicy)
Expand All @@ -1041,13 +1042,13 @@ func (clnt *Client) QueryExecute(policy *QueryPolicy,

var errs Error
for i := range nodes {
command := newServerCommand(nodes[i], policy, writePolicy, statement, statement.TaskId, ops)
command := newServerCommand(nodes[i], policy, writePolicy, statement, ops)
if err := command.Execute(); err != nil {
errs = chainErrors(err, errs)
}
}

return NewExecuteTask(clnt.cluster, statement), errs
return NewExecuteTask(clnt.cluster, statement, taskId), errs
}

// ExecuteUDF applies user defined function on records that match the statement filter.
Expand All @@ -1065,6 +1066,7 @@ func (clnt *Client) ExecuteUDF(policy *QueryPolicy,
functionArgs ...Value,
) (*ExecuteTask, Error) {
policy = clnt.getUsableQueryPolicy(policy)
taskId := statement.prepareTaskId()

nodes := clnt.cluster.GetNodes()
if len(nodes) == 0 {
Expand All @@ -1075,13 +1077,13 @@ func (clnt *Client) ExecuteUDF(policy *QueryPolicy,

var errs Error
for i := range nodes {
command := newServerCommand(nodes[i], policy, nil, statement, statement.TaskId, nil)
command := newServerCommand(nodes[i], policy, nil, statement, nil)
if err := command.Execute(); err != nil {
errs = chainErrors(err, errs)
}
}

return NewExecuteTask(clnt.cluster, statement), errs
return NewExecuteTask(clnt.cluster, statement, taskId), errs
}

// ExecuteUDFNode applies user defined function on records that match the statement filter on the specified node.
Expand All @@ -1100,17 +1102,18 @@ func (clnt *Client) ExecuteUDFNode(policy *QueryPolicy,
functionArgs ...Value,
) (*ExecuteTask, Error) {
policy = clnt.getUsableQueryPolicy(policy)
taskId := statement.prepareTaskId()

if node == nil {
return nil, ErrClusterIsEmpty.err()
}

statement.SetAggregateFunction(packageName, functionName, functionArgs, false)

command := newServerCommand(node, policy, nil, statement, statement.TaskId, nil)
command := newServerCommand(node, policy, nil, statement, nil)
err := command.Execute()

return NewExecuteTask(clnt.cluster, statement), err
return NewExecuteTask(clnt.cluster, statement, taskId), err
}

// SetXDRFilter sets XDR filter for given datacenter name and namespace. The expression filter indicates
Expand Down
4 changes: 2 additions & 2 deletions execute_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type ExecuteTask struct {
}

// NewExecuteTask initializes task with fields needed to query server nodes.
func NewExecuteTask(cluster *Cluster, statement *Statement) *ExecuteTask {
func NewExecuteTask(cluster *Cluster, statement *Statement, taskId uint64) *ExecuteTask {
return &ExecuteTask{
baseTask: newTask(cluster),
taskID: statement.TaskId,
taskID: taskId,
scan: statement.IsScan(),
observed: make(map[string]struct{}, len(cluster.GetNodes())),
}
Expand Down
4 changes: 2 additions & 2 deletions proxy_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ func (clnt *ProxyClient) QueryExecute(policy *QueryPolicy,
policy = clnt.getUsableQueryPolicy(policy)
writePolicy = clnt.getUsableWritePolicy(writePolicy)

command := newServerCommand(nil, policy, writePolicy, statement, statement.TaskId, ops)
command := newServerCommand(nil, policy, writePolicy, statement, ops)

if err := command.ExecuteGRPC(clnt); err != nil {
return nil, err
Expand Down Expand Up @@ -992,7 +992,7 @@ func (clnt *ProxyClient) ExecuteUDF(policy *QueryPolicy,

nstatement := *statement
nstatement.SetAggregateFunction(packageName, functionName, functionArgs, false)
command := newServerCommand(nil, policy, wpolicy, &nstatement, nstatement.TaskId, nil)
command := newServerCommand(nil, policy, wpolicy, &nstatement, nil)

if err := command.ExecuteGRPC(clnt); err != nil {
return nil, err
Expand Down
73 changes: 73 additions & 0 deletions query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,4 +554,77 @@ var _ = gg.Describe("Query operations", func() {
}
})

gg.It("must allow reusing the same statement for multiple QueryExecute calls without errors", func() {
stm := as.NewStatement(ns, set)
stm.SetFilter(as.NewEqualFilter(bin6.Name, 1))

updateBin := as.NewBin("Aerospike7", 100)

tsk1, err := client.QueryExecute(queryPolicy, nil, stm, as.PutOp(updateBin))
gm.Expect(err).ToNot(gm.HaveOccurred())
gm.Expect(<-tsk1.OnComplete()).To(gm.BeNil())

updateBin2 := as.NewBin("Aerospike7", 200)
tsk2, err := client.QueryExecute(queryPolicy, nil, stm, as.PutOp(updateBin2))
gm.Expect(err).ToNot(gm.HaveOccurred())
gm.Expect(<-tsk2.OnComplete()).To(gm.BeNil())

updateBin3 := as.NewBin("Aerospike7", 300)
tsk3, err := client.QueryExecute(queryPolicy, nil, stm, as.PutOp(updateBin3))
gm.Expect(err).ToNot(gm.HaveOccurred())
gm.Expect(<-tsk3.OnComplete()).To(gm.BeNil())

// Verify that the last operation was applied correctly by reading records back
stmRead := as.NewStatement(ns, set)
recordset, err := client.Query(queryPolicy, stmRead)
gm.Expect(err).ToNot(gm.HaveOccurred())

foundRecords := 0
for res := range recordset.Results() {
gm.Expect(res.Err).ToNot(gm.HaveOccurred())
rec := res.Record
gm.Expect(rec).ToNot(gm.BeNil())
gm.Expect(rec.Bins["Aerospike7"]).To(gm.Equal(300))
foundRecords++
}

gm.Expect(foundRecords).To(gm.Equal(keyCount))
})

gg.It("must allow reusing the same statement for multiple Query calls without errors", func() {
stm := as.NewStatement(ns, set)
stm.SetFilter(as.NewRangeFilter(bin3.Name, 0, math.MaxInt16))

recordset1, err := client.Query(queryPolicy, stm)
gm.Expect(err).ToNot(gm.HaveOccurred())

count1 := 0
for res := range recordset1.Results() {
gm.Expect(res.Err).ToNot(gm.HaveOccurred())
count1++
}

recordset2, err := client.Query(queryPolicy, stm)
gm.Expect(err).ToNot(gm.HaveOccurred())

count2 := 0
for res := range recordset2.Results() {
gm.Expect(res.Err).ToNot(gm.HaveOccurred())
count2++
}

recordset3, err := client.Query(queryPolicy, stm)
gm.Expect(err).ToNot(gm.HaveOccurred())

count3 := 0
for res := range recordset3.Results() {
gm.Expect(res.Err).ToNot(gm.HaveOccurred())
count3++
}

// All calls should return the same number of records
gm.Expect(count1).To(gm.Equal(count2))
gm.Expect(count2).To(gm.Equal(count3))
gm.Expect(count1).To(gm.BeNumerically(">", 0))
})
})
2 changes: 1 addition & 1 deletion server_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type serverCommand struct {
queryCommand
}

func newServerCommand(node *Node, policy *QueryPolicy, writePolicy *WritePolicy, statement *Statement, taskId uint64, operations []*Operation) *serverCommand {
func newServerCommand(node *Node, policy *QueryPolicy, writePolicy *WritePolicy, statement *Statement, operations []*Operation) *serverCommand {
return &serverCommand{
queryCommand: *newQueryCommand(node, policy, writePolicy, statement, operations, nil),
}
Expand Down
21 changes: 20 additions & 1 deletion statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewStatement(ns string, set string, binNames ...string) *Statement {
SetName: set,
BinNames: binNames,
ReturnData: true,
TaskId: rand.Uint64(),
TaskId: 0,
}
}

Expand Down Expand Up @@ -113,3 +113,22 @@ func (stmt *Statement) terminationError() types.ResultCode {
func (stmt *Statement) prepare(returnData bool) {
stmt.ReturnData = returnData
}

func (stmt *Statement) prepareTaskId() uint64 {
// If TaskId is 0, set it to a new random value and return the same statement.
// This also means that that the taskId was never set by the user.
// If TaskId is non-zero, it means that the user set it manually.
// In that case, we make a copy of the statement and set a new random taskId on the copy.
// This way we don't modify the original statement that the user provided.
// This is important because the user might want to reuse the same statement for multiple queries.
// However the control of the taskId is now with the client library and not the user.
// Important to remember is that the server will reject queries that have already been executed and the result is not
// available yet.
if stmt.TaskId == 0 {
taskId := rand.Uint64()

return taskId
}

return stmt.TaskId
}
Loading