Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/src/logzen/db/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class Stream(Entity):
default='')

filter = sqlalchemy.Column(JSONDict,
nullable=False)
nullable=True,
default=None)


@export()
Expand Down
19 changes: 16 additions & 3 deletions backend/src/logzen/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def ElasticsearchConfigDecl(config_decl):
section_decl('index',
default='syslog')

# The doc_type of the the syslog messages
section_decl('type',
default='event')



@export()
Expand All @@ -64,10 +68,18 @@ def __init__(self):
else:
auth = None

self.__connection = Elasticsearch(self.config.es.host,
self.__connection = Elasticsearch(self.config.es.hosts,
connection_class=Urllib3HttpConnection,
http_auth=auth)

if not self.__connection.indices.exists(self.config.es.index):
self.logger.warn('Specified index does not exist: %s', self.config.es.index)

elif not self.__connection.indices.exists_type(self.config.es.index,
self.config.es.type):
self.logger.warn('Specified mapping does not exist: %s/%s', self.config.es.index, self.config.es.type)


def search(self,
body):
""" Execute a search query.
Expand All @@ -79,5 +91,6 @@ def search(self,
self.logger.debug('Execute search: %s',
JSONSerializer().dumps(body))

return self.__connection.search(body=body,
index=self.config.es.index)
return self.__connection.search(self.config.es.index,
self.config.es.type,
body)
113 changes: 71 additions & 42 deletions backend/src/logzen/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,54 +29,83 @@ class Logs(object):

es = require('logzen.es:Connection')

users = require('logzen.db.users:Users')
streams = require('logzen.db.streams:Streams')

def queryWithFilter(self,
filter,
query=None):
""" Search for logs using the passed filter and an optional query.
"""

def query(self,
stream,
query=None):
""" Search for logs using the passed stream and query.
request = {}

The returned log list is filtered by the filter assigned to the
user owning the stream and by an optional filter assigned to a stream.
# Add the filter to the request - if any
if filter:
request['filter'] = filter

The returned value is the unmodified result of the executed
ElasticSearch query.
"""
# Add the query to the request - if any
if query:
request['query'] = query

else:
request['query'] = self.MATCH_ALL

# Execute the request
result = self.es.search(request)

request = {
}
return result


def queryWithFilters(self,
filters,
query=None):
""" Search for logs using the passed filters and an optional query.

If the filters does contain more than one filter, the filters are concatinated using the 'and' operation.
"""

# Add the filters to the request
if stream.user.filter and stream.filter:
# Create a combined filter using the user filter and stream filter
request.update({
'filter': {
'and': [
stream.user.filter,
stream.filter
]
}
})

elif stream.user.filter:
# Use only the user filter
request.update({
'filter': stream.user.filter
})

elif stream.filter is not None:
# Use only the stream filter
request.update({
'filter': stream.filter
})
if not filters:
# Do not use a filter
filter = None

# Add the query to the request - if any
if query:
request.update({
'query': query,
})
elif len(filters) == 1:
# Use the single filter as-is
filter = filters[0]

else:
# Concatenate all filters using 'and' operation
filter = {'and': filters}

# Execute the query
return self.queryWithFilter(filter,
query)


def queryWithUser(self,
user,
query=None):
""" Search for logs using the passed users filter and an optional query.
"""

return self.queryWithFilter(user.filter,
query)


def queryWithStream(self,
stream,
query=None):
""" Search for logs using the passed streams filter and an optional query.
"""

filters = []

# Add the user filter to the list of filters - if any
if stream.user.filter:
filters.append(stream.user.filter)

# Add the stream filter to the list of filters - if any
if stream.filter:
filters.append(stream.filter)

# Execute the search
return self.es.search(request)
return self.queryWithFilters(filters,
query)
24 changes: 17 additions & 7 deletions backend/src/logzen/web/api/user/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,31 @@



@resource('/logs/*', ['GET', 'POST'])
@require(user='logzen.web.api.auth:User',
request='logzen.web.api:Request',
logs='logzen.logs:Logs')
def query(user,
request,
logs):
return logs.queryWithUser(user=user,
query=request.json)



@resource('/logs/<stream>', ['GET', 'POST'])
@require(user='logzen.web.api.auth:User',
request='logzen.web.api:Request',
logs='logzen.logs:Logs')
def query(name,
def query(stream,
user,
request,
logs):
# Resolve the stream entity
try:
stream = user.streams[name]
stream = user.streams[stream]

except KeyError:
raise bottle.HTTPError(404, 'Stream not found: %s' % name)
raise bottle.HTTPError(404, 'Stream not found: %s' % stream)

# Execute the query and return the result
return logs.query(stream=stream,
query=request.json)
return logs.queryWithStream(stream=stream,
query=request.json)
Loading