92 changes: 47 additions & 45 deletions bzt/modules/aggregator.py
@@ -579,11 +579,14 @@ def __init__(self):
self.cumulative = {}
self.track_percentiles = [0.0, 50.0, 90.0, 95.0, 99.0, 99.9, 100.0]
self.listeners = []
self.buffer_len = 2
self.min_buffer_len = 2
self.max_buffer_len = float('inf')
self.buffer_multiplier = 2
self.buffer_scale_idx = None

self.buffer_scale_idx = None # string value of the most interesting percentile (e.g. '90.0')
self.buffer_multiplier = 2 # roughly how many sequential samples we want to get before aggregation

self.buffer_len = 2 # how many data points we want to collect before aggregation
self.min_buffer_len = 2 # a small buffer is more responsive but tends to lose data
self.max_buffer_len = float('inf') # upper cap; buffer_len can change at runtime

self.histogram_max = 1.0
self.known_errors = fuzzyset.FuzzySet(use_levenshtein=True)
self.max_error_count = 100
@@ -640,35 +643,13 @@ def add_listener(self, listener):
"""
self.listeners.append(listener)

def __merge_to_cumulative(self, current):
"""
Merge current KPISet to cumulative
:param current: KPISet
"""
for label, data in iteritems(current):
default = KPISet(
perc_levels=self.track_percentiles,
hist_max_rt=data[KPISet.RESP_TIMES].high,
ext_aggregation=self._redundant_aggregation)
cumul = self.cumulative.setdefault(label, default)
cumul.merge_kpis(data)
cumul.recalculate()

def datapoints(self, final_pass=False):
"""
Generator object that returns datapoints from the reader

:type final_pass: bool
"""
for datapoint in self._calculate_datapoints(final_pass):
current = datapoint[DataPoint.CURRENT]
if datapoint[DataPoint.CUMULATIVE] or not self._ramp_up_exclude():
self.__merge_to_cumulative(current)
datapoint[DataPoint.CUMULATIVE] = copy.deepcopy(self.cumulative)
datapoint.recalculate()

for listener in self.listeners:
listener.aggregated_second(datapoint)
yield datapoint

@abstractmethod
@@ -678,13 +659,6 @@ def _calculate_datapoints(self, final_pass=False):
"""
yield

@abstractmethod
def _ramp_up_exclude(self):
"""
:rtype : bool
"""
return False


class ResultsReader(ResultsProvider):
"""
@@ -696,7 +670,7 @@ def __init__(self, perc_levels=None):
self.ignored_labels = []
self.log = logging.getLogger(self.__class__.__name__)
self.buffer = {}
self.min_timestamp = 0
self.min_timestamp = 0 # last aggregated timestamp; older data is obsolete and must be adjusted
if perc_levels is not None:
self.track_percentiles = perc_levels

@@ -825,9 +799,12 @@ def _calculate_datapoints(self, final_pass=False):

if self.cumulative and self.track_percentiles and self.buffer_scale_idx is not None:
old_len = self.buffer_len

# choose the average timing of the most interesting percentile
chosen_timing = self.cumulative[''][KPISet.PERCENTILES][self.buffer_scale_idx]
self.buffer_len = round(chosen_timing * self.buffer_multiplier)

# and calculate the new buffer_len based on the current rate of incoming data
self.buffer_len = round(chosen_timing * self.buffer_multiplier)
self.buffer_len = max(self.min_buffer_len, self.buffer_len)
self.buffer_len = min(self.max_buffer_len, self.buffer_len)
if self.buffer_len != old_len:
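
For clarity, here is a minimal standalone sketch of the adaptive buffer sizing this hunk implements, assuming a tracked 90th-percentile timing of 2.5s and the defaults from __init__ above (the function name is illustrative, not part of the module's API):

# Standalone model of the clamped buffer_len calculation.
def scaled_buffer_len(chosen_timing, multiplier=2, min_len=2, max_len=float('inf')):
    # slower responses mean a larger buffer, clamped to [min_len, max_len]
    new_len = round(chosen_timing * multiplier)
    return int(min(max_len, max(min_len, new_len)))

assert scaled_buffer_len(2.5) == 5   # 90th percentile of 2.5s widens the buffer
assert scaled_buffer_len(0.1) == 2   # clamped up to min_buffer_len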
@@ -882,7 +859,7 @@ def __init__(self):
self.buffer = {}
self.histogram_max = 5.0
self._sticky_concurrencies = {}
self.min_timestamp = None
self.min_timestamp = None # timestamp of the first aggregated datapoint; used only by the exclude_ramp_up feature

def converter(self, data):
if data and self._redundant_aggregation:
@@ -971,6 +948,39 @@ def startup(self):
for underling in self.underlings:
underling.set_aggregation(self._redundant_aggregation)

def datapoints(self, final_pass=False):
for datapoint in super(ConsolidatingAggregator, self).datapoints(final_pass):
if self.__class__.__name__ == "ConsolidatingAggregator": # it's the true aggregator
current = datapoint[DataPoint.CURRENT]

if not self.min_timestamp:
self.min_timestamp = datapoint['ts']

exclude_as_ramp_up = self._ramp_up_exclude() and \
(datapoint['ts'] < self.min_timestamp + self._get_max_ramp_up())
if not exclude_as_ramp_up:
self.__merge_to_cumulative(current)
datapoint[DataPoint.CUMULATIVE] = copy.deepcopy(self.cumulative)
datapoint.recalculate()

for listener in self.listeners:
listener.aggregated_second(datapoint)
yield datapoint

def __merge_to_cumulative(self, current):
"""
Merge current KPISet to cumulative
:param current: KPISet
"""
for label, data in iteritems(current):
default = KPISet(
perc_levels=self.track_percentiles,
hist_max_rt=data[KPISet.RESP_TIMES].high,
ext_aggregation=self._redundant_aggregation)
cumul = self.cumulative.setdefault(label, default)
cumul.merge_kpis(data)
cumul.recalculate()

def add_underling(self, underling):
"""
Add source for aggregating
Expand Down Expand Up @@ -1062,14 +1072,6 @@ def _calculate_datapoints(self, final_pass=False):
self.log.debug("Merging into %s", tstamp)
points_to_consolidate = self.buffer.pop(tstamp)

for subresult in points_to_consolidate:
if self._ramp_up_exclude():
if not self.min_timestamp:
self.min_timestamp = subresult['ts']

if subresult['ts'] < self.min_timestamp + self._get_max_ramp_up():
subresult[DataPoint.CUMULATIVE] = dict()

Concurrency.update_sticky(self._sticky_concurrencies, points_to_consolidate)
point = points_to_consolidate[0]
point[DataPoint.SOURCE_ID] = self.__class__.__name__ + '@' + str(id(self))
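Taken together, the aggregator.py changes move cumulative bookkeeping out of the abstract provider and into ConsolidatingAggregator, which now skips datapoints falling inside the ramp-up window instead of blanking their cumulative dicts per-subresult. A minimal sketch of that window check, assuming _get_max_ramp_up() yields the window length in seconds (the constant below is a stand-in):

RAMP_UP_SECONDS = 60  # stand-in for what _get_max_ramp_up() would return

def in_ramp_up(ts, first_ts, exclude_enabled=True):
    # datapoints inside the window are still yielded to listeners,
    # but never merged into the cumulative KPISets
    return exclude_enabled and ts < first_ts + RAMP_UP_SECONDS

first = 1500000000
assert in_ramp_up(first + 30, first)        # still ramping up: excluded
assert not in_ramp_up(first + 90, first)    # past the window: merged
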
16 changes: 9 additions & 7 deletions tests/unit/modules/test_Gatling.py
@@ -625,8 +625,8 @@ def test_read_labels_problematic(self):
list_of_values = list(obj.datapoints(True))
self.assertEqual(len(list_of_values), 1)
self.assertEqual(obj.guessed_gatling_version, "3.4+")
last_cumul = list_of_values[-1][DataPoint.CUMULATIVE]
self.assertEqual(1, last_cumul['User-Login'][KPISet.SAMPLE_COUNT])
last_current = list_of_values[-1][DataPoint.CURRENT]
self.assertEqual(1, last_current['User-Login'][KPISet.SAMPLE_COUNT])

def test_read_labels_regular(self):
log_path = RESOURCES_DIR + "gatling/"
@@ -642,15 +642,17 @@ def test_read_group(self):
list_of_values = list(obj.datapoints(True))
self.assertEqual(len(list_of_values), 179)
self.assertEqual(obj.guessed_gatling_version, "3.4+")
last_cumul = list_of_values[-1][DataPoint.CUMULATIVE]
self.assertEqual(2, len(last_cumul['[empty]'][KPISet.ERRORS]))
for val in list_of_values:
if '[empty]' in val[DataPoint.CURRENT]:
errors = val[DataPoint.CURRENT]['[empty]'][KPISet.ERRORS]
self.assertEqual(2, len(errors))

def test_read_rc_asserts(self):
log_path = RESOURCES_DIR + "gatling/"
obj = DataLogReader(log_path, ROOT_LOGGER, 'gatling-5') # regular one
list_of_values = list(obj.datapoints(True))
self.assertEqual(len(list_of_values), 1)
self.assertEqual(obj.guessed_gatling_version, "3.4+")
last_cumul = list_of_values[-1][DataPoint.CUMULATIVE]
self.assertEqual(1, last_cumul[''][KPISet.RESP_CODES]['400'])
self.assertEqual(1, last_cumul[''][KPISet.RESP_CODES]['401'])
current = list_of_values[0][DataPoint.CURRENT]
self.assertEqual(1, current[''][KPISet.RESP_CODES]['400'])
self.assertEqual(1, current[''][KPISet.RESP_CODES]['401'])
12 changes: 4 additions & 8 deletions tests/unit/modules/test_aggregator.py
@@ -32,7 +32,7 @@ def test_1(self):
obj.add_listener(mock)

for point in mock.datapoints():
self.assertNotEquals(0, point[DataPoint.CUMULATIVE][''].concurrency)
self.assertNotEquals(0, point[DataPoint.CURRENT][''].concurrency)

mock.data.append((2, "", 1, r(), r(), r(), 200, None, '', 0))
mock.data.append((2, "", 1, r(), r(), r(), 200, None, '', 0))
@@ -58,13 +58,10 @@ def test_new_reader(self):
mock.data.append((3, "d", 1, 5, 5, 5, 200, None, '', 0))
mock.data.append((4, "b", 1, 6, 6, 6, 200, None, '', 0))

list(mock.datapoints(True))
results = list(mock.datapoints(True))

failed = mock.results[1]
self.assertEqual(2, failed['ts'])

for kpis in (failed['current'], failed['cumulative']):
self.assertEqual(1, kpis['b']['fail'])
self.assertEqual(2, results[1]['ts'])
self.assertEqual(1, results[1]['current']['b']['fail'])

def test_max_concurrency(self):
mock = MockReader()
@@ -75,7 +72,6 @@ def test_max_concurrency(self):

data_point = list(mock.datapoints(True))[0]
self.assertEqual(3, data_point[DataPoint.CURRENT][''].concurrency)
self.assertEqual(3, data_point[DataPoint.CUMULATIVE][''].concurrency)

def test_sample_ignores(self):
mock = MockReader()
18 changes: 4 additions & 14 deletions tests/unit/modules/test_cloudProvisioning.py
@@ -1754,22 +1754,16 @@ def test_get_errors(self):
# frame [0, 1464248744)
res1 = list(obj.datapoints(False))
self.assertEqual(1, len(res1))
cumul = res1[0][DataPoint.CUMULATIVE]
cur = res1[0][DataPoint.CURRENT]
self.assertEqual(1, len(cumul.keys()))
self.assertEqual(1, len(cur.keys()))
errors_1 = {'Not found': {'count': 10, 'rc': u'404'}}
self.assertEqual(self.convert_kpi_errors(cumul[""]["errors"]), errors_1) # all error data is written
self.assertEqual(self.convert_kpi_errors(cur[""]["errors"]), errors_1) # to 'current' and 'cumulative'
self.assertEqual(self.convert_kpi_errors(cur[""]["errors"]), errors_1)

# frame [1464248744, 1464248745)
res2 = list(obj.datapoints(False))
self.assertEqual(1, len(res2))
cumul = res2[0][DataPoint.CUMULATIVE]
cur = res2[0][DataPoint.CURRENT]
self.assertEqual(1, len(cumul.keys()))
self.assertEqual(1, len(cur.keys()))
self.assertEqual(self.convert_kpi_errors(cumul[""]["errors"]), errors_1) # the same errors,
self.assertEqual(cur[""]["errors"], []) # new errors not found

mock.mock_get.update(self.get_errors_mock({
@@ -1785,7 +1779,6 @@

res3 = list(obj.datapoints(True)) # let's add the last timestamp [1464248745]
self.assertEqual(1, len(res3))
cumul = res3[0][DataPoint.CUMULATIVE]
cur = res3[0][DataPoint.CURRENT]
errors_all_full = {
'Not found': {'count': 11, 'rc': '404'},
Expand All @@ -1797,8 +1790,6 @@ def test_get_errors(self):
'assertion_example': {'count': 33, 'rc': 'All Assertions'}}

errors_label1 = {'Strange behaviour': {'count': 666, 'rc': '666'}}
self.assertEqual(errors_label1, self.convert_kpi_errors(cumul["label1"]["errors"]))
self.assertEqual(errors_all_full, self.convert_kpi_errors(cumul[""]["errors"]))
self.assertEqual(errors_label1, self.convert_kpi_errors(cur["label1"]["errors"]))
self.assertEqual(errors_all_update, self.convert_kpi_errors(cur[""]["errors"]))

@@ -1884,10 +1875,9 @@ def test_datapoint(self):
obj.master = Master(data={"id": 1})
mock.apply(obj.master)
res = list(obj.datapoints(True))
cumulative_ = res[0][DataPoint.CUMULATIVE]
total = cumulative_['']
percentiles_ = total[KPISet.PERCENTILES]
self.assertEquals(1.05, percentiles_['99.0'])
current = res[0][DataPoint.CURRENT]
percentiles = current[''][KPISet.PERCENTILES]
self.assertEquals(1.05, percentiles['99.0'])

def test_no_kpis_on_cloud_crash(self):
mock = BZMock()
2 changes: 1 addition & 1 deletion tests/unit/modules/test_consolidatingAggregator.py
@@ -257,7 +257,7 @@ def test_errors_cumulative(self):
self.obj.add_underling(reader)
self.obj.shutdown()
self.obj.post_process()
cum_dict = self.obj.underlings[0].cumulative
cum_dict = self.obj.cumulative
first_err_ids = [id(err) for err in cum_dict['first']['errors']]
second_err_ids = [id(err) for err in cum_dict['second']['errors']]
total_err_ids = [id(err) for err in cum_dict['']['errors']]
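The updated assertion reflects the ownership change: underling readers no longer keep their own cumulative dict, so the test inspects self.obj.cumulative on the aggregator itself. A hypothetical mini-model of that single point of accumulation:

# Toy model: per-second CURRENT dicts fold into one aggregator-owned total.
cumulative = {}

def merge_second(current):
    for label, count in current.items():
        cumulative[label] = cumulative.get(label, 0) + count

merge_second({'first': 3, 'second': 1})
merge_second({'first': 2})
assert cumulative == {'first': 5, 'second': 1}
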
6 changes: 4 additions & 2 deletions tests/unit/modules/test_k6.py
@@ -114,5 +114,7 @@ def test_read(self):

for datapoint in points:
self.assertTrue(datapoint['ts'] > 1500000000)
self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.SUCCESSES], 2)
self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.FAILURES], 2)
self.assertEqual(points[0][DataPoint.CURRENT][''][KPISet.SUCCESSES], 1)
self.assertEqual(points[1][DataPoint.CURRENT][''][KPISet.SUCCESSES], 1)
self.assertEqual(points[2][DataPoint.CURRENT][''][KPISet.FAILURES], 1)
self.assertEqual(points[3][DataPoint.CURRENT][''][KPISet.FAILURES], 1)
17 changes: 15 additions & 2 deletions tests/unit/modules/test_molotov.py
@@ -136,5 +136,18 @@ def test_read(self):

for datapoint in points:
self.assertTrue(datapoint['ts'] > 1500000000)
self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.SUCCESSES], 4)
self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.FAILURES], 4)

current_overalls = [point[DataPoint.CURRENT][''] for point in points]

self.assertEqual(1, current_overalls[0][KPISet.SUCCESSES])
self.assertEqual(0, current_overalls[0][KPISet.FAILURES])

self.assertEqual(1, current_overalls[1][KPISet.SUCCESSES])
self.assertEqual(2, current_overalls[1][KPISet.FAILURES])

self.assertEqual(0, current_overalls[2][KPISet.SUCCESSES])
self.assertEqual(2, current_overalls[2][KPISet.FAILURES])

self.assertEqual(2, current_overalls[3][KPISet.SUCCESSES])
self.assertEqual(0, current_overalls[3][KPISet.FAILURES])

4 changes: 2 additions & 2 deletions tests/unit/modules/test_vegeta.py
@@ -210,5 +210,5 @@ def test_read(self):

for datapoint in points:
self.assertTrue(datapoint['ts'] > 1500000000)
self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.SUCCESSES], 3)
self.assertEqual(points[-1][DataPoint.CUMULATIVE][''][KPISet.FAILURES], 1)
self.assertEqual(points[-1][DataPoint.CURRENT][''][KPISet.SUCCESSES], 3)
self.assertEqual(points[-1][DataPoint.CURRENT][''][KPISet.FAILURES], 1)