diff --git a/bzt/modules/aggregator.py b/bzt/modules/aggregator.py
index 4a327cf044..8539575c5b 100644
--- a/bzt/modules/aggregator.py
+++ b/bzt/modules/aggregator.py
@@ -579,11 +579,14 @@ def __init__(self):
         self.cumulative = {}
         self.track_percentiles = [0.0, 50.0, 90.0, 95.0, 99.0, 99.9, 100.0]
         self.listeners = []
-        self.buffer_len = 2
-        self.min_buffer_len = 2
-        self.max_buffer_len = float('inf')
-        self.buffer_multiplier = 2
-        self.buffer_scale_idx = None
+
+        self.buffer_scale_idx = None  # string value of the most interesting percentile (e.g. '90.0')
+        self.buffer_multiplier = 2  # how many sequential samples we want to get before aggregation (approximately)
+
+        self.buffer_len = 2  # how many data points we want to collect before aggregation
+        self.min_buffer_len = 2  # a small buffer is more responsive but tends to lose data
+        self.max_buffer_len = float('inf')  # buffer_len value can be changed at runtime
+
         self.histogram_max = 1.0
         self.known_errors = fuzzyset.FuzzySet(use_levenshtein=True)
         self.max_error_count = 100
@@ -640,20 +643,6 @@ def add_listener(self, listener):
         """
         self.listeners.append(listener)
 
-    def __merge_to_cumulative(self, current):
-        """
-        Merge current KPISet to cumulative
-        :param current: KPISet
-        """
-        for label, data in iteritems(current):
-            default = KPISet(
-                perc_levels=self.track_percentiles,
-                hist_max_rt=data[KPISet.RESP_TIMES].high,
-                ext_aggregation=self._redundant_aggregation)
-            cumul = self.cumulative.setdefault(label, default)
-            cumul.merge_kpis(data)
-            cumul.recalculate()
-
     def datapoints(self, final_pass=False):
         """
         Generator object that returns datapoints from the reader
@@ -661,14 +650,6 @@ def datapoints(self, final_pass=False):
         :type final_pass: bool
         """
         for datapoint in self._calculate_datapoints(final_pass):
-            current = datapoint[DataPoint.CURRENT]
-            if datapoint[DataPoint.CUMULATIVE] or not self._ramp_up_exclude():
-                self.__merge_to_cumulative(current)
-                datapoint[DataPoint.CUMULATIVE] = copy.deepcopy(self.cumulative)
-            datapoint.recalculate()
-
-            for listener in self.listeners:
-                listener.aggregated_second(datapoint)
             yield datapoint
 
     @abstractmethod
@@ -678,13 +659,6 @@ def _calculate_datapoints(self, final_pass=False):
         """
         """
         yield
-    @abstractmethod
-    def _ramp_up_exclude(self):
-        """
-        :rtype : bool
-        """
-        return False
-
 
 class ResultsReader(ResultsProvider):
     """
@@ -696,7 +670,7 @@ def __init__(self, perc_levels=None):
         self.ignored_labels = []
         self.log = logging.getLogger(self.__class__.__name__)
         self.buffer = {}
-        self.min_timestamp = 0
+        self.min_timestamp = 0  # last aggregated timestamp; older data is obsolete and its timestamp must be adjusted
         if perc_levels is not None:
             self.track_percentiles = perc_levels
 
@@ -825,9 +799,12 @@ def _calculate_datapoints(self, final_pass=False):
 
         if self.cumulative and self.track_percentiles and self.buffer_scale_idx is not None:
             old_len = self.buffer_len
+
+            # take the timing of the most interesting percentile from the cumulative data
            chosen_timing = self.cumulative[''][KPISet.PERCENTILES][self.buffer_scale_idx]
-            self.buffer_len = round(chosen_timing * self.buffer_multiplier)
 
+            # and calculate the new buffer_len based on the current rate of incoming data
+            self.buffer_len = round(chosen_timing * self.buffer_multiplier)
             self.buffer_len = max(self.min_buffer_len, self.buffer_len)
             self.buffer_len = min(self.max_buffer_len, self.buffer_len)
             if self.buffer_len != old_len:
@@ -882,7 +859,7 @@ def __init__(self):
         self.buffer = {}
         self.histogram_max = 5.0
         self._sticky_concurrencies = {}
-        self.min_timestamp = None
+        self.min_timestamp = None  # first aggregated data timestamp, used only by the exclude_ramp_up feature
 
     def converter(self, data):
         if data and self._redundant_aggregation:
@@ -971,6 +948,39 @@ def startup(self):
         for underling in self.underlings:
             underling.set_aggregation(self._redundant_aggregation)
 
+    def datapoints(self, final_pass=False):
+        for datapoint in super(ConsolidatingAggregator, self).datapoints(final_pass):
+            if self.__class__.__name__ == "ConsolidatingAggregator":  # it's the true aggregator
+                current = datapoint[DataPoint.CURRENT]
+
+                if not self.min_timestamp:
+                    self.min_timestamp = datapoint['ts']
+
+                exclude_as_ramp_up = self._ramp_up_exclude() and \
+                    (datapoint['ts'] < self.min_timestamp + self._get_max_ramp_up())
+                if not exclude_as_ramp_up:
+                    self.__merge_to_cumulative(current)
+                    datapoint[DataPoint.CUMULATIVE] = copy.deepcopy(self.cumulative)
+                    datapoint.recalculate()
+
+                for listener in self.listeners:
+                    listener.aggregated_second(datapoint)
+            yield datapoint
+
+    def __merge_to_cumulative(self, current):
+        """
+        Merge current KPISet to cumulative
+        :param current: KPISet
+        """
+        for label, data in iteritems(current):
+            default = KPISet(
+                perc_levels=self.track_percentiles,
+                hist_max_rt=data[KPISet.RESP_TIMES].high,
+                ext_aggregation=self._redundant_aggregation)
+            cumul = self.cumulative.setdefault(label, default)
+            cumul.merge_kpis(data)
+            cumul.recalculate()
+
     def add_underling(self, underling):
         """
         Add source for aggregating
@@ -1062,14 +1072,6 @@ def _calculate_datapoints(self, final_pass=False):
                 self.log.debug("Merging into %s", tstamp)
                 points_to_consolidate = self.buffer.pop(tstamp)
 
-                for subresult in points_to_consolidate:
-                    if self._ramp_up_exclude():
-                        if not self.min_timestamp:
-                            self.min_timestamp = subresult['ts']
-
-                        if subresult['ts'] < self.min_timestamp + self._get_max_ramp_up():
-                            subresult[DataPoint.CUMULATIVE] = dict()
-
                 Concurrency.update_sticky(self._sticky_concurrencies, points_to_consolidate)
                 point = points_to_consolidate[0]
                 point[DataPoint.SOURCE_ID] = self.__class__.__name__ + '@' + str(id(self))
diff --git a/tests/unit/modules/test_Gatling.py b/tests/unit/modules/test_Gatling.py
index 564fe9e46c..c3aa7e364b 100644
--- a/tests/unit/modules/test_Gatling.py
+++ b/tests/unit/modules/test_Gatling.py
@@ -625,8 +625,8 @@ def test_read_labels_problematic(self):
         list_of_values = list(obj.datapoints(True))
         self.assertEqual(len(list_of_values), 1)
         self.assertEqual(obj.guessed_gatling_version, "3.4+")
-        last_cumul = list_of_values[-1][DataPoint.CUMULATIVE]
-        self.assertEqual(1, last_cumul['User-Login'][KPISet.SAMPLE_COUNT])
+        last_current = list_of_values[-1][DataPoint.CURRENT]
+        self.assertEqual(1, last_current['User-Login'][KPISet.SAMPLE_COUNT])
 
     def test_read_labels_regular(self):
         log_path = RESOURCES_DIR + "gatling/"
@@ -642,8 +642,10 @@ def test_read_group(self):
         list_of_values = list(obj.datapoints(True))
         self.assertEqual(len(list_of_values), 179)
         self.assertEqual(obj.guessed_gatling_version, "3.4+")
-        last_cumul = list_of_values[-1][DataPoint.CUMULATIVE]
-        self.assertEqual(2, len(last_cumul['[empty]'][KPISet.ERRORS]))
+        for val in list_of_values:
+            if '[empty]' in val[DataPoint.CURRENT]:
+                errors = val[DataPoint.CURRENT]['[empty]'][KPISet.ERRORS]
+                self.assertEqual(2, len(errors))
 
     def test_read_rc_asserts(self):
         log_path = RESOURCES_DIR + "gatling/"
@@ -651,6 +653,6 @@ def test_read_rc_asserts(self):
         list_of_values = list(obj.datapoints(True))
         self.assertEqual(len(list_of_values), 1)
         self.assertEqual(obj.guessed_gatling_version, "3.4+")
-        last_cumul = list_of_values[-1][DataPoint.CUMULATIVE]
-        self.assertEqual(1, last_cumul[''][KPISet.RESP_CODES]['400'])
-        self.assertEqual(1, last_cumul[''][KPISet.RESP_CODES]['401'])
+        current = list_of_values[0][DataPoint.CURRENT]
+        self.assertEqual(1, current[''][KPISet.RESP_CODES]['400'])
+        self.assertEqual(1, current[''][KPISet.RESP_CODES]['401'])
diff --git a/tests/unit/modules/test_aggregator.py b/tests/unit/modules/test_aggregator.py
index 9efbe3bd28..cf598ad309 100644
--- a/tests/unit/modules/test_aggregator.py
+++ b/tests/unit/modules/test_aggregator.py
@@ -32,7 +32,7 @@ def test_1(self):
         obj.add_listener(mock)
 
         for point in mock.datapoints():
-            self.assertNotEquals(0, point[DataPoint.CUMULATIVE][''].concurrency)
+            self.assertNotEquals(0, point[DataPoint.CURRENT][''].concurrency)
 
         mock.data.append((2, "", 1, r(), r(), r(), 200, None, '', 0))
         mock.data.append((2, "", 1, r(), r(), r(), 200, None, '', 0))
@@ -58,13 +58,10 @@ def test_new_reader(self):
         mock.data.append((3, "d", 1, 5, 5, 5, 200, None, '', 0))
         mock.data.append((4, "b", 1, 6, 6, 6, 200, None, '', 0))
 
-        list(mock.datapoints(True))
+        results = list(mock.datapoints(True))
 
-        failed = mock.results[1]
-        self.assertEqual(2, failed['ts'])
-
-        for kpis in (failed['current'], failed['cumulative']):
-            self.assertEqual(1, kpis['b']['fail'])
+        self.assertEqual(2, results[1]['ts'])
+        self.assertEqual(1, results[1]['current']['b']['fail'])
 
     def test_max_concurrency(self):
         mock = MockReader()
@@ -75,7 +72,6 @@ def test_max_concurrency(self):
 
         data_point = list(mock.datapoints(True))[0]
         self.assertEqual(3, data_point[DataPoint.CURRENT][''].concurrency)
-        self.assertEqual(3, data_point[DataPoint.CUMULATIVE][''].concurrency)
 
     def test_sample_ignores(self):
         mock = MockReader()
diff --git a/tests/unit/modules/test_cloudProvisioning.py b/tests/unit/modules/test_cloudProvisioning.py
index 61f0924e63..70f7f9a91b 100644
--- a/tests/unit/modules/test_cloudProvisioning.py
+++ b/tests/unit/modules/test_cloudProvisioning.py
@@ -1754,22 +1754,16 @@ def test_get_errors(self):
         # frame [0, 1464248744)
         res1 = list(obj.datapoints(False))
         self.assertEqual(1, len(res1))
-        cumul = res1[0][DataPoint.CUMULATIVE]
         cur = res1[0][DataPoint.CURRENT]
-        self.assertEqual(1, len(cumul.keys()))
         self.assertEqual(1, len(cur.keys()))
         errors_1 = {'Not found': {'count': 10, 'rc': u'404'}}
-        self.assertEqual(self.convert_kpi_errors(cumul[""]["errors"]), errors_1)  # all error data is written
-        self.assertEqual(self.convert_kpi_errors(cur[""]["errors"]), errors_1)  # to 'current' and 'cumulative'
+        self.assertEqual(self.convert_kpi_errors(cur[""]["errors"]), errors_1)
 
         # frame [1464248744, 1464248745)
         res2 = list(obj.datapoints(False))
         self.assertEqual(1, len(res2))
-        cumul = res2[0][DataPoint.CUMULATIVE]
         cur = res2[0][DataPoint.CURRENT]
-        self.assertEqual(1, len(cumul.keys()))
         self.assertEqual(1, len(cur.keys()))
-        self.assertEqual(self.convert_kpi_errors(cumul[""]["errors"]), errors_1)  # the same errors,
         self.assertEqual(cur[""]["errors"], [])  # new errors not found
 
         mock.mock_get.update(self.get_errors_mock({
@@ -1785,7 +1779,6 @@ def test_get_errors(self):
 
         res3 = list(obj.datapoints(True))  # let's add the last timestamp [1464248745]
         self.assertEqual(1, len(res3))
-        cumul = res3[0][DataPoint.CUMULATIVE]
         cur = res3[0][DataPoint.CURRENT]
         errors_all_full = {
             'Not found': {'count': 11, 'rc': '404'},
@@ -1797,8 +1790,6 @@ def test_get_errors(self):
             'assertion_example': {'count': 33, 'rc': 'All Assertions'}}
         errors_label1 = {'Strange behaviour': {'count': 666, 'rc': '666'}}
 
-        self.assertEqual(errors_label1, self.convert_kpi_errors(cumul["label1"]["errors"]))
-        self.assertEqual(errors_all_full, self.convert_kpi_errors(cumul[""]["errors"]))
         self.assertEqual(errors_label1, self.convert_kpi_errors(cur["label1"]["errors"]))
         self.assertEqual(errors_all_update, self.convert_kpi_errors(cur[""]["errors"]))
@@ -1884,10 +1875,9 @@ def test_datapoint(self):
         obj.master = Master(data={"id": 1})
         mock.apply(obj.master)
         res = list(obj.datapoints(True))
-        cumulative_ = res[0][DataPoint.CUMULATIVE]
-        total = cumulative_['']
-        percentiles_ = total[KPISet.PERCENTILES]
-        self.assertEquals(1.05, percentiles_['99.0'])
+        current = res[0][DataPoint.CURRENT]
+        percentiles = current[''][KPISet.PERCENTILES]
+        self.assertEquals(1.05, percentiles['99.0'])
 
     def test_no_kpis_on_cloud_crash(self):
         mock = BZMock()
diff --git a/tests/unit/modules/test_consolidatingAggregator.py b/tests/unit/modules/test_consolidatingAggregator.py
index 3b3ebea4cb..2c0e262fc9 100644
--- a/tests/unit/modules/test_consolidatingAggregator.py
+++ b/tests/unit/modules/test_consolidatingAggregator.py
@@ -257,7 +257,7 @@ def test_errors_cumulative(self):
         self.obj.add_underling(reader)
         self.obj.shutdown()
         self.obj.post_process()
-        cum_dict = self.obj.underlings[0].cumulative
+        cum_dict = self.obj.cumulative
         first_err_ids = [id(err) for err in cum_dict['first']['errors']]
         second_err_ids = [id(err) for err in cum_dict['second']['errors']]
         total_err_ids = [id(err) for err in cum_dict['']['errors']]
diff --git a/tests/unit/modules/test_k6.py b/tests/unit/modules/test_k6.py
index 2c5dff8df3..5552419a5d 100644
--- a/tests/unit/modules/test_k6.py
+++ b/tests/unit/modules/test_k6.py
@@ -114,5 +114,7 @@ def test_read(self):
 
         for datapoint in points:
             self.assertTrue(datapoint['ts'] > 1500000000)
-        self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.SUCCESSES], 2)
-        self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.FAILURES], 2)
+        self.assertEqual(points[0][DataPoint.CURRENT][''][KPISet.SUCCESSES], 1)
+        self.assertEqual(points[1][DataPoint.CURRENT][''][KPISet.SUCCESSES], 1)
+        self.assertEqual(points[2][DataPoint.CURRENT][''][KPISet.FAILURES], 1)
+        self.assertEqual(points[3][DataPoint.CURRENT][''][KPISet.FAILURES], 1)
\ No newline at end of file
diff --git a/tests/unit/modules/test_molotov.py b/tests/unit/modules/test_molotov.py
index f7df8fadb8..96d6ea4caf 100644
--- a/tests/unit/modules/test_molotov.py
+++ b/tests/unit/modules/test_molotov.py
@@ -136,5 +136,18 @@ def test_read(self):
 
         for datapoint in points:
             self.assertTrue(datapoint['ts'] > 1500000000)
-        self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.SUCCESSES], 4)
-        self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.FAILURES], 4)
+
+        current_overalls = [point[DataPoint.CURRENT][''] for point in points]
+
+        self.assertEqual(1, current_overalls[0][KPISet.SUCCESSES])
+        self.assertEqual(0, current_overalls[0][KPISet.FAILURES])
+
+        self.assertEqual(1, current_overalls[1][KPISet.SUCCESSES])
+        self.assertEqual(2, current_overalls[1][KPISet.FAILURES])
+
+        self.assertEqual(0, current_overalls[2][KPISet.SUCCESSES])
+        self.assertEqual(2, current_overalls[2][KPISet.FAILURES])
+
+        self.assertEqual(2, current_overalls[3][KPISet.SUCCESSES])
+        self.assertEqual(0, current_overalls[3][KPISet.FAILURES])
+
diff --git a/tests/unit/modules/test_vegeta.py b/tests/unit/modules/test_vegeta.py
index 714db739a4..075b11b192 100644
--- a/tests/unit/modules/test_vegeta.py
+++ b/tests/unit/modules/test_vegeta.py
@@ -210,5 +210,5 @@ def test_read(self):
 
         for datapoint in points:
             self.assertTrue(datapoint['ts'] > 1500000000)
-        self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.SUCCESSES], 3)
-        self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.FAILURES], 1)
+        self.assertEqual(points[-1][DataPoint.CURRENT][''][KPISet.SUCCESSES], 3)
+        self.assertEqual(points[-1][DataPoint.CURRENT][''][KPISet.FAILURES], 1)