This data pipeline processes sales data, user data, and weather data to generate actionable insights and visualizations. The pipeline is designed to be executed through the main.py script, which orchestrates the entire flow from data extraction to analysis.
I used standard data engineering techniques to integrate multiple data sources (sales, user, and weather data) into a single pipeline. The pipeline automates data validation, transformation, and analysis, ensuring accurate insights and efficient processing.
This project shows how automating the extraction, validation, and merging of multiple data sources can drive actionable insights and improve decision-making.
Note 1 - Create a table "sales" in MySQL and load "AIQ - Data Engineer Assignment - Sales data.csv" into it using the MySQL Import Wizard.
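If you prefer a scripted alternative to the Import Wizard, a minimal sketch is shown below. It assumes the "sales" table already exists with columns matching the CSV header; the database name and credentials are placeholders.

```python
# Scripted alternative to the MySQL Import Wizard (sketch only).
# Assumes the "sales" table already exists and its columns match the CSV
# header; connection details below are placeholders.
import pandas as pd
import pymysql

df = pd.read_csv("AIQ - Data Engineer Assignment - Sales data.csv")

conn = pymysql.connect(host="localhost", user="root", password="***", database="sales_db")
try:
    with conn.cursor() as cur:
        cols = ", ".join(f"`{c}`" for c in df.columns)
        placeholders = ", ".join(["%s"] * len(df.columns))
        rows = df.astype(object).where(df.notna(), None).values.tolist()  # NaN -> NULL
        cur.executemany(f"INSERT INTO sales ({cols}) VALUES ({placeholders})", rows)
    conn.commit()
finally:
    conn.close()
```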
main.py is the main script that triggers the data pipeline. It performs the following steps:
1. Connects to the MySQL database.
2. Loads sales data from the database.
3. Validates the schema of the sales data.
4. Fetches user data from an API.
5. Validates the schema of the user data.
6. Merges the user data with the sales data.
7. Merges the result with weather data based on location.
8. Outputs the final merged data to a CSV file.

After the data processing, it initializes a DataAnalyzer to perform various analyses on the final data.
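For reference, a simplified sketch of how main.py might orchestrate these steps. Import paths and method names are illustrative assumptions, not the actual MyModel/DataAnalyzer API:

```python
# Illustrative orchestration flow for main.py; import paths and method
# names are assumptions, not the project's exact API.
from mymodel import MyModel
from analysis import DataAnalyzer

def run_pipeline():
    model = MyModel()

    sales_df = model.load_sales_data()          # read the "sales" table from MySQL
    model.validate_sales_schema(sales_df)       # schema check on sales data

    users_df = model.fetch_user_data()          # call the user API
    model.validate_user_schema(users_df)        # schema check on user data

    merged_df = model.merge_sales_and_users(sales_df, users_df)
    final_df = model.merge_with_weather(merged_df)   # join weather data by location

    final_df.to_csv("final_data.csv", index=False)

    analyzer = DataAnalyzer(final_df)
    analyzer.run_all()                          # analyses + visualizations

if __name__ == "__main__":
    run_pipeline()
```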
a. mymodel.py: Contains the MyModel class, responsible for interacting with the data sources, performing data operations, and managing the data pipeline.
b. analysis.py: Includes the DataAnalyzer class, which performs various analyses on the final merged data, such as calculating total sales amount per customer, average order quantity per product, and generating visualizations.
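As an illustration, the two analyses mentioned above roughly correspond to pandas aggregations like these (column names such as customer_id, product_id, quantity, and total_amount are assumed, not taken from the actual dataset):

```python
import pandas as pd

final_df = pd.read_csv("final_data.csv")

# Total sales amount per customer (column names are illustrative).
sales_per_customer = (
    final_df.groupby("customer_id")["total_amount"].sum().reset_index()
)

# Average order quantity per product.
avg_qty_per_product = (
    final_df.groupby("product_id")["quantity"].mean().reset_index()
)
```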
Contains utility functions for loading data. dataloader.py includes functions for making a connection to the MySQL database, loading sales data, and fetching user data from an API.
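A minimal sketch of what such loader functions could look like, assuming pymysql for the database connection and requests for the API call; the connection details and API URL are placeholders (the real values come from configs/config.py and main.py):

```python
# Sketch of dataloader.py-style helpers; credentials and the API URL are
# placeholders, and the requests library is assumed for the HTTP call.
import pandas as pd
import pymysql
import requests

def get_connection():
    return pymysql.connect(
        host="localhost", user="root", password="***", database="sales_db"
    )

def load_sales_data(conn) -> pd.DataFrame:
    # Read the whole "sales" table into a DataFrame.
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM sales")
        rows = cur.fetchall()
        columns = [desc[0] for desc in cur.description]
    return pd.DataFrame(rows, columns=columns)

def fetch_user_data(api_url: str = "https://example.com/api/users") -> list:
    # Fetch user records from the configured API endpoint.
    response = requests.get(api_url, timeout=10)
    response.raise_for_status()
    return response.json()
```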
Stores configuration files for the data pipeline, including config.py for general configurations and logging_config.yaml for logging settings.
Includes SQL queries for creating and loading data into the MySQL database.
Contains exception handling classes. exceptions.py defines custom exceptions for better error handling.
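For illustration, the kind of custom exceptions exceptions.py might define (class names are hypothetical):

```python
# Hypothetical custom exceptions for clearer error handling in the pipeline.
class DataPipelineError(Exception):
    """Base class for all pipeline errors."""

class SchemaValidationError(DataPipelineError):
    """Raised when sales or user data fails schema validation."""

class DataSourceError(DataPipelineError):
    """Raised when the database or the user API cannot be reached."""
```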
Contains utility functions and configurations, such as config.py for configuration settings and logger.py for logging setup.
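For example, a minimal logger.py could load configs/logging_config.yaml and hand it to the standard logging module. The function name, logger name, and the use of PyYAML are assumptions:

```python
# Sketch of a logger.py-style setup; loads the YAML logging config and
# configures the standard logging module from it (PyYAML assumed).
import logging
import logging.config
import yaml

def setup_logging(config_path: str = "configs/logging_config.yaml") -> logging.Logger:
    with open(config_path) as f:
        config = yaml.safe_load(f)
    logging.config.dictConfig(config)
    return logging.getLogger("data_pipeline")
```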
Includes unit tests for the data pipeline components.
Stores the output of the data analysis, including CSV files and visualizations.
Install Dependencies:
Make sure you have Python 3.9 installed.
Install the required dependencies using:
```bash
git clone https://github.com/himalayaashish/Data_Engineer_Project.git
cd Data_Engineer_Project
pip install -r requirements.txt
```
Database Configuration:
Configure the MySQL database connection details in configs/config.py.
API Configuration:
Update the API endpoint in main.py with the desired endpoint for fetching user data.
Run the Data Pipeline:
Execute the following command in the terminal:
```bash
python main.py
```
View Results:
The final merged data will be stored in final_data.csv. Analysis results and visualizations will be saved in the AnalysisOutputResults directory.
Unit Testing:
Run the unit tests using:
```bash
pytest
```
Dependencies:
pandas
pymysql
jsonschema
pytest (for testing)

Notes:
1- Any third-party tool such as Tableau or Power BI is suitable for data visualization.
2- Data analysis can also be done with plain SQL queries. Stored procedures can be created, and analysis.py can be further enhanced to simply call these stored procedures whenever required.
3- Data drift is currently handled mostly through schema checks; further work is needed to validate the value range of each field (see the sketch below).
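One way to extend the existing schema checks toward range validation is to add minimum/maximum constraints to the jsonschema definition. The field names and bounds below are illustrative, not the project's actual schema:

```python
# Sketch: extend schema validation with range checks using jsonschema.
# Field names and bounds are illustrative only.
from jsonschema import validate, ValidationError

sales_record_schema = {
    "type": "object",
    "properties": {
        "quantity": {"type": "integer", "minimum": 1, "maximum": 10000},
        "price": {"type": "number", "minimum": 0},
        "order_date": {"type": "string"},
    },
    "required": ["quantity", "price", "order_date"],
}

def validate_record(record: dict) -> bool:
    # Returns True if the record satisfies both type and range constraints.
    try:
        validate(instance=record, schema=sales_record_schema)
        return True
    except ValidationError:
        return False
```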


