Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,4 @@ venv.bak/

# Misc backups
*.bak
report-api/src/sbc-common-components
111 changes: 81 additions & 30 deletions report-api/src/api/resources/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Endpoints to check and manage payments."""
import gzip
import json
from http import HTTPStatus

from flask import Response, abort, request
from flask import Response, abort, request, stream_with_context
from flask_restx import Namespace, Resource
from jinja2 import TemplateNotFound

Expand All @@ -25,6 +27,80 @@
API = Namespace('Reports', description='Service - Reports')


def _parse_request_json():
"""Parse request JSON, handling GZIP decompression if needed."""
content_encoding = request.headers.get('Content-Encoding', '').lower()
if content_encoding == 'gzip':
try:
compressed_data = request.get_data()
decompressed_data = gzip.decompress(compressed_data)
return json.loads(decompressed_data.decode('utf-8'))
except (gzip.BadGzipFile, json.JSONDecodeError, UnicodeDecodeError) as e:
abort(HTTPStatus.BAD_REQUEST, f'Failed to decompress or parse GZIP data: {str(e)}')
return request.get_json()


def _generate_csv_report(request_json):
"""Generate CSV report from request data."""
report_name = request_json.get('reportName', 'report')
file_name = f'{report_name}.csv'
template_vars = request_json.get('templateVars', {})
if not template_vars.get('columns'):
return None, file_name
report = CsvService.create_report(template_vars)
return report, file_name


def _generate_pdf_report(request_json):
"""Generate PDF report from request data."""
report_name = request_json.get('reportName', 'report')
file_name = f'{report_name}.pdf'
template_vars = request_json['templateVars']
populate_page_number = bool(request_json.get('populatePageNumber', None))

if 'templateName' in request_json:
template_name = request_json['templateName']
try:
report = ReportService.create_report_from_stored_template(
template_name, template_vars, populate_page_number
)
except TemplateNotFound:
abort(HTTPStatus.NOT_FOUND, 'Template not found')
except ValueError as e:
abort(HTTPStatus.BAD_REQUEST, str(e))
elif 'template' in request_json:
report = ReportService.create_report_from_template(
request_json['template'], template_vars, populate_page_number
)
else:
report = None

return report, file_name


def _create_response(report, file_name, content_type):
"""Create streaming HTTP response with report data."""
if report is None:
abort(HTTPStatus.BAD_REQUEST, 'Report cannot be generated')

content_disposition = f'attachment; filename="{file_name}"' # noqa: E702

if content_type == 'text/csv':
response_data = stream_with_context(report)
else:
def pdf_generator():
yield report
response_data = stream_with_context(pdf_generator())

return Response(
response_data,
mimetype=content_type,
headers={
'Content-Disposition': content_disposition
}
)


@API.route('')
class Report(Resource):
"""Payment endpoint resource."""
Expand All @@ -38,35 +114,10 @@ def get():
@_jwt.requires_auth
def post():
"""Create a report."""
report = None
request_json = request.get_json()
request_json = _parse_request_json()
response_content_type = request.headers.get('Accept', 'application/pdf')
if response_content_type == 'text/csv':
file_name = f"{request_json.get('reportName')}.csv"
report = CsvService.create_report(request_json.get('templateVars'))
else:
file_name = f"{request_json.get('reportName')}.pdf"
template_vars = request_json['templateVars']
populate_page_number = bool(request_json.get('populatePageNumber', None))

if 'templateName' in request_json: # Ignore template if template_name is present
template_name = request_json['templateName']
try:
report = ReportService.create_report_from_stored_template(template_name, template_vars,
populate_page_number)
except TemplateNotFound:
abort(HTTPStatus.NOT_FOUND, 'Template not found')

elif 'template' in request_json:
report = ReportService.create_report_from_template(request_json['template'], template_vars,
populate_page_number)

if report is not None:
response = Response(report, 200)
response.headers.set('Content-Disposition', 'attachment', filename=file_name)
response.headers.set('Content-Type', response_content_type)

report, file_name = _generate_csv_report(request_json)
else:
abort(HTTPStatus.BAD_REQUEST, 'Report cannot be generated')

return response
report, file_name = _generate_pdf_report(request_json)
return _create_response(report, file_name, response_content_type)
2 changes: 2 additions & 0 deletions report-api/src/api/resources/templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ def get():
response.headers.set('Content-Type', 'application/html')
except TemplateNotFound:
abort(HTTPStatus.NOT_FOUND, 'Template not found')
except ValueError as e:
abort(HTTPStatus.BAD_REQUEST, str(e))
return response
5 changes: 3 additions & 2 deletions report-api/src/api/services/chunk_report_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from api.services.footer_service import add_page_numbers_to_pdf
from api.services.gotenberg_service import GotenbergService
from api.utils.util import TEMPLATE_FOLDER_PATH
from api.utils.util import TEMPLATE_FOLDER_PATH, sanitize_template_name


class ChunkReportService: # pylint:disable=too-few-public-methods
Expand Down Expand Up @@ -86,8 +86,9 @@ def _build_chunk_html(
chunk_vars['groupedInvoices'] = [invoice_copy]
chunk_vars['_chunk_info'] = asdict(chunk_info)

sanitized_name = sanitize_template_name(template_name)
template = ChunkReportService._TEMPLATE_ENV.get_template(
f'{TEMPLATE_FOLDER_PATH}/{template_name}.html'
f'{TEMPLATE_FOLDER_PATH}/{sanitized_name}.html'
)
bc_logo_url = url_for('static', filename='images/bcgov-logo-vert.jpg')
registries_url = url_for('static', filename='images/reg_logo.png')
Expand Down
34 changes: 20 additions & 14 deletions report-api/src/api/services/csv_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,31 @@
"""Service to manage report-templates."""

import csv
from tempfile import NamedTemporaryFile
from typing import Dict
import io
from typing import Dict, Iterator


class CsvService: # pylint: disable=too-few-public-methods
"""Service for all template related operations."""

@classmethod
def create_report(cls, payload: Dict):
"""Create a report csv report from the input parameters."""
temp_file = None
def create_report(cls, payload: Dict) -> Iterator[bytes]:
"""Create a streaming CSV report generator from the input parameters."""
columns = payload.get('columns', None)
values = payload.get('values', None)
if columns:
temp_file = NamedTemporaryFile(delete=True) # pylint: disable=consider-using-with
with open(temp_file.name, 'w', newline='', encoding='utf-8') as csvfile:
report = csv.writer(csvfile)
report.writerow(columns)
for row in values:
report.writerow(row)

return temp_file
if not columns:
return

buffer = io.StringIO()
writer = csv.writer(buffer)

writer.writerow(columns)
yield buffer.getvalue().encode('utf-8')
buffer.seek(0)
buffer.truncate(0)

for row in values:
writer.writerow(row)
yield buffer.getvalue().encode('utf-8')
buffer.seek(0)
buffer.truncate(0)
5 changes: 3 additions & 2 deletions report-api/src/api/services/report_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from api.services.footer_service import add_page_numbers_to_pdf
from api.services.gotenberg_service import GotenbergService
from api.services.page_info import populate_page_count, populate_page_info
from api.utils.util import TEMPLATE_FOLDER_PATH
from api.utils.util import TEMPLATE_FOLDER_PATH, sanitize_template_name


def format_datetime(value, format='short'): # pylint: disable=redefined-builtin
Expand Down Expand Up @@ -98,7 +98,8 @@ def create_report_from_stored_template(
generate_page_number: bool = False,
):
"""Create a report from a stored template."""
template = ENV.get_template(f'{TEMPLATE_FOLDER_PATH}/{template_name}.html')
sanitized_name = sanitize_template_name(template_name)
template = ENV.get_template(f'{TEMPLATE_FOLDER_PATH}/{sanitized_name}.html')
bc_logo_url = url_for('static', filename='images/bcgov-logo-vert.jpg')
registries_url = url_for('static', filename='images/reg_logo.png')
html_out = template.render(
Expand Down
6 changes: 3 additions & 3 deletions report-api/src/api/services/template_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

import fnmatch
import os
import os.path

from jinja2 import Environment, FileSystemLoader

from api.utils.util import TEMPLATE_FOLDER_PATH
from api.utils.util import TEMPLATE_FOLDER_PATH, sanitize_template_name


ENV = Environment(loader=FileSystemLoader('.'))
Expand All @@ -42,6 +41,7 @@ def find_all_templates():
@classmethod
def get_stored_template(cls, templatename: str, ):
"""Get a stored template."""
template = ENV.get_template(f'{TEMPLATE_FOLDER_PATH}/{templatename}.html')
sanitized_name = sanitize_template_name(templatename)
template = ENV.get_template(f'{TEMPLATE_FOLDER_PATH}/{sanitized_name}.html')
html_template = template.render()
return html_template
23 changes: 22 additions & 1 deletion report-api/src/api/utils/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,32 @@

A simple decorator to add the options method to a Request Class.
"""
# from functools import wraps
import os.path
import re

TEMPLATE_FOLDER_PATH = 'report-templates/'


def sanitize_template_name(template_name: str) -> str:
"""Sanitize template name to prevent path traversal attacks."""
if not template_name:
raise ValueError('Template name cannot be empty')

sanitized = re.sub(r'[^a-zA-Z0-9_-]', '', template_name)

if not sanitized:
raise ValueError('Template name contains no valid characters')

if '..' in template_name or '/' in template_name or '\\' in template_name:
raise ValueError('Template name contains invalid path characters')

final_path = os.path.join(TEMPLATE_FOLDER_PATH, f'{sanitized}.html')
if not os.path.abspath(final_path).startswith(os.path.abspath(TEMPLATE_FOLDER_PATH)):
raise ValueError('Template path traversal detected')

return sanitized


def cors_preflight(methods: str = 'GET'):
"""Render an option method on the class."""
def wrapper(f):
Expand Down
Loading