From d32b31c32e7ab421dccce4d5615ab373477446b2 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Thu, 16 Sep 2021 08:15:45 +0200 Subject: [PATCH 1/2] GEODE-9602: QueryObserver improvements. - Make QueryObserverHolder thread-safe - Allow having an observer per query by means of setting the observer in the query at the start of the execution. - Invoke beforeIterationEvaluation and afterIterationEvaluation callbacks when query is using indexes. --- .../query/QueryServiceRegressionTest.java | 6 +- .../internal/OrderByComparatorJUnitTest.java | 6 +- .../internal/QueryObserverCallbacksTest.java | 55 +++++++++++++++ .../query/internal/QueryTraceJUnitTest.java | 69 ++++++++++++++++--- .../AbstractGroupOrRangeJunction.java | 2 +- .../query/internal/AllGroupJunction.java | 2 +- .../query/internal/CompiledComparison.java | 4 +- .../query/internal/CompiledGroupBySelect.java | 2 +- .../cache/query/internal/CompiledIn.java | 2 +- .../query/internal/CompiledJunction.java | 2 +- .../cache/query/internal/CompiledSelect.java | 8 +-- .../query/internal/CompiledUndefined.java | 2 +- .../internal/CompositeGroupJunction.java | 2 +- .../cache/query/internal/DefaultQuery.java | 21 ++++-- .../query/internal/ExecutionContext.java | 10 +++ .../query/internal/OrderByComparator.java | 4 +- .../query/internal/QueryObserverHolder.java | 8 +-- .../cache/query/internal/QueryUtils.java | 52 ++++++-------- .../cache/query/internal/RangeJunction.java | 6 +- .../query/internal/index/AbstractIndex.java | 35 +++++++--- .../internal/index/CompactRangeIndex.java | 26 +++++-- .../cache/query/internal/index/HashIndex.java | 3 +- .../query/internal/index/PrimaryKeyIndex.java | 5 +- .../query/internal/index/RangeIndex.java | 9 ++- .../geode/internal/cache/LocalDataSet.java | 2 +- .../cache/partitioned/QueryMessage.java | 2 +- .../query/internal/CompiledInJUnitTest.java | 1 + .../query/internal/StructSetJUnitTest.java | 6 +- .../cli/functions/DataCommandFunction.java | 6 +- .../functional/StructSetOrResultsSet.java | 4 +- 30 files changed, 261 insertions(+), 101 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryServiceRegressionTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryServiceRegressionTest.java index 2c2812384b7d..d2a509d3d49d 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryServiceRegressionTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/QueryServiceRegressionTest.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.Serializable; import java.util.ArrayList; @@ -339,7 +341,9 @@ public void testBugResultMismatch() throws Exception { SelectResults rs1 = (SelectResults) q1.execute(); SelectResults rs2 = (SelectResults) q2.execute(); - assertThatCode(() -> QueryUtils.union(rs1, rs2, null)).doesNotThrowAnyException(); + ExecutionContext context = mock(ExecutionContext.class); + when(context.getObserver()).thenReturn(new QueryObserverAdapter()); + assertThatCode(() -> QueryUtils.union(rs1, rs2, context)).doesNotThrowAnyException(); } /** diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/OrderByComparatorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/OrderByComparatorJUnitTest.java index dbcd3e8b831e..fdeef18a2985 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/OrderByComparatorJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/OrderByComparatorJUnitTest.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.lang.reflect.Field; import java.util.Collection; @@ -209,6 +211,8 @@ public void testCompareThrowsClassCastException() throws Exception { private OrderByComparator createComparator() throws Exception { StructTypeImpl objType = new StructTypeImpl(); - return new OrderByComparator(null, objType, null); + ExecutionContext context = mock(ExecutionContext.class); + when(context.getObserver()).thenReturn(new QueryObserverAdapter()); + return new OrderByComparator(null, objType, context); } } diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryObserverCallbacksTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryObserverCallbacksTest.java index 624267526f4d..c4fe9946818e 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryObserverCallbacksTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryObserverCallbacksTest.java @@ -41,6 +41,7 @@ import org.apache.geode.cache.query.data.Address; import org.apache.geode.cache.query.data.Employee; import org.apache.geode.cache.query.data.Portfolio; +import org.apache.geode.cache.query.internal.index.IndexManager; import org.apache.geode.test.junit.categories.OQLQueryTest; import org.apache.geode.test.junit.rules.ServerStarterRule; @@ -85,6 +86,7 @@ public void setUp() throws Exception { @After public void tearDown() { QueryObserverHolder.reset(); + IndexManager.TEST_RANGEINDEX_ONLY = false; } @SuppressWarnings("unchecked") @@ -243,6 +245,59 @@ public void beforeAggregationsAndGroupByShouldBeCalledForAggregateFunctions() th verify(myQueryObserver, times(queries.size())).beforeAggregationsAndGroupBy(any()); } + @Test + public void testBeforeAndAfterIterationEvaluateNoWhere() throws Exception { + Query query = queryService.newQuery( + "select count(*) from " + SEPARATOR + "portfolio p"); + + query.execute(); + verify(myQueryObserver, times(0)).beforeIterationEvaluation(any(), any()); + verify(myQueryObserver, times(0)).afterIterationEvaluation(any()); + } + + @Test + public void testBeforeAndAfterIterationEvaluateWithoutIndex() throws Exception { + Query query = queryService.newQuery( + "select count(*) from " + SEPARATOR + "portfolio p where p.isActive = true "); + + query.execute(); + verify(myQueryObserver, times(4)).beforeIterationEvaluation(any(), any()); + verify(myQueryObserver, times(4)).afterIterationEvaluation(any()); + } + + @Test + public void testBeforeAndAfterIterationEvaluateWithCompactRangeIndex() throws Exception { + Query query = queryService.newQuery( + "select count(*) from " + SEPARATOR + "portfolio p where p.isActive = true "); + queryService.createIndex("isActiveIndex", "isActive", SEPARATOR + "portfolio"); + + query.execute(); + verify(myQueryObserver, times(2)).beforeIterationEvaluation(any(), any()); + verify(myQueryObserver, times(2)).afterIterationEvaluation(any()); + assertThat(myQueryObserver.dbIndx[2] == myQueryObserver.usedIndx) + .as("Validate callback of Indexes").isTrue(); + assertThat(myQueryObserver.unusedIndx == myQueryObserver.dbIndx[0] + || myQueryObserver.unusedIndx == myQueryObserver.dbIndx[1]) + .as("Validate callback of Indexes").isTrue(); + } + + @Test + public void testBeforeAndAfterIterationEvaluateWithRangeIndex() throws Exception { + IndexManager.TEST_RANGEINDEX_ONLY = true; + Query query = queryService.newQuery( + "select count(*) from " + SEPARATOR + "portfolio p where p.description = 'XXXX' "); + queryService.createIndex("descriptionIndex", "description", SEPARATOR + "portfolio"); + + query.execute(); + verify(myQueryObserver, times(2)).beforeIterationEvaluation(any(), any()); + verify(myQueryObserver, times(2)).afterIterationEvaluation(any()); + assertThat(myQueryObserver.dbIndx[2] == myQueryObserver.usedIndx) + .as("Validate callback of Indexes").isTrue(); + assertThat(myQueryObserver.unusedIndx == myQueryObserver.dbIndx[0] + || myQueryObserver.unusedIndx == myQueryObserver.dbIndx[1]) + .as("Validate callback of Indexes").isTrue(); + } + private static class MyQueryObserverImpl extends QueryObserverAdapter { private int j = 0; private Index usedIndx = null; diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java index c02b96a6af65..07b97e258ebd 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java @@ -15,6 +15,7 @@ package org.apache.geode.cache.query.internal; import static org.apache.geode.cache.Region.SEPARATOR; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -60,10 +61,12 @@ public class QueryTraceJUnitTest { @Before public void setUp() throws Exception { CacheUtils.startCache(); + DefaultQuery.testHook = new BeforeQueryExecutionHook(); } @After public void tearDown() throws Exception { + DefaultQuery.testHook = null; CacheUtils.closeCache(); } @@ -104,7 +107,11 @@ public void testTraceOnPartitionedRegionWithTracePrefix() throws Exception { assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -141,7 +148,11 @@ public void testTraceOnLocalRegionWithTracePrefix() throws Exception { assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -183,7 +194,11 @@ public void testNegTraceOnPartitionedRegionWithTracePrefix() throws Exception { assertFalse(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should not have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -223,7 +238,11 @@ public void testNegTraceOnLocalRegionWithTracePrefix() throws Exception { assertFalse(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertFalse(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should not have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isNotInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -262,7 +281,11 @@ public void testTraceOnPartitionedRegionWithTracePrefixNoComments() throws Excep assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -296,8 +319,11 @@ public void testTraceOnLocalRegionWithTracePrefixNoComments() throws Exception { assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); - // The query should return all elements in region. + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); } @@ -331,7 +357,11 @@ public void testTraceOnPartitionedRegionWithSmallTracePrefixNoComments() throws assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -366,7 +396,11 @@ public void testTraceOnLocalRegionWithSmallTracePrefixNoComments() throws Except assertTrue(((DefaultQuery) query).isTraced()); SelectResults results = (SelectResults) query.execute(); - assertTrue(QueryObserverHolder.getInstance() instanceof IndexTrackingQueryObserver); + + // The IndexTrackingObserver should have been set + BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; + assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); @@ -438,4 +472,21 @@ public void testQueryFailLocalRegionWithSmallTracePrefixNoSpace() throws Excepti } } + private class BeforeQueryExecutionHook implements DefaultQuery.TestHook { + private QueryObserver observer = null; + + @Override + public void doTestHook(final SPOTS spot, final DefaultQuery _ignored, + final ExecutionContext executionContext) { + switch (spot) { + case BEFORE_QUERY_EXECUTION: + observer = executionContext.getObserver(); + break; + } + } + + public QueryObserver getObserver() { + return observer; + } + } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java index f0090ae9f11e..6b97a51aa1b3 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AbstractGroupOrRangeJunction.java @@ -383,7 +383,7 @@ private SelectResults auxIterateEvaluate(CompiledValue operand, ExecutionContext resultSet = QueryUtils.createResultCollection(context, elementType); } - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); try { observer.startIteration(intermediateResults, operand); Iterator iResultsIter = intermediateResults.iterator(); diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AllGroupJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AllGroupJunction.java index 54cde5b0c964..dc64f83f108f 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/AllGroupJunction.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/AllGroupJunction.java @@ -169,7 +169,7 @@ private SelectResults evaluateAndJunction(ExecutionContext context) iterOperandsToSend = new CompiledJunction(cv, this.operator); } } - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); observer.beforeCartesianOfGroupJunctionsInAnAllGroupJunctionOfType_AND(results); resultsSet = QueryUtils.cartesian(results, itrsForResultFields, expansionList, finalList, context, iterOperandsToSend); diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledComparison.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledComparison.java index 17ed5b16ffb6..497bb38b7536 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledComparison.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledComparison.java @@ -373,7 +373,7 @@ private SelectResults singleBaseCollectionFilterEvaluate(ExecutionContext contex // before the index lookup int op = reflectOnOperator(indexInfo._key()); // actual index lookup - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); List projAttrib = null; /* * Asif : First obtain the match level of index resultset. If the match level happens to be zero @@ -535,7 +535,7 @@ private SelectResults doubleBaseCollectionFilterEvaluate(ExecutionContext contex // each of the // one dimensional array can be either genuine result object or StructImpl // object. - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); context.cachePut(CompiledValue.INDEX_INFO, indxInfo); /* * Asif : If the independent Group of iterators passed is not null or the independent Group of diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledGroupBySelect.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledGroupBySelect.java index 951c867d350a..f8516443a8ea 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledGroupBySelect.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledGroupBySelect.java @@ -155,7 +155,7 @@ private void mapOriginalOrderByColumns(ExecutionContext context) public SelectResults evaluate(ExecutionContext context) throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException { SelectResults selectResults = super.evaluate(context); - QueryObserverHolder.getInstance().beforeAggregationsAndGroupBy(selectResults); + context.getObserver().beforeAggregationsAndGroupBy(selectResults); return this.applyAggregateAndGroupBy(selectResults, context); } diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledIn.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledIn.java index b7e28b1b25e3..b982c658325e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledIn.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledIn.java @@ -524,7 +524,7 @@ SelectResults singleBaseCollectionFilterEvaluate(ExecutionContext context, } } - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); try { Object evalColln = evaluateColln(context); observer.beforeIndexLookup(indexInfo._index, TOK_EQ, evalColln); diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java index fb0425fcd8aa..28cf9afd62fa 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledJunction.java @@ -368,7 +368,7 @@ SelectResults auxIterateEvaluate(CompiledValue operand, ExecutionContext context } - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); try { observer.startIteration(intermediateResults, operand); Iterator iResultsIter = intermediateResults.iterator(); diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledSelect.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledSelect.java index 694c50ed77ab..956be369b96d 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledSelect.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledSelect.java @@ -537,7 +537,7 @@ public SelectResults evaluate(ExecutionContext context) throws FunctionDomainExc result = ((Filter) this.whereClause).filterEvaluate(context, null); if (!(context.cacheGet(RESULT_TYPE) instanceof Boolean)) { - QueryObserverHolder.getInstance() + context.getObserver() .beforeApplyingProjectionOnFilterEvaluatedResults(result); result = applyProjectionOnCollection(result, context, !needsTopLevelOrdering); } @@ -691,7 +691,7 @@ private SelectResults doIterationEvaluate(ExecutionContext context, boolean eval for (Iterator itr = tmpResults.iterator(); itr.hasNext();) { Object currObj = itr.next(); rIter.setCurrent(currObj); - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); observer.beforeIterationEvaluation(rIter, currObj); applyProjectionAndAddToResultSet(context, results, this.orderByAttrs == null); } @@ -773,7 +773,7 @@ private int doNestedIterations(int level, SelectResults results, ExecutionContex boolean addToResults = true; if (evaluateWhereClause) { Object result = this.whereClause.evaluate(context); - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); observer.afterIterationEvaluation(result); if (result == null) { addToResults = false; @@ -839,7 +839,7 @@ private int doNestedIterations(int level, SelectResults results, ExecutionContex Object currObj = aSr; rIter.setCurrent(currObj); - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); observer.beforeIterationEvaluation(rIter, currObj); numElementsInResult = doNestedIterations(level + 1, results, context, evaluateWhereClause, numElementsInResult); diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledUndefined.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledUndefined.java index 6f8d9e6d4fb5..677c2549cfba 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledUndefined.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompiledUndefined.java @@ -100,7 +100,7 @@ public SelectResults filterEvaluate(ExecutionContext context, SelectResults inte } int op = _is_defined ? TOK_NE : TOK_EQ; Object key = QueryService.UNDEFINED; - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); try { observer.beforeIndexLookup(idxInfo[0]._index, op, key); context.cachePut(CompiledValue.INDEX_INFO, idxInfo[0]); diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompositeGroupJunction.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompositeGroupJunction.java index 4e77d2ebd235..53b616c9d286 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompositeGroupJunction.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/CompositeGroupJunction.java @@ -309,7 +309,7 @@ private SelectResults evaluateAndJunction(ExecutionContext context) } // Do the cartesian of the different group junction results. // TODO:Asif Remove the time - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); observer.beforeCartesianOfGroupJunctionsInCompositeGroupJunctionOfType_AND(results); SelectResults grpCartRs = QueryUtils.cartesian(results, itrsForResultFields, expansionList, finalList, context, iterOp); diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java index 5003e20b3575..1f2704debab1 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java @@ -134,6 +134,8 @@ public class DefaultQuery implements Query { private static final ThreadLocal>> pdxClassToFieldsMap = ThreadLocal.withInitial(HashMap::new); + private QueryObserver oldQueryObserver = null; + static Map> getPdxClasstofieldsmap() { return pdxClassToFieldsMap.get(); } @@ -231,7 +233,7 @@ public Object execute(Object[] params) throws FunctionDomainException, TypeMisma try { // Setting the readSerialized flag for local queries this.cache.setPdxReadSerializedOverride(true); - indexObserver = this.startTrace(); + indexObserver = this.startTrace(context); if (qe != null) { if (DefaultQuery.testHook != null) { DefaultQuery.testHook.doTestHook(DefaultQuery.TestHook.SPOTS.BEFORE_QUERY_EXECUTION, @@ -335,7 +337,7 @@ private Object executeOnServer(Object[] parameters) { public Object executeUsingContext(ExecutionContext context) throws FunctionDomainException, TypeMismatchException, NameResolutionException, QueryInvocationTargetException { - QueryObserver observer = QueryObserverHolder.getInstance(); + QueryObserver observer = context.getObserver(); long startTime = statisticsClock.getTime(); TXStateProxy tx = ((TXManagerImpl) this.cache.getCacheTransactionManager()).pauseTransaction(); try { @@ -742,12 +744,12 @@ public Object execute(RegionFunctionContext context, Object[] params) Object result = null; try { - indexObserver = startTrace(); + final ExecutionContext executionContext = new ExecutionContext(null, cache); + indexObserver = startTrace(executionContext); if (qe != null) { LocalDataSet localDataSet = (LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context); Set buckets = localDataSet.getBucketSet(); - final ExecutionContext executionContext = new ExecutionContext(null, cache); result = qe.executeQuery(this, executionContext, params, buckets); return result; } else { @@ -770,7 +772,7 @@ public boolean isQueryWithFunctionContext() { return this.isQueryWithFunctionContext; } - public QueryObserver startTrace() { + public QueryObserver startTrace(ExecutionContext context) { QueryObserver queryObserver = null; if (this.traceOn && this.cache != null) { @@ -779,7 +781,8 @@ public QueryObserver startTrace() { queryObserver = qo; } else if (!QueryObserverHolder.hasObserver()) { queryObserver = new IndexTrackingQueryObserver(); - QueryObserverHolder.setInstance(queryObserver); + oldQueryObserver = QueryObserverHolder.setInstance(queryObserver); + context.setObserver(queryObserver); } else { queryObserver = qo; } @@ -800,6 +803,9 @@ public void endTrace(QueryObserver indexObserver, long startTime, Object result) DefaultQuery.getLogMessage(indexObserver, startTime, resultSize, this.queryString); logger.info(queryVerboseMsg); } + if (oldQueryObserver != null) { + QueryObserverHolder.setInstance(oldQueryObserver); + } } public void endTrace(QueryObserver indexObserver, long startTime, Collection result) { @@ -816,6 +822,9 @@ public void endTrace(QueryObserver indexObserver, long startTime, Collection list = new ArrayList<>(); @@ -237,6 +238,9 @@ private DataCommandResult select(InternalCache cache, Object principal, String q if (queryObserver != null) { QueryObserverHolder.reset(); } + if (tracedQuery.isTraced()) { + QueryObserverHolder.setInstance(oldQueryObserver); + } } } diff --git a/geode-junit/src/main/java/org/apache/geode/cache/query/functional/StructSetOrResultsSet.java b/geode-junit/src/main/java/org/apache/geode/cache/query/functional/StructSetOrResultsSet.java index b328cfe7d288..417fab24ca5b 100755 --- a/geode-junit/src/main/java/org/apache/geode/cache/query/functional/StructSetOrResultsSet.java +++ b/geode-junit/src/main/java/org/apache/geode/cache/query/functional/StructSetOrResultsSet.java @@ -49,7 +49,6 @@ import org.apache.geode.cache.query.internal.ExecutionContext; import org.apache.geode.cache.query.internal.OrderByComparator; import org.apache.geode.cache.query.internal.QueryObserverAdapter; -import org.apache.geode.cache.query.internal.QueryObserverHolder; import org.apache.geode.cache.query.types.ObjectType; import org.apache.geode.internal.util.ArrayUtils; @@ -237,11 +236,10 @@ public int compare(Struct o1, Struct o2) { } final Comparator secondLevelComparator = baseComparator; final Comparator finalComparator = new Comparator() { - @Override public int compare(Object o1, Object o2) { final boolean[] orderByColsEqual = new boolean[] {false}; - QueryObserverHolder.setInstance(new QueryObserverAdapter() { + context.setObserver(new QueryObserverAdapter() { @Override public void orderByColumnsEqual() { orderByColsEqual[0] = true; From 96a15bc4e382f049367ecfccf9be58289c01aaf1 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Tue, 21 Sep 2021 16:06:00 +0200 Subject: [PATCH 2/2] GEODE-9602: Changes after review --- .../internal/QueryObserverCallbacksTest.java | 5 +- .../query/internal/QueryTraceJUnitTest.java | 1 + .../query/internal/QueryObserverHolder.java | 23 +++---- .../query/internal/index/AbstractIndex.java | 65 ++++++++----------- .../internal/index/CompactRangeIndex.java | 43 ++++++------ 5 files changed, 62 insertions(+), 75 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryObserverCallbacksTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryObserverCallbacksTest.java index c4fe9946818e..98932af387c1 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryObserverCallbacksTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryObserverCallbacksTest.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -251,8 +252,8 @@ public void testBeforeAndAfterIterationEvaluateNoWhere() throws Exception { "select count(*) from " + SEPARATOR + "portfolio p"); query.execute(); - verify(myQueryObserver, times(0)).beforeIterationEvaluation(any(), any()); - verify(myQueryObserver, times(0)).afterIterationEvaluation(any()); + verify(myQueryObserver, never()).beforeIterationEvaluation(any(), any()); + verify(myQueryObserver, never()).afterIterationEvaluation(any()); } @Test diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java index 07b97e258ebd..07e0e46fe9be 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/QueryTraceJUnitTest.java @@ -324,6 +324,7 @@ public void testTraceOnLocalRegionWithTracePrefixNoComments() throws Exception { BeforeQueryExecutionHook hook = (BeforeQueryExecutionHook) DefaultQuery.testHook; assertThat(hook.getObserver()).isInstanceOf(IndexTrackingQueryObserver.class); + // The query should return all elements in region. assertEquals(region.size(), results.size()); QueryObserverHolder.reset(); } diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java index bae6b720fb0a..26a8a249cd73 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryObserverHolder.java @@ -14,6 +14,8 @@ */ package org.apache.geode.cache.query.internal; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.internal.MakeNotStatic; @@ -49,31 +51,30 @@ public class QueryObserverHolder { * The current observer which will be notified of all query events. */ @MakeNotStatic - private static QueryObserver _instance = NO_OBSERVER; + private static final AtomicReference _instance = + new AtomicReference<>(NO_OBSERVER); /** * Set the given observer to be notified of query events. Returns the current observer. */ - public static synchronized QueryObserver setInstance(QueryObserver observer) { + public static QueryObserver setInstance(QueryObserver observer) { Support.assertArg(observer != null, "setInstance expects a non-null argument!"); - QueryObserver oldObserver = _instance; - _instance = observer; - return oldObserver; + return _instance.getAndSet(observer); } - public static synchronized boolean hasObserver() { - return _instance != NO_OBSERVER; + public static boolean hasObserver() { + return _instance.get() != NO_OBSERVER; } /** Return the current QueryObserver instance */ - public static synchronized QueryObserver getInstance() { - return _instance; + public static QueryObserver getInstance() { + return _instance.get(); } /** * Only for test purposes. */ - public static synchronized void reset() { - _instance = NO_OBSERVER; + public static void reset() { + _instance.set(NO_OBSERVER); } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java index 85706867ccea..aef2d0332923 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/AbstractIndex.java @@ -1824,19 +1824,8 @@ void addValuesToCollection(Collection result, CompiledValue iterOp, RuntimeItera synchronized (value) { for (Object o1 : ((Iterable) value)) { boolean ok = true; - if (reUpdateInProgress) { - // Compare the value in index with value in RegionEntry. - ok = verifyEntryAndIndexValue(entry, o1, context); - } - try { - if (ok && runtimeItr != null) { - runtimeItr.setCurrent(o1); - observer.beforeIterationEvaluation(iterOp, o1); - ok = QueryUtils.applyCondition(iterOp, context); - } - } finally { - observer.afterIterationEvaluation(ok); - } + ok = applyCondition(iterOp, runtimeItr, context, observer, o1, entry, + reUpdateInProgress, ok); if (ok) { applyProjection(projAttrib, context, result, o1, intermediateResults, isIntersection); @@ -1849,19 +1838,8 @@ void addValuesToCollection(Collection result, CompiledValue iterOp, RuntimeItera } else { for (Object o1 : ((Iterable) value)) { boolean ok = true; - if (reUpdateInProgress) { - // Compare the value in index with value in RegionEntry. - ok = verifyEntryAndIndexValue(entry, o1, context); - } - try { - if (ok && runtimeItr != null) { - runtimeItr.setCurrent(o1); - observer.beforeIterationEvaluation(iterOp, o1); - ok = QueryUtils.applyCondition(iterOp, context); - } - } finally { - observer.afterIterationEvaluation(ok); - } + ok = applyCondition(iterOp, runtimeItr, context, observer, o1, entry, + reUpdateInProgress, ok); if (ok) { applyProjection(projAttrib, context, result, o1, intermediateResults, isIntersection); @@ -1873,19 +1851,8 @@ void addValuesToCollection(Collection result, CompiledValue iterOp, RuntimeItera } } else { boolean ok = true; - if (reUpdateInProgress) { - // Compare the value in index with in RegionEntry. - ok = verifyEntryAndIndexValue(entry, value, context); - } - try { - if (ok && runtimeItr != null) { - runtimeItr.setCurrent(value); - observer.beforeIterationEvaluation(iterOp, value); - ok = QueryUtils.applyCondition(iterOp, context); - } - } finally { - observer.afterIterationEvaluation(ok); - } + ok = applyCondition(iterOp, runtimeItr, context, observer, value, entry, + reUpdateInProgress, ok); if (ok) { if (context.isCqQueryContext()) { result.add(new CqEntry(((RegionEntry) e.getKey()).getKey(), value)); @@ -1899,6 +1866,26 @@ void addValuesToCollection(Collection result, CompiledValue iterOp, RuntimeItera } } + private boolean applyCondition(CompiledValue iterOp, RuntimeIterator runtimeItr, + ExecutionContext context, QueryObserver observer, Object value, RegionEntry entry, + boolean reUpdateInProgress, boolean ok) throws FunctionDomainException, + TypeMismatchException, NameResolutionException, QueryInvocationTargetException { + if (reUpdateInProgress) { + // Compare the value in index with in RegionEntry. + ok = verifyEntryAndIndexValue(entry, value, context); + } + try { + if (ok && runtimeItr != null) { + runtimeItr.setCurrent(value); + observer.beforeIterationEvaluation(iterOp, value); + ok = QueryUtils.applyCondition(iterOp, context); + } + } finally { + observer.afterIterationEvaluation(ok); + } + return ok; + } + private boolean verifyLimit(Collection result, int limit, ExecutionContext context) { if (limit > 0) { if (!context.isDistinct()) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java index cc9049addf9c..91d9b32e2d5c 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/CompactRangeIndex.java @@ -805,18 +805,7 @@ private void addToResultsFromEntries(Object lowerBoundKey, Object upperBoundKey, value = iterator.next(); if (value != null) { boolean ok = true; - - if (runtimeItr != null) { - runtimeItr.setCurrent(value); - } - try { - if (ok && runtimeItr != null) { - observer.beforeIterationEvaluation(iterOps, value); - ok = QueryUtils.applyCondition(iterOps, context); - } - } finally { - observer.afterIterationEvaluation(ok); - } + ok = applyCondition(iterOps, runtimeItr, context, observer, value, ok); if (ok) { applyCqOrProjection(projAttrib, context, result, value, intermediateResults, isIntersection, indexEntry.getDeserializedRegionKey()); @@ -845,17 +834,7 @@ private void addToResultsFromEntries(Object lowerBoundKey, Object upperBoundKey, ok = evaluateEntry((IndexInfo) indexInfo, context, null); } - if (runtimeItr != null) { - runtimeItr.setCurrent(value); - } - try { - if (ok && runtimeItr != null) { - observer.beforeIterationEvaluation(iterOps, value); - ok = QueryUtils.applyCondition(iterOps, context); - } - } finally { - observer.afterIterationEvaluation(ok); - } + ok = applyCondition(iterOps, runtimeItr, context, observer, value, ok); if (ok) { if (context != null && context.isCqQueryContext()) { result.add(new CqEntry(indexEntry.getDeserializedRegionKey(), value)); @@ -879,6 +858,24 @@ private void addToResultsFromEntries(Object lowerBoundKey, Object upperBoundKey, } } + private boolean applyCondition(CompiledValue iterOps, RuntimeIterator runtimeItr, + ExecutionContext context, QueryObserver observer, Object value, boolean ok) + throws FunctionDomainException, TypeMismatchException, NameResolutionException, + QueryInvocationTargetException { + if (runtimeItr != null) { + runtimeItr.setCurrent(value); + } + try { + if (ok && runtimeItr != null) { + observer.beforeIterationEvaluation(iterOps, value); + ok = QueryUtils.applyCondition(iterOps, context); + } + } finally { + observer.afterIterationEvaluation(ok); + } + return ok; + } + public List expandValue(ExecutionContext context, Object lowerBoundKey, Object upperBoundKey, int lowerBoundOperator, int upperBoundOperator, Object value) { try {