output formatting #55

Merged 1 commit on Jun 12, 2025
102 changes: 84 additions & 18 deletions pystackql/core/output.py
@@ -96,6 +96,11 @@ def _format_error(self, error_msg):
def _format_data(self, data):
"""Format data.

This method processes SQL type objects from StackQL:
- SQL NULL values: {'String': '', 'Valid': False} → None
- Regular values: {'String': 'value', 'Valid': True} → 'value'
- Empty strings: {'String': '', 'Valid': True} → '' (preserved as empty string)

Args:
data (str): The data string

@@ -104,19 +109,84 @@ def _format_data(self, data):
"""
if self.output_format == 'csv':
return data
elif self.output_format == 'pandas':
import pandas as pd
try:
return pd.read_json(StringIO(data))
except ValueError:
return pd.DataFrame([{"error": "Invalid JSON output"}])
else: # dict
try:
retval = json.loads(data)
return retval if retval else []
except ValueError:
return [{"error": f"Invalid JSON output : {data}"}]


try:
# Attempt to parse JSON first
raw_json_data = json.loads(data)
except json.JSONDecodeError as e:
# Handle specific JSON parsing errors
error_result = [{"error": f"Invalid JSON format: {str(e)}", "position": e.pos, "line": e.lineno, "column": e.colno}]
return pd.DataFrame(error_result) if self.output_format == 'pandas' else error_result
except TypeError as e:
# Handle cases where data is not a string or buffer
error_result = [{"error": f"Invalid data type for JSON parsing: {str(e)}", "data_type": str(type(data))}]
return pd.DataFrame(error_result) if self.output_format == 'pandas' else error_result
except Exception as e:
# Catch any other unexpected errors
error_result = [{"error": f"Unexpected error parsing JSON: {str(e)}", "exception_type": type(e).__name__}]
return pd.DataFrame(error_result) if self.output_format == 'pandas' else error_result

try:
# Process the JSON data to clean up SQL type objects
processed_json_data = self._process_sql_types(raw_json_data)

# Handle empty data
if not processed_json_data:
return pd.DataFrame() if self.output_format == 'pandas' else []

if self.output_format == 'pandas':
import pandas as pd
# Convert the preprocessed JSON data to a DataFrame
return pd.DataFrame(processed_json_data)

# Return the preprocessed dictionary data
return processed_json_data

except Exception as e:
# Handle any errors during processing
error_msg = f"Error processing data: {str(e)}"
if self.output_format == 'pandas':
import pandas as pd
return pd.DataFrame([{"error": error_msg}])
return [{"error": error_msg}]

def _process_sql_types(self, data):
"""Process SQL type objects in the data.

Args:
data: The parsed JSON data

Returns:
The processed data with SQL type objects transformed
"""
# Handle lists (most common case from StackQL)
if isinstance(data, list):
return [self._process_sql_types(item) for item in data]

# Handle dictionaries (individual records or nested objects)
elif isinstance(data, dict):
# Check if this is an SQL type object
if 'Valid' in data and len(data) <= 2 and ('String' in data or 'Int64' in data or 'Float64' in data):
# This is an SQL type object - transform it
if data.get('Valid', False):
# Valid: True -> return the actual value
for type_key in ['String', 'Int64', 'Float64']:
if type_key in data:
return data.get(type_key)
return None # Fallback if no value field found
else:
# Valid: False -> return None (SQL NULL)
return None
else:
# Regular dictionary - process each value
result = {}
for key, value in data.items():
result[key] = self._process_sql_types(value)
return result

# All other data types (strings, numbers, booleans, None) - return as is
return data

def _format_empty(self):
"""Format an empty result.

@@ -154,8 +224,4 @@ def format_statement_result(self, result):
elif self.output_format == 'csv':
return message
else: # dict
# Count number of rows in the message
try:
return {'message': message, 'rowsaffected': message.count('\n')}
except Exception:
return {'message': message, 'rowsaffected': 0}
return {'message': message.rstrip('\n')}
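
To make the new null handling concrete, here is a standalone sketch of the same normalization applied to a hypothetical result row; the field names and values are illustrative only, but the `String`/`Int64`/`Float64` plus `Valid` shapes match the objects handled by `_process_sql_types`:

```python
def process_sql_types(data):
    """Recursively replace SQL type objects with plain Python values."""
    if isinstance(data, list):
        return [process_sql_types(item) for item in data]
    if isinstance(data, dict):
        # SQL type object: one of 'String'/'Int64'/'Float64' plus a 'Valid' flag
        if 'Valid' in data and len(data) <= 2 and (
            'String' in data or 'Int64' in data or 'Float64' in data
        ):
            if data.get('Valid', False):
                for type_key in ('String', 'Int64', 'Float64'):
                    if type_key in data:
                        return data[type_key]
                return None
            return None  # SQL NULL
        return {key: process_sql_types(value) for key, value in data.items()}
    return data

row = {
    'name':   {'String': 'instance-1', 'Valid': True},  # regular value -> 'instance-1'
    'expiry': {'String': '', 'Valid': False},            # SQL NULL      -> None
    'label':  {'String': '', 'Valid': True},             # empty string  -> '' (preserved)
    'count':  {'Int64': 3, 'Valid': True},               # numeric value -> 3
}
print(process_sql_types([row]))
# [{'name': 'instance-1', 'expiry': None, 'label': '', 'count': 3}]
```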
15 changes: 5 additions & 10 deletions pystackql/core/query.py
@@ -151,24 +151,21 @@ def execute(self, query, custom_auth=None, env_vars=None):
os.remove(script_path)
return output


class AsyncQueryExecutor:
"""Executes StackQL queries asynchronously.
"""Executes StackQL queries asynchronously in local mode.

This class provides methods for executing multiple StackQL queries
concurrently using asyncio.
concurrently using asyncio. Server mode is not supported for async queries.
"""

def __init__(self, sync_query_func, server_mode=False, output_format='dict'):
def __init__(self, sync_query_func, output_format='dict'):
"""Initialize the AsyncQueryExecutor.

Args:
sync_query_func (callable): Function to execute a single query synchronously
server_mode (bool, optional): Whether to use server mode. Defaults to False.
output_format (str, optional): Output format (dict or pandas). Defaults to 'dict'.
"""
self.sync_query_func = sync_query_func
self.server_mode = server_mode
self.output_format = output_format

async def execute_queries(self, queries):
@@ -188,15 +185,12 @@ async def execute_queries(self, queries):

async def main():
with ThreadPoolExecutor() as executor:
# New connection is created for each query in server_mode, reused otherwise
new_connection = self.server_mode

# Create tasks for each query
loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(
executor,
lambda q=query: self.sync_query_func(q, new_connection),
lambda q=query: self.sync_query_func(q),
# Pass query as a default argument to avoid late binding issues
)
for query in queries
@@ -213,6 +207,7 @@ async def main():
# Process results based on output format
if self.output_format == 'pandas':
import pandas as pd
# Concatenate the DataFrames
return pd.concat(results, ignore_index=True)
else:
# Flatten the list of results
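
The default-argument lambda used in `execute_queries` above is what keeps each thread bound to its own query. A self-contained sketch of the same asyncio plus ThreadPoolExecutor pattern, with `sync_query` standing in for the real synchronous query function (it is not pystackql code):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def sync_query(query):
    # Stand-in for a blocking StackQL query call
    return [{"query": query, "rows": 1}]

async def run_queries(queries):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        futures = [
            # q=query binds the current value; a bare `lambda: sync_query(query)`
            # would see only the last value of `query` (late binding)
            loop.run_in_executor(executor, lambda q=query: sync_query(q))
            for query in queries
        ]
        return await asyncio.gather(*futures)

results = asyncio.run(run_queries(["SELECT 1", "SELECT 2", "SELECT 3"]))
print(results)  # one result list per input query
```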
130 changes: 76 additions & 54 deletions pystackql/core/stackql.py
@@ -152,7 +152,7 @@ def __init__(self,
self.debug_log_file = None

# Setup output formatter
self.output_formatter = OutputFormatter(output)
self.local_output_formatter = OutputFormatter(output)
self.output = output.lower()

# Server mode setup
@@ -179,62 +179,38 @@ def __init__(self,
self.params = setup_local_mode(self, **local_params)

# Initialize query executor
self.query_executor = QueryExecutor(
self.local_query_executor = QueryExecutor(
self.bin_path,
self.params,
self.debug,
self.debug_log_file
)

# Initialize async query executor
self.async_executor = AsyncQueryExecutor(
self._sync_query_wrapper,
self.server_mode,
self.output
)
# Initialize async query executor (only for local mode)
if not self.server_mode:
self.async_executor = AsyncQueryExecutor(
self._sync_query_wrapper,
output_format=self.output
)

def _sync_query_wrapper(self, query, new_connection=False):
"""Wrapper for synchronous query execution.
def _sync_query_wrapper(self, query):
"""Wrapper for synchronous query execution used by AsyncQueryExecutor.

This method is exclusively used for local mode async queries.
Server mode is not supported for async queries.

Args:
query (str): The query to execute
new_connection (bool, optional): Whether to use a new connection. Defaults to False.


Returns:
The query result
The formatted query result
"""
if self.server_mode:
if new_connection:
result = self.server_connection.execute_query_with_new_connection(query)
else:
result = self.server_connection.execute_query(query)

# Format server result if needed
if self.output == 'pandas':
import pandas as pd
return pd.DataFrame(result)
return result
else:
# Execute query and format result
query_result = self.query_executor.execute(query)

if "exception" in query_result:
result = [{"error": query_result["exception"]}]
elif "error" in query_result:
result = [{"error": query_result["error"]}]
elif "data" in query_result:
try:
result = json.loads(query_result["data"])
except Exception:
result = [{"error": f"Invalid JSON output: {query_result['data']}"}]
else:
result = []

# Format local result if needed
if self.output == 'pandas':
import pandas as pd
return pd.DataFrame(result)
return result
# Execute query
query_result = self.local_query_executor.execute(query)

# Format the result using the OutputFormatter
# This will handle SQL type objects through the _format_data method
return self.local_output_formatter.format_query_result(query_result)

def properties(self):
"""Retrieves the properties of the StackQL instance.
@@ -257,7 +233,7 @@ def properties(self):
props = {}
for var in vars(self):
# Skip internal objects
if var.startswith('_') or var in ['output_formatter', 'query_executor', 'async_executor', 'binary_manager', 'server_connection']:
if var.startswith('_') or var in ['local_output_formatter', 'local_query_executor', 'async_executor', 'binary_manager', 'server_connection']:
continue
props[var] = getattr(self, var)
return props
@@ -329,10 +305,10 @@ def executeStmt(self, query, custom_auth=None, env_vars=None):
return result
else:
# Execute the query
result = self.query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars)
result = self.local_query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars)

# Format the result
return self.output_formatter.format_statement_result(result)
return self.local_output_formatter.format_statement_result(result)

def execute(self, query, suppress_errors=True, custom_auth=None, env_vars=None):
"""
@@ -387,11 +363,53 @@ def execute(self, query, suppress_errors=True, custom_auth=None, env_vars=None):
suppress_errors = False

# Execute the query
output = self.query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars)
output = self.local_query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars)

# Format the result
return self.output_formatter.format_query_result(output, suppress_errors)

return self.local_output_formatter.format_query_result(output, suppress_errors)

# async def executeQueriesAsync(self, queries):
# """Executes multiple StackQL queries asynchronously using the current StackQL instance.

# This method utilizes an asyncio event loop to concurrently run a list of provided
# StackQL queries. Each query is executed independently, and the combined results of
# all the queries are returned as a list of JSON objects if 'dict' output mode is selected,
# or as a concatenated DataFrame if 'pandas' output mode is selected.

# The order of the results in the returned list or DataFrame may not necessarily
# correspond to the order of the queries in the input list due to the asynchronous nature
# of execution.

# :param queries: A list of StackQL query strings to be executed concurrently.
# :type queries: list[str], required
# :return: A list of results corresponding to each query. Each result is a JSON object or a DataFrame.
# :rtype: list[dict] or pd.DataFrame
# :raises ValueError: If method is used in `server_mode` on an unsupported OS (anything other than Linux).
# :raises ValueError: If an unsupported output mode is selected (anything other than 'dict' or 'pandas').

# Example:
# >>> from pystackql import StackQL
# >>> stackql = StackQL()
# >>> queries = [
# >>> \"\"\"SELECT '%s' as region, instanceType, COUNT(*) as num_instances
# ... FROM aws.ec2.instances
# ... WHERE region = '%s'
# ... GROUP BY instanceType\"\"\" % (region, region)
# >>> for region in regions ]
# >>> result = stackql.executeQueriesAsync(queries)

# Note:
# - When operating in `server_mode`, this method is not supported.
# """
# if self.server_mode:
# raise ValueError(
# "The executeQueriesAsync method is not supported in server mode. "
# "Please use the standard execute method with individual queries instead, "
# "or switch to local mode if you need to run multiple queries concurrently."
# )

# return await self.async_executor.execute_queries(queries)

async def executeQueriesAsync(self, queries):
"""Executes multiple StackQL queries asynchronously using the current StackQL instance.

@@ -408,7 +426,7 @@ async def executeQueriesAsync(self, queries):
:type queries: list[str], required
:return: A list of results corresponding to each query. Each result is a JSON object or a DataFrame.
:rtype: list[dict] or pd.DataFrame
:raises ValueError: If method is used in `server_mode` on an unsupported OS (anything other than Linux).
:raises ValueError: If server_mode is True (async is only supported in local mode).
:raises ValueError: If an unsupported output mode is selected (anything other than 'dict' or 'pandas').

Example:
@@ -423,7 +441,7 @@ >>> result = stackql.executeQueriesAsync(queries)
>>> result = stackql.executeQueriesAsync(queries)

Note:
- When operating in `server_mode`, this method is not supported.
- This method is only supported in local mode.
"""
if self.server_mode:
raise ValueError(
@@ -432,8 +450,12 @@
"or switch to local mode if you need to run multiple queries concurrently."
)

# Verify that async_executor is available (should only be initialized in local mode)
if not hasattr(self, 'async_executor'):
raise RuntimeError("Async executor not initialized. This should not happen.")

return await self.async_executor.execute_queries(queries)

def test_connection(self):
"""Tests if the server connection is working by executing a simple query.

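
A short local-mode usage sketch for the async API after this change; the `output` keyword, the queries, and the printed result are illustrative only, and a working stackql binary plus provider auth are assumed:

```python
import asyncio
from pystackql import StackQL

# Local mode (the default); server-mode instances now raise ValueError for async calls
stackql = StackQL(output='pandas')

queries = [
    "SELECT 1 AS id",
    "SELECT 2 AS id",
]

# With output='pandas' the per-query DataFrames are concatenated into one frame;
# with output='dict' a flattened list of row dictionaries is returned instead
df = asyncio.run(stackql.executeQueriesAsync(queries))
print(df)
```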