def _format_data(self, data):
    """Format raw query output according to ``self.output_format``.

    This method processes SQL type objects from StackQL:
    - SQL NULL values: {'String': '', 'Valid': False} → None
    - Regular values: {'String': 'value', 'Valid': True} → 'value'
    - Empty strings: {'String': '', 'Valid': True} → '' (preserved as empty string)

    Args:
        data (str): The data string

    Returns:
        str: ``data`` unchanged when the output format is 'csv'.
        pandas.DataFrame: when the output format is 'pandas'; an empty
            frame for empty results, or a one-row frame holding an
            ``error`` column when parsing or processing fails.
        list or dict: otherwise, the parsed JSON with SQL type objects
            unwrapped; ``[]`` for empty results, or a one-element list
            holding an error record when parsing or processing fails.
    """
    if self.output_format == 'csv':
        return data

    as_pandas = self.output_format == 'pandas'

    def _emit(rows):
        # pandas is imported lazily and in exactly one place.  Importing it
        # part-way through the enclosing function would make ``pd`` a local
        # name there, so any earlier ``pd.DataFrame(...)`` (error handlers,
        # empty-result branch) would fail with UnboundLocalError.
        if not as_pandas:
            return rows
        import pandas as pd
        return pd.DataFrame(rows)

    try:
        # Attempt to parse JSON first
        raw_json_data = json.loads(data)
    except json.JSONDecodeError as e:
        # Malformed JSON: report where the parser gave up
        return _emit([{
            "error": f"Invalid JSON format: {str(e)}",
            "position": e.pos,
            "line": e.lineno,
            "column": e.colno,
        }])
    except TypeError as e:
        # data was not a str/bytes/bytearray
        return _emit([{
            "error": f"Invalid data type for JSON parsing: {str(e)}",
            "data_type": str(type(data)),
        }])
    except Exception as e:
        # Anything else the parser may raise
        return _emit([{
            "error": f"Unexpected error parsing JSON: {str(e)}",
            "exception_type": type(e).__name__,
        }])

    try:
        # Unwrap SQL type objects ({'String': ..., 'Valid': ...} etc.)
        processed_json_data = self._process_sql_types(raw_json_data)

        # Empty result set: empty frame for pandas, empty list otherwise
        if not processed_json_data:
            return _emit(None) if as_pandas else []

        # DataFrame construction stays inside the try so that a payload
        # pandas cannot tabulate is reported as an error record.
        return _emit(processed_json_data)
    except Exception as e:
        # Handle any errors during processing
        return _emit([{"error": f"Error processing data: {str(e)}"}])


def _process_sql_types(self, data):
    """Recursively unwrap SQL type objects in parsed JSON data.

    A dict is treated as an SQL type object when it has a 'Valid' key, at
    most two keys in total, and one of 'String', 'Int64' or 'Float64'.
    Such a dict is replaced by its value when 'Valid' is truthy and by
    ``None`` (SQL NULL) otherwise.  Lists and other dicts are walked
    recursively; every other value is returned unchanged.

    Args:
        data: The parsed JSON data

    Returns:
        The processed data with SQL type objects transformed
    """
    value_keys = ('String', 'Int64', 'Float64')

    # Handle lists (most common case from StackQL)
    if isinstance(data, list):
        return [self._process_sql_types(item) for item in data]

    # Handle dictionaries (individual records or nested objects)
    if isinstance(data, dict):
        is_sql_type = (
            'Valid' in data
            and len(data) <= 2
            and any(key in data for key in value_keys)
        )
        if not is_sql_type:
            # Regular dictionary - process each value
            return {key: self._process_sql_types(value) for key, value in data.items()}

        if not data.get('Valid', False):
            # Valid: False -> SQL NULL
            return None

        # Valid: True -> return the wrapped value (first matching type key)
        for type_key in value_keys:
            if type_key in data:
                return data.get(type_key)
        return None  # Fallback if no value field found

    # All other data types (strings, numbers, booleans, None) - return as is
    return data
@@ -154,8 +224,4 @@ def format_statement_result(self, result): elif self.output_format == 'csv': return message else: # dict - # Count number of rows in the message - try: - return {'message': message, 'rowsaffected': message.count('\n')} - except Exception: - return {'message': message, 'rowsaffected': 0} \ No newline at end of file + return {'message': message.rstrip('\n')} \ No newline at end of file diff --git a/pystackql/core/query.py b/pystackql/core/query.py index 5cf2b28..96a0238 100644 --- a/pystackql/core/query.py +++ b/pystackql/core/query.py @@ -151,24 +151,21 @@ def execute(self, query, custom_auth=None, env_vars=None): os.remove(script_path) return output - class AsyncQueryExecutor: - """Executes StackQL queries asynchronously. + """Executes StackQL queries asynchronously in local mode. This class provides methods for executing multiple StackQL queries - concurrently using asyncio. + concurrently using asyncio. Server mode is not supported for async queries. """ - def __init__(self, sync_query_func, server_mode=False, output_format='dict'): + def __init__(self, sync_query_func, output_format='dict'): """Initialize the AsyncQueryExecutor. Args: sync_query_func (callable): Function to execute a single query synchronously - server_mode (bool, optional): Whether to use server mode. Defaults to False. output_format (str, optional): Output format (dict or pandas). Defaults to 'dict'. 
""" self.sync_query_func = sync_query_func - self.server_mode = server_mode self.output_format = output_format async def execute_queries(self, queries): @@ -188,15 +185,12 @@ async def execute_queries(self, queries): async def main(): with ThreadPoolExecutor() as executor: - # New connection is created for each query in server_mode, reused otherwise - new_connection = self.server_mode - # Create tasks for each query loop = asyncio.get_event_loop() futures = [ loop.run_in_executor( executor, - lambda q=query: self.sync_query_func(q, new_connection), + lambda q=query: self.sync_query_func(q), # Pass query as a default argument to avoid late binding issues ) for query in queries @@ -213,6 +207,7 @@ async def main(): # Process results based on output format if self.output_format == 'pandas': import pandas as pd + # Concatenate the DataFrames return pd.concat(results, ignore_index=True) else: # Flatten the list of results diff --git a/pystackql/core/stackql.py b/pystackql/core/stackql.py index 457528d..db0d483 100644 --- a/pystackql/core/stackql.py +++ b/pystackql/core/stackql.py @@ -152,7 +152,7 @@ def __init__(self, self.debug_log_file = None # Setup output formatter - self.output_formatter = OutputFormatter(output) + self.local_output_formatter = OutputFormatter(output) self.output = output.lower() # Server mode setup @@ -179,62 +179,38 @@ def __init__(self, self.params = setup_local_mode(self, **local_params) # Initialize query executor - self.query_executor = QueryExecutor( + self.local_query_executor = QueryExecutor( self.bin_path, self.params, self.debug, self.debug_log_file ) - # Initialize async query executor - self.async_executor = AsyncQueryExecutor( - self._sync_query_wrapper, - self.server_mode, - self.output - ) + # Initialize async query executor (only for local mode) + if not self.server_mode: + self.async_executor = AsyncQueryExecutor( + self._sync_query_wrapper, + output_format=self.output + ) - def _sync_query_wrapper(self, query, 
def _sync_query_wrapper(self, query):
    """Run one query synchronously and return it formatted.

    This is the callable handed to AsyncQueryExecutor, which is only
    created in local mode; server mode is not supported for async queries.

    Args:
        query (str): The query to execute

    Returns:
        The formatted query result
    """
    # The raw executor output goes straight to the output formatter, whose
    # format_query_result applies the configured output format.
    return self.local_output_formatter.format_query_result(
        self.local_query_executor.execute(query)
    )
@@ -257,7 +233,7 @@ def properties(self): props = {} for var in vars(self): # Skip internal objects - if var.startswith('_') or var in ['output_formatter', 'query_executor', 'async_executor', 'binary_manager', 'server_connection']: + if var.startswith('_') or var in ['local_output_formatter', 'local_query_executor', 'async_executor', 'binary_manager', 'server_connection']: continue props[var] = getattr(self, var) return props @@ -329,10 +305,10 @@ def executeStmt(self, query, custom_auth=None, env_vars=None): return result else: # Execute the query - result = self.query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars) + result = self.local_query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars) # Format the result - return self.output_formatter.format_statement_result(result) + return self.local_output_formatter.format_statement_result(result) def execute(self, query, suppress_errors=True, custom_auth=None, env_vars=None): """ @@ -387,11 +363,53 @@ def execute(self, query, suppress_errors=True, custom_auth=None, env_vars=None): suppress_errors = False # Execute the query - output = self.query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars) + output = self.local_query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars) # Format the result - return self.output_formatter.format_query_result(output, suppress_errors) - + return self.local_output_formatter.format_query_result(output, suppress_errors) + + # async def executeQueriesAsync(self, queries): + # """Executes multiple StackQL queries asynchronously using the current StackQL instance. + + # This method utilizes an asyncio event loop to concurrently run a list of provided + # StackQL queries. Each query is executed independently, and the combined results of + # all the queries are returned as a list of JSON objects if 'dict' output mode is selected, + # or as a concatenated DataFrame if 'pandas' output mode is selected. 
+ + # The order of the results in the returned list or DataFrame may not necessarily + # correspond to the order of the queries in the input list due to the asynchronous nature + # of execution. + + # :param queries: A list of StackQL query strings to be executed concurrently. + # :type queries: list[str], required + # :return: A list of results corresponding to each query. Each result is a JSON object or a DataFrame. + # :rtype: list[dict] or pd.DataFrame + # :raises ValueError: If method is used in `server_mode` on an unsupported OS (anything other than Linux). + # :raises ValueError: If an unsupported output mode is selected (anything other than 'dict' or 'pandas'). + + # Example: + # >>> from pystackql import StackQL + # >>> stackql = StackQL() + # >>> queries = [ + # >>> \"\"\"SELECT '%s' as region, instanceType, COUNT(*) as num_instances + # ... FROM aws.ec2.instances + # ... WHERE region = '%s' + # ... GROUP BY instanceType\"\"\" % (region, region) + # >>> for region in regions ] + # >>> result = stackql.executeQueriesAsync(queries) + + # Note: + # - When operating in `server_mode`, this method is not supported. + # """ + # if self.server_mode: + # raise ValueError( + # "The executeQueriesAsync method is not supported in server mode. " + # "Please use the standard execute method with individual queries instead, " + # "or switch to local mode if you need to run multiple queries concurrently." + # ) + + # return await self.async_executor.execute_queries(queries) + async def executeQueriesAsync(self, queries): """Executes multiple StackQL queries asynchronously using the current StackQL instance. @@ -408,7 +426,7 @@ async def executeQueriesAsync(self, queries): :type queries: list[str], required :return: A list of results corresponding to each query. Each result is a JSON object or a DataFrame. :rtype: list[dict] or pd.DataFrame - :raises ValueError: If method is used in `server_mode` on an unsupported OS (anything other than Linux). 
+ :raises ValueError: If server_mode is True (async is only supported in local mode). :raises ValueError: If an unsupported output mode is selected (anything other than 'dict' or 'pandas'). Example: @@ -423,7 +441,7 @@ async def executeQueriesAsync(self, queries): >>> result = stackql.executeQueriesAsync(queries) Note: - - When operating in `server_mode`, this method is not supported. + - This method is only supported in local mode. """ if self.server_mode: raise ValueError( @@ -432,8 +450,12 @@ async def executeQueriesAsync(self, queries): "or switch to local mode if you need to run multiple queries concurrently." ) + # Verify that async_executor is available (should only be initialized in local mode) + if not hasattr(self, 'async_executor'): + raise RuntimeError("Async executor not initialized. This should not happen.") + return await self.async_executor.execute_queries(queries) - + def test_connection(self): """Tests if the server connection is working by executing a simple query. diff --git a/pystackql/magic_ext/base.py b/pystackql/magic_ext/base.py index 0febcc3..45e2f3c 100644 --- a/pystackql/magic_ext/base.py +++ b/pystackql/magic_ext/base.py @@ -76,17 +76,36 @@ def _display_with_csv_download(self, df): # Display the DataFrame first IPython.display.display(df) + # # Create and display the download button + # download_html = f''' + #