diff --git a/.version b/.version new file mode 100644 index 0000000..d1c6331 --- /dev/null +++ b/.version @@ -0,0 +1 @@ +0.01 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e93f9be --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.10.2 + +ENV PYTHONUNBUFFERED 1 + +WORKDIR /adi + +ADD requirements.txt /adi/ +RUN pip install --upgrade pip +RUN pip install -r requirements.txt + +# ① Install some dependencies +RUN apt-get update \ + && apt-get install -y libsasl2-dev python-dev libldap2-dev libssl-dev \ + && rm -rf /var/lib/apt/lists/* \ + && apt-get clean + + +# Copy adi +ADD /adi /adi + + +VOLUME /usr/src \ No newline at end of file diff --git a/Jenkinsfile_Devstart b/Jenkinsfile_Devstart new file mode 100644 index 0000000..be2d3be --- /dev/null +++ b/Jenkinsfile_Devstart @@ -0,0 +1,36 @@ +pipeline { + + environment { + DOCKERHUB_CREDENTIALS = credentials('avicoiot-dockerhub') + PRODUCT = 'ADI' + GIT_HOST = 'somewhere' + GIT_REPO = 'repo' + } + agent any + + options { + buildDiscarder(logRotator(numToKeepStr: '3')) + } + stages { + stage('Build') { + steps { + sh 'docker build -t avicoiot/adi-alpine:latest .' + } + } + stage('Login') { + steps { + sh 'echo $DOCKERHUB_CREDENTIALS_PSW | docker login --username $DOCKERHUB_CREDENTIALS_USR --password-stdin' + } + } + stage('Push') { + steps { + sh 'docker push avicoiot/adi-alpine:latest' + } + } + } + post { + always { + sh 'docker logout' + } + } +} diff --git a/Jenkinsfile_start b/Jenkinsfile_start deleted file mode 100644 index c965622..0000000 --- a/Jenkinsfile_start +++ /dev/null @@ -1,36 +0,0 @@ -pipeline { - agent { - node { - label 'python' - } - } - triggers { - pollSCM '*/5 * * * *' - } - stages { - stage('Build') { - steps { - echo "Building.." - sh ''' - echo "doing build stuff.." - ''' - } - } - stage('Test') { - steps { - echo "Testing.." - sh ''' - echo "doing test stuff.." - ''' - } - } - stage('Deliver') { - steps { - echo 'Deliver....' 
- sh ''' - echo "doing delivery stuff.." - ''' - } - } - } -} diff --git a/RN.md b/RN.md new file mode 100644 index 0000000..e69de29 diff --git a/Untitled Diagram.drawio b/Untitled Diagram.drawio new file mode 100644 index 0000000..4b20a0e --- /dev/null +++ b/Untitled Diagram.drawio @@ -0,0 +1 @@ +UzV2zq1wL0osyPDNT0nNUTV2VTV2LsrPL4GwciucU3NyVI0MMlNUjV1UjYwMgFjVyA2HrCFY1qAgsSg1rwSLBiADYTaQg2Y1AA== \ No newline at end of file diff --git a/adi/app_config/config.yaml b/adi/app_config/config.yaml new file mode 100644 index 0000000..b0d4259 --- /dev/null +++ b/adi/app_config/config.yaml @@ -0,0 +1,77 @@ +# backend/webserver/config/config.yml +PROJECT_NAME: 'ADI' + +application_conig: + db_archive: "/db_archive" + + rules: + folder: "/csv_files" + files: ['source1.csv' ] + customers_list: [1,2,3,4,5,6,7] + + + +databases: + + mongo: + DB_TYPE: 'mongodb' + ENGINE: 'mongodb' + DRIVER: 'motor' + NAME: 'webserver' + USER: 'admin' + PASSWORD: 'admin' + HOST: 'mongo_db' + PORT: 27017 + DROP_COLLECTION_ON_START: ["sdad"] + DB_PREPARATION: + security: + index: + username + email + customer: + index: + customer_no + email + + WATCH: ["customer","test"] + + internal: + DB_TYPE: 'postgres' + ENGINE: 'postgres' + NAME: 'internal' + USER: 'admin' + PASSWORD: 'admin' + HOST: '192.168.1.113' + PORT: 5432 + + postgres: + DB_TYPE: 'postgres' + ENGINE: 'postgres' + NAME: 'dvdrental' + USER: 'admin' + PASSWORD: 'admin' + HOST: '192.168.1.113' + PORT: 5432 + + target: + DB_TYPE: 'postgres' + ENGINE: 'postgres' + NAME: 'target' + USER: 'admin' + PASSWORD: 'admin' + HOST: '192.168.1.113' + PORT: 5432 + + redis: + host: redis_db + port: 6379 + db: 0 + + +files: + default: + input_file_path: '/webserver/input/' + output_file_path: '/webserver/output/' + +security: + trace_request: 'Y' \ No newline at end of file diff --git a/app_config/customer.py b/adi/app_config/customer.py similarity index 100% rename from app_config/customer.py rename to adi/app_config/customer.py diff --git 
a/app_config/db_config.py b/adi/app_config/db_config.py similarity index 85% rename from app_config/db_config.py rename to adi/app_config/db_config.py index 9ddaa38..1644e1f 100644 --- a/app_config/db_config.py +++ b/adi/app_config/db_config.py @@ -1,12 +1,14 @@ -from app_config.settings import SingletonMeta + from typing import Dict from enum import Enum +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) + from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker -from sqlalchemy_utils import database_exists, create_database -from sqlalchemy.sql import text +from app_config.settings import SingletonMeta class DBType(str, Enum): POSTGRES = "postgres" SQLITE = "sqlite" @@ -38,7 +40,7 @@ def __init__(self, *args, **kwargs): # self.Session = sessionmaker(bind=self.engine) def get_engine(self): - return self.engine + return self.engine.connect() # @@ -94,11 +96,19 @@ def get_db(config: Dict) -> "DbSettings": return factory(**config) -# Test -db_test = {'DB_TYPE': 'postgres', 'ENGINE': 'postgres', 'NAME': 'dvdrental', 'USER': 'admin', 'PASSWORD': 'admin', 'HOST': '192.168.1.113', 'PORT': 5432} -test = DBContext.get_db(db_test) -sss = test.get_engine() -print(sss) +# # # Test +# db_test = {'DB_TYPE': 'postgres', 'ENGINE': 'postgres', 'NAME': 'dvdrental', 'USER': 'admin', 'PASSWORD': 'admin', 'HOST': '192.168.1.113', 'PORT': 5432} +# test = DBContext.get_db(db_test) + +# sss = test.get_engine() +# print(sss) +# from sqlalchemy import text +# sql = text('SELECT * from customer WHERE customer_id=1') +# results = sss.execute(sql) +# for e in results: +# print(e) + + # import pandas as pd # sql = ''' # SELECT * FROM actor; diff --git a/app_config/db_config_with_ABC.py b/adi/app_config/db_config_with_ABC.py similarity index 97% rename from app_config/db_config_with_ABC.py rename to adi/app_config/db_config_with_ABC.py index 219fab7..43f31d9 100644 --- a/app_config/db_config_with_ABC.py +++ 
b/adi/app_config/db_config_with_ABC.py @@ -1,4 +1,3 @@ -from app_config.settings import SingletonMeta from abc import ABC, abstractmethod from typing import Dict from enum import Enum diff --git a/adi/app_config/settings.py b/adi/app_config/settings.py new file mode 100644 index 0000000..39df6d9 --- /dev/null +++ b/adi/app_config/settings.py @@ -0,0 +1,63 @@ +import yaml +from functools import reduce +import operator +from pathlib import Path + +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) + + + +config_file = Path('app_config', 'config.yaml') +rules = 'application_conig.rules.' + + +class SingletonMeta(type): + + _instances = {} + + def __call__(cls, *args, **kwargs ): + if cls not in cls._instances: + instance = super().__call__(*args, **kwargs) + cls._instances[cls] = instance + return cls._instances[cls] + + +class Settings(metaclass=SingletonMeta): + + def __init__(self, *args, **kwargs): + self.config_file = kwargs['config_file'] + + + with open(self.config_file, "r") as stream: + try: + self.settings = yaml.safe_load(stream) + + except yaml.YAMLError as exc: + print(exc) + + def get(self, element): + return reduce(operator.getitem, element.split('.'), self.settings) + + +settings = Settings(config_file=config_file) + + + +## adding sys.path.append(str(Path(__file__).parent.parent)) - will include the parent dir so can work directly +# or from main + +# s1 = Settings(config_file='config.yaml') +# print(s1.get('databases.mongo.ENGINE')) + +# if __name__ == "__main__": +# # The client code. 
+# config_file = Path('.', 'config.yaml') +# s1 = Settings(config_file=config_file) + + # print(s1.get('databases.mongo.ENGINE')) +# if id(s1) == id(s2): +# print("Singleton works, both variables contain the same instance.") +# else: +# print("Singleton failed, variables contain different instances.") \ No newline at end of file diff --git a/app_config/task.py b/adi/app_config/task.py similarity index 100% rename from app_config/task.py rename to adi/app_config/task.py diff --git a/app_config/tasks_orchestrator.py b/adi/app_config/tasks_orchestrator.py similarity index 100% rename from app_config/tasks_orchestrator.py rename to adi/app_config/tasks_orchestrator.py diff --git a/adi/app_config/tes/p.py b/adi/app_config/tes/p.py new file mode 100644 index 0000000..4bc54dc --- /dev/null +++ b/adi/app_config/tes/p.py @@ -0,0 +1,94 @@ +# import the required modules + +from abc import ABCMeta, abstractmethod +import copy + + +# class - Courses at GeeksforGeeks +class Courses_At_GFG(metaclass = ABCMeta): + + # constructor + def __init__(self): + self.id = None + self.type = None + + @abstractmethod + def course(self): + pass + + def get_type(self): + return self.type + + def get_id(self): + return self.id + + def set_id(self, sid): + self.id = sid + + def clone(self): + return copy.copy(self) + +# class - DSA course +class DSA(Courses_At_GFG): + def __init__(self): + super().__init__() + self.type = "Data Structures and Algorithms" + + def course(self): + print("Inside DSA::course() method") + +# class - SDE Course +class SDE(Courses_At_GFG): + def __init__(self): + super().__init__() + self.type = "Software Development Engineer" + + def course(self): + print("Inside SDE::course() method.") + +# class - STL Course +class STL(Courses_At_GFG): + def __init__(self): + super().__init__() + self.type = "Standard Template Library" + + def course(self): + print("Inside STL::course() method.") + +# class - Courses At GeeksforGeeks Cache +class Courses_At_GFG_Cache: + + # cache to 
store useful information + cache = {} + + @staticmethod + def get_course(sid): + COURSE = Courses_At_GFG_Cache.cache.get(sid, None) + return COURSE.clone() + + @staticmethod + def load(): + sde = SDE() + sde.set_id("1") + Courses_At_GFG_Cache.cache[sde.get_id()] = sde + + dsa = DSA() + dsa.set_id("2") + Courses_At_GFG_Cache.cache[dsa.get_id()] = dsa + + stl = STL() + stl.set_id("3") + Courses_At_GFG_Cache.cache[stl.get_id()] = stl + +# main function +if __name__ == '__main__': + Courses_At_GFG_Cache.load() + + sde = Courses_At_GFG_Cache.get_course("1") + print(sde.get_type()) + + dsa = Courses_At_GFG_Cache.get_course("2") + print(dsa.get_type()) + + stl = Courses_At_GFG_Cache.get_course("3") + print(stl.get_type()) diff --git a/adi/call_func.py b/adi/call_func.py new file mode 100644 index 0000000..477c6e9 --- /dev/null +++ b/adi/call_func.py @@ -0,0 +1,23 @@ +from app_config.settings import Settings +# from app_config.db_config import DBContext +from pathlib import Path +# from loadCsv.tasks import CustomerTable,AddTask,load_csv +# from loadCsv.client import LoadCsv +# from loadCsv.tasks_2 import test_load +config_file = Path('app_config', 'config.yaml') +from loadCsv.tasks import is_celery_working,get_celery_worker_status + +rules = 'application_conig.rules.' + + +def main(name): + print(get_celery_worker_status()) + # Use a breakpoint in the code line below to debug your script. 
+ # settings = Settings(config_file=config_file) + # a=is_celery_working.delay() + # print(a.get()) + +if __name__ == '__main__': + + main('ADI') + diff --git a/adi/celery_run b/adi/celery_run new file mode 100644 index 0000000..5941ec9 --- /dev/null +++ b/adi/celery_run @@ -0,0 +1,6 @@ +watchmedo auto-restart --directory=./loadCsv --pattern=*.py --recursive -- celery -A loadCsv.worker worker --hostname=worker.main@%h --pool=gevent --concurrency=10 --queues=main -l INFO + +watchmedo auto-restart --directory=./loadCsv --pattern=*.py --recursive -- celery -A loadCsv.worker worker --hostname=worker.db@%h --pool=gevent --concurrency=10 --queues=db -l INFO + + +watchmedo auto-restart --directory=./celery_app --pattern=*.py --ignore-patterns="*config*" --recursive -- celery -A celery_app.worker worker --hostname=worker.db@%h --pool=gevent --concurrency=10 --queues=db -l INFO \ No newline at end of file diff --git a/adi/loadCsv/__init__.pu b/adi/loadCsv/__init__.pu new file mode 100644 index 0000000..e69de29 diff --git a/adi/loadCsv/celeryconfig.py b/adi/loadCsv/celeryconfig.py new file mode 100644 index 0000000..ea5eec8 --- /dev/null +++ b/adi/loadCsv/celeryconfig.py @@ -0,0 +1,12 @@ +enable_utc = True +timezone = 'Asia/Jerusalem' +broker='amqp://guest:guest@localhost:5672' +backend='db+postgresql://admin:admin@192.168.1.113:5432/celery' +imports=['loadCsv.load_manager' ,'loadCsv.tasks_2' ] +broker_pool_limit=0 +task_routes = { + 'test_db': {'queue': 'db'}, + 'load_from_db': {'queue': 'db'}, + 'route_load_type': {'queue': 'main'}, + 'LoadManager': {'queue': 'main'}, + } \ No newline at end of file diff --git a/adi/loadCsv/client.py b/adi/loadCsv/client.py new file mode 100644 index 0000000..166e4f4 --- /dev/null +++ b/adi/loadCsv/client.py @@ -0,0 +1,131 @@ + +import csv +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) +import json + + +from celery import group + + +from app_config.db_config import DBContext +from 
app_config.settings import settings + +# from loadCsv.load_manager import LoadManager + +rules = 'application_conig.rules.' + +class LoadConfig: + + def __init__(self , settings, Thread=2 ) -> None: + + self.settings = settings + self.customers_list = self.settings.get(f'{rules}customers_list') + self.files = self.settings.get(f'{rules}files')[0] + self.files_path = self.settings.get(f'{rules}folder') + self.mapping_rule_file = 'loadCsv' + self.files_path + '/' + self.files + self.load_config:dict = {} + self.operation = None + + self.csv2dict = {} + self.db_connections = [] + self.load_manager = None + + + def __repr__(self): + return json.dumps(self.load_config) + + def run(self): + + # print("Run",json.dumps((self.load_config))) + + # return group([ avi.delay(customer) for customer in self.customers_list]) + #LoadManager().delay(config=(self.load_config)) + return + + + def initialize_operation(self): + self.csv2dict = self._convertcsv2dict(self.mapping_rule_file) + + self.load_config = { 'csvdict' :self.csv2dict, + 'customers_list': self.customers_list } + + + db_connection = {} + + for rule in self.csv2dict: + + if rule is not None: + if rule['rules']['source_type'] == 'db': + # Updating all required db connection + db_name = rule['rules']['source_name'] + db_connection[db_name] = { 'connection_details' : self.settings.get('databases.' 
+ db_name),'engine' : ''} + + + self.db_connections = db_connection + + self.load_config['db_connections'] = db_connection + + + def prepare_celery_config(self): + db_config = self.load_config['db_connections'] + + for db_name , db_details in db_config.items(): + print(db_name ,db_details) + + + def get_db_connections(self): + return self.db_connections + + + @staticmethod + def _convertcsv2dict(file_path): + + content = [] + with open(file_path) as csvfile: + csv_reader = csv.reader(csvfile) + headers = next(csv_reader) + for row in csv_reader: + row_data = {key: value for key, value in zip(headers, row)} + updated_row = {} + updated_row.update({'key':'' , 'rules':row_data}) + content.append(updated_row) + sorted_mapping_rules = sorted(content, key=lambda d: d['rules']['order']) + + return sorted_mapping_rules + + def load(self): + pass + + #return group([ avi.delay(customer) for customer in self.customers_list]) + + # res = load_csv.delay(files) + # print(res.get()) + +load_config = LoadConfig(settings=settings) + +# load_config.initialize_operation() + +# print(load_config.csv2dict) + +# db_all = {} +# for db_name,db_details in load_config.db_connections.items(): +# print("Here --> \n", db_name ,db_details['connection_details']) +# db_engine = DBContext().get_db(db_details['connection_details']) +# db_all[db_name] = db_engine +# # print("DB connections is",db_engine) +# # load_config.initialize_operation() + + +# if __name__ == "__main__": + + +# settings = {'csvdict': [{'key': '', 'rules': {'source_type': 'db', 'source_name': 'postgres', 'db_connection_name': 'source_ps', 'sql': 'SELECT * FROM customer1 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '1'}}, {'key': '', 'rules': {'source_type': 'db', 'source_name': 'postgres', 'db_connection_name': 'source_ps', 'sql': 'SELECT * FROM rental1 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '1'}}, {'key': '', 'rules': {'source_type': 'db', 
'source_name': 'postgres', 'db_connection_name': 'source_ps', 'sql': 'SELECT * FROM customer2 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '3'}}, {'key': '', 'rules': {'source_type': 'db', 'source_name': 'postgres', 'db_connection_name': 'source_ps', 'sql': 'SELECT * FROM rental2 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '3'}}], 'customers_list': [1, 2, 3, 4, 5, 6, 7], 'db_connections': {'postgres': {'connection_name': 'source_ps', 'engine': }}} +# # config = settings +# # path = '/csv_files' +# # files = ['source1.csv'] +# x1 = LoadConfig(settings=settings) +# x1.initialize_operation() +# # # x1.set_db_connection() +# # print("here",x1.db_connection) \ No newline at end of file diff --git a/adi/loadCsv/csv_files/source1.csv b/adi/loadCsv/csv_files/source1.csv new file mode 100644 index 0000000..ca5674b --- /dev/null +++ b/adi/loadCsv/csv_files/source1.csv @@ -0,0 +1,7 @@ +source_type,source_name,db_connection_name,sql,target_type,db_connection_target,order +db,postgres,source_ps,SELECT * FROM customer1 where customer=&1 ,df , file ,1 +db,postgres,source_ps,SELECT * FROM rental1 where customer=&1 ,df , file ,1 +db,postgres,source_ps,SELECT * FROM customer2 where customer=&1 ,df , file ,3 +db,postgres,source_ps,SELECT * FROM rental2 where customer=&1 ,df , file ,3 +db,target,source_ps,SELECT * FROM customer2 where customer=&1 ,df , file ,3 +db,target,source_ps,SELECT * FROM rental2 where customer=&1 ,df , file ,3 \ No newline at end of file diff --git a/adi/loadCsv/exceptions.py b/adi/loadCsv/exceptions.py new file mode 100644 index 0000000..b587d42 --- /dev/null +++ b/adi/loadCsv/exceptions.py @@ -0,0 +1,18 @@ +class WorkflowException(Exception): + pass + + +class WrongDataSourceProvided(WorkflowException): + pass + + +class FileNotExists(WrongDataSourceProvided): + pass + + +class WorkflowConfigurationError(WorkflowException): + pass + + +class UnknownOperator(WorkflowException): 
+ pass \ No newline at end of file diff --git a/adi/loadCsv/load_manager copy.py b/adi/loadCsv/load_manager copy.py new file mode 100644 index 0000000..db69f02 --- /dev/null +++ b/adi/loadCsv/load_manager copy.py @@ -0,0 +1,51 @@ + +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) +import json +from app_config.db_config import DBContext + +import time + + +import logging +logger = logging.getLogger(__name__) + + + +class WokrerConfig(): + name = "WokrerConfig" + ignore_result = False + _db_connections = {} + + def __init__ (self, *args, **kwargs): + """Main Task which load setting as db conenction and also he csv with rules + It first scan all required details as what db_connection involved + """ + self.config = json.loads(kwargs.get('config')) + for db in self.config['db_connections']: + """DBcontext can return either Engine or Session if needed and store all + in _db_connection """ + db_engine = DBContext().get_db(self.config['db_connections'][db]['connection_details']) + WokrerConfig._db_connections[db] = db_engine + self.mapping_rules = self.config['csvdict'] + self.customers_list = self.config['customers_list'] + kwargs = { 'mapping_rules': self.mapping_rules } + + + + + +if __name__ == "__main__": + import json + settings = json.dumps({"csvdict": [{"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer1 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental1 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": 
"3"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "3"}}], "customers_list": [1, 2, 3, 4, 5, 6, 7], "db_connections": {"postgres": {"connection_details": {"DB_TYPE": "postgres", "ENGINE": "postgres", "NAME": "dvdrental", "USER": "admin", "PASSWORD": "admin", "HOST": "192.168.1.113", "PORT": 5432}, "engine": ""}}}) + csv_dict= json.dumps({"csvdict": [{"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer1 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental1 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "3"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "3"}}], "customers_list": [1, 2, 3, 4, 5, 6, 7], "db_connections": {"postgres": {"connection_details": {"DB_TYPE": "postgres", "ENGINE": "postgres", "NAME": "dvdrental", "USER": "admin", "PASSWORD": "admin", "HOST": "192.168.1.113", "PORT": 5432}, "engine": ""}}}) + a = WokrerConfig(config=settings) + print(a._db_connections) + + + +# #test_load.delay(keys=555,x=1,a=1,b=2,c=3) +# a = LoadManager().delay(keys=[1,2,3,4]) +# print(a.get()) + diff --git a/adi/loadCsv/load_manager.py b/adi/loadCsv/load_manager.py new file mode 100644 index 0000000..d2d61c5 --- 
/dev/null +++ b/adi/loadCsv/load_manager.py @@ -0,0 +1,91 @@ + +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) + + + +import json +from app_config.db_config import DBContext +from loadCsv.client import load_config +from celery import group + +from loadCsv.worker import app +from loadCsv.tasks_2 import proccess_customer +import time + + +import logging +logger = logging.getLogger(__name__) + + + +class LoadManager(app.Task): + name = "LoadManager" + ignore_result = False + _db_connections = {} + + def __call__(self, *args, **kwargs): + """Main Task which load setting as db conenction and also he csv with rules + It first scan all required details as what db_connection involved + """ + self.config = json.loads(kwargs.get('config')) + for db in self.config['db_connections']: + """DBcontext can return either Engine or Session if needed and store all + in _db_connection """ + # db_engine = DBContext().get_db(self.config['db_connections'][db]['connection_details']) + # LoadManager._db_connections[db] = db_engine + self.mapping_rules = self.config['csvdict'] + self.customers_list = self.config['customers_list'] + kwargs = { 'mapping_rules': self.mapping_rules } + return self.run(*args, **kwargs) + + def run(self,*args, **kwargs): + + print("RUN!") + proccess_customer.delay(1,**kwargs) + # a = load_data.delay(1,2,3,**kwargs) + # logger.info(f'kwargs Run = {kwargs}') + # logger.info(f'args Run = {args}') + + # return group([ proccess_customer.delay(customer, **kwargs) for customer in self.customers_list]) + + + + # logger.error('No x or y in arguments') + + def after_return(self, status, retval, task_id, args, kwargs, einfo): + print("Post Run") + #exit point of the task whatever is the state + # print(__name__ + " AFTER!!!!!!!!!!!!!!!") + # test_load.delay(**kwargs) + +# class AddTask(LoadManager): + +# def run(self,*args, **kwargs): +# logger.info(f'AddTask = {kwargs}') +# # logger.error('No x or y in arguments') + + 
+app.register_task(LoadManager()) + + +if __name__ == "__main__": + import json + load_config.initialize_operation() + # mapping_rules = json.dumps(load_config.csv2dict) + + + a = proccess_customer.delay(customer_id=1, a="Task!!!@#") + + # settings = json.dumps({"csvdict": [{"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer1 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental1 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "3"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "3"}}], "customers_list": [1, 2, 3, 4, 5, 6, 7], "db_connections": {"postgres": {"connection_details": {"DB_TYPE": "postgres", "ENGINE": "postgres", "NAME": "dvdrental", "USER": "admin", "PASSWORD": "admin", "HOST": "192.168.1.113", "PORT": 5432}, "engine": ""}}}) + # csv_dict= json.dumps({"csvdict": [{"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer1 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental1 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", 
"source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "3"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "3"}}], "customers_list": [1, 2, 3, 4, 5, 6, 7], "db_connections": {"postgres": {"connection_details": {"DB_TYPE": "postgres", "ENGINE": "postgres", "NAME": "dvdrental", "USER": "admin", "PASSWORD": "admin", "HOST": "192.168.1.113", "PORT": 5432}, "engine": ""}}}) + # a = LoadManager().delay(config=mapping_rules) + # print(a) + + + +# #test_load.delay(keys=555,x=1,a=1,b=2,c=3) +# a = LoadManager().delay(keys=[1,2,3,4]) +# print(a.get()) + diff --git a/adi/loadCsv/loader_example.py b/adi/loadCsv/loader_example.py new file mode 100644 index 0000000..4c6cdf9 --- /dev/null +++ b/adi/loadCsv/loader_example.py @@ -0,0 +1,97 @@ + +import time +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) + +from app_config.db_config import DBContext +from celery import group + +from loadCsv.worker import app +from loadCsv.tasks_2 import route_load_type + + +import logging +logger = logging.getLogger(__name__) + + + +@app.task(bind=True , name='route_load_type') +def route_load_type(self,*args,**kwargs): + curr_customer = args[0] + mapping_rules = kwargs['mapping_rules'] + print("Proccessing customer:" , curr_customer ) + print("DB",self._db_connections) + """ Go thought each line and check if required fetch data from DB""" + for rule in mapping_rules: + if rule['rules']['source_type'] == 'db': + sql = rule['rules']['sql'] + # assign current customer to sql + sql = sql.replace("&1",str(curr_customer)) + + """ + Here will call other function to fetch the query: + 1.How to share DB connection + 2.this function called as async ,so the question 
should I continue call load_from_db with "delay" + + """ + + load_from_db() + elif rule['rules']['source_type'] == 'file': + pass + elif rule['rules']['source_type'] == 'other_type': + pass + + + +@app.task(bind=True , name='route_load_type') +def load_from_db(self,*args,**kwargs): + pass + + +class LoadManager(app.Task): + name = "LoadManager" + ignore_result = False + _db_connections = {} + + def __call__(self, *args, **kwargs): + """Main Task which load setting as db conenction and also he csv with rules + It first scan all required details as what db_connection involved + """ + self.config = json.loads(kwargs.get('config')) + """ Setting all required db connection""" + for db in self.config['db_connections']: + """DBcontext can return either Engine or Session if needed and store all + in _db_connection """ + db_engine = DBContext().get_db(self.config['db_connections'][db]['connection_details']) + LoadManager._db_connections[db] = db_engine + # saving csv file ( rules) + self.mapping_rules = self.config['csvdict'] + # all customers + self.customers_list = self.config['customers_list'] + kwargs = { 'mapping_rules': self.mapping_rules } + return self.run(*args, **kwargs) + + def run(self,*args, **kwargs): + # This would be the main/manager proccess which exeucte task for each customer + return group([ route_load_type.delay(customer, **kwargs) for customer in self.customers_list]) + + def after_return(self, status, retval, task_id, args, kwargs, einfo): + """Maybe here it would be right place to check if task completed successfully - if yes it required to check + all other tasks (probably with wait till complete mechansize OR if any task failed) + """ + pass + + + +app.register_task(LoadManager()) + + +if __name__ == "__main__": + import json + settings = json.dumps({"csvdict": [{"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer1 where customer=&1 ", "target_type": "df ", 
"db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental1 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "1"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM customer2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "3"}}, {"key": "", "rules": {"source_type": "db", "source_name": "postgres", "db_connection_name": "source_ps", "sql": "SELECT * FROM rental2 where customer=&1 ", "target_type": "df ", "db_connection_target": " file ", "order": "3"}}], "customers_list": [1, 2, 3, 4, 5, 6, 7], "db_connections": {"postgres": {"connection_details": {"DB_TYPE": "postgres", "ENGINE": "postgres", "NAME": "dvdrental", "USER": "admin", "PASSWORD": "admin", "HOST": "192.168.1.113", "PORT": 5432}, "engine": ""}}}) + a = LoadManager().delay(config=settings) + print(a) + + + diff --git a/adi/loadCsv/runcelery b/adi/loadCsv/runcelery new file mode 100644 index 0000000..651a1ee --- /dev/null +++ b/adi/loadCsv/runcelery @@ -0,0 +1,5 @@ +jbc@rmcomplexity.com + +watchmedo auto-restart --directory=./loadCsv --pattern=*.py --recursive -- celery -A loadCsv.worker worker --hostname=worker.main@%h --pool=gevent --concurrency=10 --queues=main -l INFO + +watchmedo auto-restart --directory=./loadCsv --pattern=*.py --recursive -- celery -A loadCsv.worker worker --hostname=worker.db@%h --pool=gevent --concurrency=10 --queues=db -l INFO \ No newline at end of file diff --git a/adi/loadCsv/tasks.py b/adi/loadCsv/tasks.py new file mode 100644 index 0000000..ff8b3e6 --- /dev/null +++ b/adi/loadCsv/tasks.py @@ -0,0 +1,161 @@ + +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) +from loadCsv.worker import app + +def is_celery_working(): + result = app.control.broadcast('ping', 
reply=True, limit=1) + return bool(result) # True if at least one result + + + +def get_celery_worker_status(): + i = app.control.inspect() + availability = i.ping() + stats = i.stats() + registered_tasks = i.registered() + active_tasks = i.active() + scheduled_tasks = i.scheduled() + result = { + 'availability': availability, + 'stats': stats, + 'registered_tasks': registered_tasks, + 'active_tasks': active_tasks, + 'scheduled_tasks': scheduled_tasks + } + return result + +# print(is_celery_working()) +# print(get_celery_worker_status()) + + + +# import sys +# from pathlib import Path +# sys.path.append(str(Path(__file__).parent.parent)) +# from loadCsv.worker import app +# import time +# from celery import group,chord,chain + +# from threading import Thread + +# import logging +# logger = logging.getLogger(__name__) + +# @app.task(bind=True, name='jpr') +# def jpr(self): +# print("Hellow from jpr") + +# @app.task(bind=True, name='load_csv') +# def load_csv(self, x): +# print("Hi", x) +# return "load_csv_" + str(x) + +# @app.task(bind=True, name='avi') +# def avi(self, customer): +# return load_customer(customer) + + +# @app.task(bind=True, name='load_customer') +# def load_customer(self, customer): +# if customer % 2 == 0: +# return evennum(customer) +# else: +# return oddnum(customer) + +# @app.task(bind=True , name='evennum') +# def evennum(self,num): +# # time.sleep(5) +# return "even_customer" + str(num) + +# @app.task(bind=True , name='oddnum') +# def oddnum(self,num): +# # time.sleep(5) +# t1 = Thread(target=jpr) +# t2 = Thread(target=jpr) +# t1.start() +# time.sleep(3) +# t2.start() +# t1.join() +# t2.join() +# return "odd_customer" + str(num) + +# @app.task(bind=True , name='read_input_csv') +# def proccess_customers(self,customers, mapping_rules): +# return group([load_data.delay(customer, mapping_rules) for customer in customers]) + +# @app.task(bind=True , name='load_data') +# def load_data(self,customer,mapping_rules: dict): +# for rule in 
mapping_rules: +# rule.update({'key':customer}) +# print(rule['rules']['db_connection_source']) +# time.sleep(10) + +# # return group([load_data.delay(customer, mapping_rule) for customer in customers]) + + +# # class LoadManager(app.Task): +# # name = "LoadManager" +# # ignore_result = False +# # def __call__(self, *args, **kwargs): +# # """In celery task this function call the run method, here you can +# # set some environment variable before the run of the task""" +# # # oddnum.delay(1).get() +# # self.loadconfigurator = None +# # self.keys = kwargs.get('keys',[1,2]) +# # self.mapping_rules = kwargs.get('mapping_rules',[{'key': '', 'rules': {'db_connection_source': 'source_ps', 'sql': 'SELECT * FROM customer1 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '1'}}, {'key': '', 'rules': {'db_connection_source': 'source_ps', 'sql': 'SELECT * FROM rental1 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '1'}}, {'key': '', 'rules': {'db_connection_source': 'source_ps', 'sql': 'SELECT * FROM customer2 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '3'}}, {'key': '', 'rules': {'db_connection_source': 'source_ps', 'sql': 'SELECT * FROM rental2 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '3'}}]) +# # proccess_customers.delay(self.keys ,self.mapping_rules) + +# # return self.run(*args, **kwargs) + +# # def after_return(self, status, retval, task_id, args, kwargs, einfo): +# # #exit point of the task whatever is the state +# # print("AFTEr") +# # pass + +# # class AddTask(LoadManager): + +# # def run(self,*args, **kwargs): +# # logger.info(f'AddTask = {kwargs}') +# # # logger.error('No x or y in arguments') + +# # app.register_task(AddTask()) + + + + + + + + +# # class CustomerTable(app.Task): +# # name = "CustomerTable" +# # ignore_result = False + + +# # def run(self, *args, **kwargs): +# # self.row_num = None +# # 
self.source_db_connection = None +# # self.source_table = None +# # self.query = None +# # return self.generate_file(kwargs['a']) + +# # def generate_file(self, data): +# # return data.capitalize() + +# # def collect_data(self): +# # data = "avi celery" +# # return data + +# # app.register_task(CustomerTable()) + + +# # #a = AddTask().delay("Hello") + +# # # class CustomerData(app.Task): + + + +# # # app.register_task(CustomerTable()) + diff --git a/adi/loadCsv/tasks_2.py b/adi/loadCsv/tasks_2.py new file mode 100644 index 0000000..ba2542a --- /dev/null +++ b/adi/loadCsv/tasks_2.py @@ -0,0 +1,141 @@ + +import sys +import logging +import time +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) +from celery import group,Task +from loadCsv.worker import app + +from app_config.db_config import DBContext +from loadCsv.client import load_config + +logger = logging.getLogger(__name__) + + +db_connections = {'postgres': {'connection_details': {'DB_TYPE': 'postgres', 'ENGINE': 'postgres', 'NAME': 'dvdrental', 'USER': 'admin', 'PASSWORD': 'admin', 'HOST': '192.168.1.113', 'PORT': 5432}, 'engine': ''}, 'target': {'connection_details': {'DB_TYPE': 'postgres', 'ENGINE': 'postgres', 'NAME': 'target', 'USER': 'admin', 'PASSWORD': 'admin', 'HOST': '192.168.1.113', 'PORT': 5432}, 'engine': ''}} + + + +class DatabaseTask(Task): + _db = {} + _all_customers = [] + _all_state = [] + _all_rules = [] + + @property + def all_rules(self): + if self._all_rules == []: + print("Initiate rules list - ONCE") + self._all_rules = load_config.csv2dict + return self._all_rules + + + + @property + def all_customers(self): + if self._all_customers == []: + print("Initiate Customer list - ONCE") + self._all_customers = load_config.customers_list + return self._all_customers + + + @property + def db(self): + if self._db == {}: + print("Initiate Db connection - ONCE") + for db_name,db_details in db_connections.items(): + # print("Init with",db_name, 
db_details['connection_details']) + db_engine = DBContext().get_db(db_details['connection_details']) + if db_engine: + self._db[db_name] = db_engine.get_engine() + + return self._db + + +@app.task(bind=True ,base=DatabaseTask, name='test_db') +def proccess_customer(self, *args, **kwargs): + import loadCsv.utils as utl + from sqlalchemy.sql import text + from sqlalchemy.exc import OperationalError,ProgrammingError + import pandas as pd + + customer_id = kwargs.get('customer_id') + + return "Test" + + + table_name = kwargs.get('table_name') + conn = kwargs.get('conn_target') + utl.init_customer() + + + conn_source = self.db['postgres'] + conn_target = self.db['target'] + + sql = text('SELECT * from customer') + query = conn_source.execute(sql) + + + df = pd.DataFrame(query.fetchall()) + utl.df_to_table(base=self, df=df, table_name='aaaa', conn_target=conn_target ,params="replace") + print("All good",self._all_state) + # try: + + # res = df.to_sql('target_test', conn_source, if_exists= 'replace') + # print("Trying" , res , self.__name__) + # conn_source.commit() + + # except (sqlaclchemy.exc.ProgrammingError, sqlalchemy.exc.OperationalError) as e: + # logger.Info('Error occured while executing a query {}'.format(e.args)) + + return "Ok" + + + + + + + + +# @app.task(bind=True , name='read_input_csv') +# def proccess_customers(self,customers, mapping_rules): +# return group([load_data.delay(customer, mapping_rules) for customer in customers]) + + + + + + + + + + + +# @app.task(bind=True , name='route_load_type') +# def route_load_type(self,*args,**kwargs): +# curr_customer = args[0] +# mapping_rules = kwargs['mapping_rules'] +# # print("mapping_rules", mapping_rules) +# print("Proccess customer:" , curr_customer ) +# for rule in mapping_rules: + +# if rule['rules']['source_type'] == 'db': +# sql = rule['rules']['sql'] +# sql = sql.replace("&1",str(curr_customer)) +# time.sleep(4) +# print("DB proccess for ", sql) +# load_from_db.delay(k="Sending ->" + 
str(curr_customer)) +# # rule.update({'key':curr_customer}) +# # print(rule['rules']['db_connection_source']) +# time.sleep(3) + + + +# @app.task(bind=True , name='load_from_db') +# def load_from_db(self,*args,**kwargs): +# time.sleep(4) +# print(kwargs['k']) +# return "Last" +# # return group([load_data.delay(customer, mapping_rule) for customer in customers]) + diff --git a/adi/loadCsv/utils/__init__.py b/adi/loadCsv/utils/__init__.py new file mode 100644 index 0000000..f473786 --- /dev/null +++ b/adi/loadCsv/utils/__init__.py @@ -0,0 +1,29 @@ +import sys +import logging +import time +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) + +from loadCsv.exceptions import UnknownOperator +from loadCsv.utils.df_func import df_to_table + +operators = { + "df_to_table": df_to_table, + +} + + +def init_customer(*args, **kwargs): + base_init = kwargs.get('base') + + base_init.all_customers + base_init.all_rules + print("init customer completed") + return + + + + +def none_operator(*args, **kwargs): + # this should probably be handled in tasks init to fail quick not at runtime + raise UnknownOperator("Unknown operator passed!") \ No newline at end of file diff --git a/adi/loadCsv/utils/df_func.py b/adi/loadCsv/utils/df_func.py new file mode 100644 index 0000000..0159827 --- /dev/null +++ b/adi/loadCsv/utils/df_func.py @@ -0,0 +1,17 @@ + + +def df_to_table(*args, **kwargs): + base = kwargs.get('base') + df = kwargs.get('df') + table_name = kwargs.get('table_name') + conn = kwargs.get('conn_target') + params = kwargs.get('params' ,None) + try: + res = df.to_sql(table_name, conn, if_exists= 'replace') + print("here" , res) + conn.commit() + except (sqlaclchemy.exc.ProgrammingError, sqlalchemy.exc.OperationalError) as e: + print('Error occured while executing a query {}'.format(e.args)) + + base._all_state.append("Ok") + return 0 diff --git a/adi/loadCsv/worker.py b/adi/loadCsv/worker.py new file mode 100644 index 0000000..602e574 --- /dev/null +++ 
b/adi/loadCsv/worker.py @@ -0,0 +1,31 @@ +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) +from celery import Celery +import loadCsv.celeryconfig as celeryconfig + +app = Celery('proj') + +app.config_from_object(celeryconfig) + +# Optional configuration, see the application user guide. +app.conf.update( + result_expires=3600, +) + + + +try: + app.broker_connection().ensure_connection(max_retries=3) +except Exception as ex: + raise RuntimeError("Failed to connect to celery broker, {}".format(str(ex))) + +##avi@desktop-hili:~/Dev/adi/ADI/adi$ watchmedo auto-restart --directory=./loadCsv --pattern=*.py --recursive -- celery -A loadCsv.worker worker -l INFO + +# app.autodiscover_tasks([ +# 'loadCsv' +# ] ,force=True) + + +# if __name__ == '__main__': +# app.start() \ No newline at end of file diff --git a/adi/loadCsv/worker_db.py b/adi/loadCsv/worker_db.py new file mode 100644 index 0000000..f1778ec --- /dev/null +++ b/adi/loadCsv/worker_db.py @@ -0,0 +1,32 @@ +from celery import Celery + +app = Celery('proj', + broker='amqp://guest:guest@localhost:5672', + backend='db+postgresql://admin:admin@192.168.1.113:5432/celery + # , + # include=['loadCsv.tasks','loadCsv.load_manager' ,'loadCsv.tasks_2 + ], + broker_pool_limit=0) + +# Optional configuration, see the application user guide. 
+app.conf.update( + result_expires=3600, +) + +task_routes = {'loadCsv.tasks_2.load_from_db': {'queue': 'db'}} +app.conf.task_routes = task_routes + +try: + app.broker_connection().ensure_connection(max_retries=3) +except Exception as ex: + raise RuntimeError("Failed to connect to celery broker, {}".format(str(ex))) + +##avi@desktop-hili:~/Dev/adi/ADI/adi$ watchmedo auto-restart --directory=./loadCsv --pattern=*.py --recursive -- celery -A loadCsv.worker worker -l INFO + +# app.autodiscover_tasks([ +# 'loadCsv' +# ] ,force=True) + + +# if __name__ == '__main__': +# app.start() \ No newline at end of file diff --git a/adi/loader/.dockerignore b/adi/loader/.dockerignore new file mode 100644 index 0000000..e69de29 diff --git a/adi/loader/Dockerfile b/adi/loader/Dockerfile new file mode 100644 index 0000000..d113d6f --- /dev/null +++ b/adi/loader/Dockerfile @@ -0,0 +1,11 @@ +# Dockerfile +FROM python:3.6.6 +ENV LANG=C.UTF-8 LC_ALL=C.UTF-8 PYTHONUNBUFFERED=1 + +WORKDIR / +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt +RUN rm requirements.txt + +COPY . 
/ +WORKDIR /app \ No newline at end of file diff --git a/adi/loader/app/__init__.py b/adi/loader/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/adi/loader/app/tasks.py b/adi/loader/app/tasks.py new file mode 100644 index 0000000..8b5d929 --- /dev/null +++ b/adi/loader/app/tasks.py @@ -0,0 +1,39 @@ +from .worket import app + +# tasks.py +@app.task(bind=True, name='refresh') +def refresh(self, urls): + for url in urls: + fetch_source.s(url).delay() + +@app.task(bind=True, name='fetch_source') +def fetch_source(self, url): + source = newspaper.build(url) + for article in source.articles: + fetch_article.s(article.url).delay() + +# tasks.py +@app.task(bind=True, name='save_article', queue='minio') +def save_article(self, bucket, key, text): + minio_client = Minio('localhost:9000', + access_key='AKIAIOSFODNN7EXAMPLE', + secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', + secure=False) + try: + minio_client.make_bucket(bucket, location="us-east-1") + except BucketAlreadyExists: + pass + except BucketAlreadyOwnedByYou: + pass + + hexdigest = hashlib.md5(text.encode()).hexdigest() + + try: + st = minio_client.stat_object(bucket, key) + update = st.etag != hexdigest + except NoSuchKey as err: + update = True + + if update: + stream = BytesIO(text.encode()) + minio_client.put_object(bucket, key, stream, stream.getbuffer().nbytes) diff --git a/adi/loader/app/worket.py b/adi/loader/app/worket.py new file mode 100644 index 0000000..d78d4b9 --- /dev/null +++ b/adi/loader/app/worket.py @@ -0,0 +1,22 @@ +# worker.py +from celery import Celery + + + +app = Celery( + broker='amqp://user:password@localhost:5672', + include=['tasks']) + +app.conf.beat_schedule = { + 'refresh': { + 'task': 'refresh', + 'schedule': 300.0, + 'args': ([ + 'https://www.theguardian.com', + 'https://www.nytimes.com' + ],), +} +} + +if __name__ == '__main__': + app.start() \ No newline at end of file diff --git a/adi/loader/requirements.txt b/adi/loader/requirements.txt new file 
mode 100644 index 0000000..e69de29 diff --git a/main.py b/adi/main.py similarity index 62% rename from main.py rename to adi/main.py index e132d2c..c72dbce 100644 --- a/main.py +++ b/adi/main.py @@ -1,19 +1,33 @@ from app_config.settings import Settings from app_config.db_config import DBContext -from app_config.task import Task -import os from pathlib import Path -config_file = Path('app_config','config.yaml') +from loadCsv.client import LoadConfig +from loadCsv.tasks_2 import test_load +config_file = Path('app_config', 'config.yaml') +# settings = Settings(config_file=config_file) rules = 'application_conig.rules.' - def main(name): # Use a breakpoint in the code line below to debug your script. settings = Settings(config_file=config_file) + customers_list = settings.get(f'{rules}customers_list') - print(customers_list) + files = settings.get(f'{rules}files') + folder_path = settings.get(f'{rules}folder') + source_db = DBContext().get_db(settings.get('databases.postgres')) + + a = LoadConfig(settings=settings) + + a.initialize_operation() + print(a) + a.run() + #res = AddTask().delay(1,2) + # print(res.get()) + # LoadCsv(setting=settings) + # a = CustomerTable().delay(a="aaaaaaaaa") + # print(a.get()) exit() source_db = DBContext().get_db(settings.get('databases.postgres')) rules_folder= settings.get(f'{rules}folder') @@ -33,6 +47,6 @@ def main(name): # Press the green button in the gutter to run the script. 
if __name__ == '__main__': - main('PyCharm') + main('ADI') # See PyCharm help at https://www.jetbrains.com/help/pycharm/ diff --git a/adi/oob_celery/DevClient.py b/adi/oob_celery/DevClient.py new file mode 100644 index 0000000..cc425b1 --- /dev/null +++ b/adi/oob_celery/DevClient.py @@ -0,0 +1,31 @@ +from DevScriptAndTesting.drop_target_tables import clearn_target + + +# drop all target +clearn_target() + + + + + + + + + + + + + + + + + + + +# config = LoadConfig(settings=settings) +# config.initialize_operation() + + +# rules = config.load_config['csvdict'] +# db_connections = config.load_config['db_connections'] + diff --git a/adi/oob_celery/DevScriptAndTesting/drop_target_tables.py b/adi/oob_celery/DevScriptAndTesting/drop_target_tables.py new file mode 100644 index 0000000..e04208a --- /dev/null +++ b/adi/oob_celery/DevScriptAndTesting/drop_target_tables.py @@ -0,0 +1,34 @@ +from settings import Settings +from pathlib import Path +from sqlalchemy.sql import text +import sys +from pathlib import Path + +import pandas as pd + + +from loader_config import LoadConfig +from db_config.config import DBContext + + +config_file = Path('app_config', 'config.yaml') +settings = Settings(config_file=config_file) + + + +def clearn_target(setting=settings): + + sql = '''SELECT tablename FROM pg_catalog.pg_tables + WHERE schemaname='public' ;''' + + target_db = settings.get('databases.target') + engine = DBContext().get_db(target_db) + engine_connected = engine.get_engine() + temp_tables = pd.read_sql(sql, engine_connected)['tablename'] + print("Table to drop", temp_tables) + with engine_connected as con: + for table in temp_tables: + sql = text(f"DROP table {table} CASCADE") + con.execute(sql) + print(f"Dropped table {table}.") + diff --git a/adi/oob_celery/DevScriptAndTesting/sql_synth_data/customer_info.sql b/adi/oob_celery/DevScriptAndTesting/sql_synth_data/customer_info.sql new file mode 100644 index 0000000..d1d2f3e --- /dev/null +++ 
b/adi/oob_celery/DevScriptAndTesting/sql_synth_data/customer_info.sql @@ -0,0 +1,90 @@ +--- function for random int (so will use based number of customer in customer-in) + +CREATE OR REPLACE FUNCTION random_between(low INT ,high INT) + RETURNS INT AS +$$ +BEGIN + RETURN floor(random()* (high-low + 1) + low); +END; +$$ language 'plpgsql' STRICT; + + +CREATE TABLE customer_info( + info_id SERIAL PRIMARY KEY, + company_id INT, + contact_name VARCHAR(255) NOT NULL, + phone VARCHAR(25), + email VARCHAR(100), + customer_id int , + CONSTRAINT fk_customer_info + FOREIGN KEY(customer_id) + REFERENCES customer(customer_id) +); + + +delete from customer_info + + + + + +drop table customer_binary; +CREATE TABLE customer_binary( + data_id SERIAL PRIMARY KEY, + customer_bin_data BYTEA, + customer_id int , + CONSTRAINT fk_customer_data + FOREIGN KEY(customer_id) + REFERENCES customer(customer_id) +); + + + +create extension pgcrypto; + +INSERT INTO customer_binary(data_id, customer_bin_data, customer_id) +SELECT id, gen_random_bytes(16), cust FROM generate_series(1,100000) id ,random_between(1,599) cust; + + + + + +select * from customer_info + + +INSERT INTO customer_info(info_id, contact_name, phone, email,customer_id) +SELECT id, md5(random()::text), md5(random()::text)::varchar(20), md5(random()::text) ,cust +FROM generate_series(1,10000) id ,random_between(1,599) cust; + + +update customer_info + set customer_id = floor(random_between(1,599)); + + + + + + +drop table customer_binary; +CREATE TABLE customer_binary( + data_id SERIAL PRIMARY KEY, + customer_bin_data BYTEA, + customer_id int , + CONSTRAINT fk_customer_data + FOREIGN KEY(customer_id) + REFERENCES customer(customer_id) +); + + + +create extension pgcrypto; + +INSERT INTO customer_binary(data_id, customer_bin_data, customer_id) +SELECT id, gen_random_bytes(16), cust FROM generate_series(1,1000000) id ,random_between(1,599) cust; + + +update customer_binary + set customer_id = floor(random_between(1,599)); + +select 
distinct(customer_id ), count(customer_id) from customer_binary group by customer_id +having count(customer_id) >1 \ No newline at end of file diff --git a/adi/oob_celery/__init__.py b/adi/oob_celery/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app_config/config.yaml b/adi/oob_celery/app_config/config.yaml similarity index 81% rename from app_config/config.yaml rename to adi/oob_celery/app_config/config.yaml index af8858c..c3a4ee8 100644 --- a/app_config/config.yaml +++ b/adi/oob_celery/app_config/config.yaml @@ -5,8 +5,8 @@ application_conig: db_archive: "/db_archive" rules: - folder: "/rules" - files: ['source_1.csv' ] + folder: "mapping_rules" + files: ['source1.csv' ] customers_list: [1,2,3,4,5,6,7] @@ -44,6 +44,15 @@ databases: HOST: '192.168.1.113' PORT: 5432 + target: + DB_TYPE: 'postgres' + ENGINE: 'postgres' + NAME: 'target' + USER: 'admin' + PASSWORD: 'admin' + HOST: '192.168.1.113' + PORT: 5432 + redis: host: redis_db port: 6379 diff --git a/adi/oob_celery/app_monitor/_not_in_use.py b/adi/oob_celery/app_monitor/_not_in_use.py new file mode 100644 index 0000000..791c19d --- /dev/null +++ b/adi/oob_celery/app_monitor/_not_in_use.py @@ -0,0 +1,415 @@ +from typing import Optional + +import databases +import pydantic + +import ormar +import sqlalchemy + +DATABASE_URL = "sqlite:///db.sqlite" +database = databases.Database(DATABASE_URL) +metadata = sqlalchemy.MetaData() + + +# note that this step is optional -> all ormar cares is a internal +# class with name Meta and proper parameters, but this way you do not +# have to repeat the same parameters if you use only one database +class BaseMeta(ormar.ModelMeta): + metadata = metadata + database = database + + +# Note that all type hints are optional +# below is a perfectly valid model declaration +# class Author(ormar.Model): +# class Meta(BaseMeta): +# tablename = "authors" +# +# id = ormar.Integer(primary_key=True) # <= notice no field types +# name = ormar.String(max_length=100) + + 
+class Author(ormar.Model): + class Meta(BaseMeta): + tablename = "authors" + + id: int = ormar.Integer(primary_key=True) + name: str = ormar.String(max_length=100) + + +class Book(ormar.Model): + class Meta(BaseMeta): + tablename = "books" + + id: int = ormar.Integer(primary_key=True) + author: Optional[Author] = ormar.ForeignKey(Author) + title: str = ormar.String(max_length=100) + year: int = ormar.Integer(nullable=True) + + +# create the database +# note that in production you should use migrations +# note that this is not required if you connect to existing database +engine = sqlalchemy.create_engine(DATABASE_URL) +# just to be sure we clear the db before +metadata.drop_all(engine) +metadata.create_all(engine) + + +# all functions below are divided into functionality categories +# note how all functions are defined with async - hence can use await AND needs to +# be awaited on their own +async def create(): + # Create some records to work with through QuerySet.create method. + # Note that queryset is exposed on each Model's class as objects + tolkien = await Author.objects.create(name="J.R.R. Tolkien") + await Book.objects.create(author=tolkien, title="The Hobbit", year=1937) + await Book.objects.create(author=tolkien, title="The Lord of the Rings", year=1955) + await Book.objects.create(author=tolkien, title="The Silmarillion", year=1977) + + # alternative creation of object divided into 2 steps + sapkowski = Author(name="Andrzej Sapkowski") + # do some stuff + await sapkowski.save() + + # or save() after initialization + await Book(author=sapkowski, title="The Witcher", year=1990).save() + await Book(author=sapkowski, title="The Tower of Fools", year=2002).save() + + # to read more about inserting data into the database + # visit: https://collerek.github.io/ormar/queries/create/ + + +async def read(): + # Fetch an instance, without loading a foreign key relationship on it. 
+ # Django style + book = await Book.objects.get(title="The Hobbit") + # or python style + book = await Book.objects.get(Book.title == "The Hobbit") + book2 = await Book.objects.first() + + # first() fetch the instance with lower primary key value + assert book == book2 + + # you can access all fields on loaded model + assert book.title == "The Hobbit" + assert book.year == 1937 + + # when no condition is passed to get() + # it behaves as last() based on primary key column + book3 = await Book.objects.get() + assert book3.title == "The Tower of Fools" + + # When you have a relation, ormar always defines a related model for you + # even when all you loaded is a foreign key value like in this example + assert isinstance(book.author, Author) + # primary key is populated from foreign key stored in books table + assert book.author.pk == 1 + # since the related model was not loaded all other fields are None + assert book.author.name is None + + # Load the relationship from the database when you already have the related model + # alternatively see joins section below + await book.author.load() + assert book.author.name == "J.R.R. Tolkien" + + # get all rows for given model + authors = await Author.objects.all() + assert len(authors) == 2 + + # to read more about reading data from the database + # visit: https://collerek.github.io/ormar/queries/read/ + + +async def update(): + # read existing row from db + tolkien = await Author.objects.get(name="J.R.R. Tolkien") + assert tolkien.name == "J.R.R. 
Tolkien" + tolkien_id = tolkien.id + + # change the selected property + tolkien.name = "John Ronald Reuel Tolkien" + # call update on a model instance + await tolkien.update() + + # confirm that object was updated + tolkien = await Author.objects.get(name="John Ronald Reuel Tolkien") + assert tolkien.name == "John Ronald Reuel Tolkien" + assert tolkien.id == tolkien_id + + # alternatively update data without loading + await Author.objects.filter(name__contains="Tolkien").update(name="J.R.R. Tolkien") + + # to read more about updating data in the database + # visit: https://collerek.github.io/ormar/queries/update/ + + +async def delete(): + silmarillion = await Book.objects.get(year=1977) + # call delete() on instance + await silmarillion.delete() + + # alternatively delete without loading + await Book.objects.delete(title="The Tower of Fools") + + # note that when there is no record ormar raises NoMatch exception + try: + await Book.objects.get(year=1977) + except ormar.NoMatch: + print("No book from 1977!") + + # to read more about deleting data from the database + # visit: https://collerek.github.io/ormar/queries/delete/ + + # note that despite the fact that record no longer exists in database + # the object above is still accessible and you can use it (and i.e. save()) again. + tolkien = silmarillion.author + await Book.objects.create(author=tolkien, title="The Silmarillion", year=1977) + + +async def joins(): + # Tho join two models use select_related + + # Django style + book = await Book.objects.select_related("author").get(title="The Hobbit") + # Python style + book = await Book.objects.select_related(Book.author).get( + Book.title == "The Hobbit" + ) + + # now the author is already prefetched + assert book.author.name == "J.R.R. 
Tolkien" + + # By default you also get a second side of the relation + # constructed as lowercase source model name +'s' (books in this case) + # you can also provide custom name with parameter related_name + + # Django style + author = await Author.objects.select_related("books").all(name="J.R.R. Tolkien") + # Python style + author = await Author.objects.select_related(Author.books).all( + Author.name == "J.R.R. Tolkien" + ) + assert len(author[0].books) == 3 + + # for reverse and many to many relations you can also prefetch_related + # that executes a separate query for each of related models + + # Django style + author = await Author.objects.prefetch_related("books").get(name="J.R.R. Tolkien") + # Python style + author = await Author.objects.prefetch_related(Author.books).get( + Author.name == "J.R.R. Tolkien" + ) + assert len(author.books) == 3 + + # to read more about relations + # visit: https://collerek.github.io/ormar/relations/ + + # to read more about joins and subqueries + # visit: https://collerek.github.io/ormar/queries/joins-and-subqueries/ + + +async def filter_and_sort(): + # to filter the query you can use filter() or pass key-value pars to + # get(), all() etc. + # to use special methods or access related model fields use double + # underscore like to filter by the name of the author use author__name + # Django style + books = await Book.objects.all(author__name="J.R.R. Tolkien") + # python style + books = await Book.objects.all(Book.author.name == "J.R.R. 
Tolkien") + assert len(books) == 3 + + # filter can accept special methods also separated with double underscore + # to issue sql query ` where authors.name like "%tolkien%"` that is not + # case sensitive (hence small t in Tolkien) + # Django style + books = await Book.objects.filter(author__name__icontains="tolkien").all() + # python style + books = await Book.objects.filter(Book.author.name.icontains("tolkien")).all() + assert len(books) == 3 + + # to sort use order_by() function of queryset + # to sort decreasing use hyphen before the field name + # same as with filter you can use double underscores to access related fields + # Django style + books = ( + await Book.objects.filter(author__name__icontains="tolkien") + .order_by("-year") + .all() + ) + # python style + books = ( + await Book.objects.filter(Book.author.name.icontains("tolkien")) + .order_by(Book.year.desc()) + .all() + ) + assert len(books) == 3 + assert books[0].title == "The Silmarillion" + assert books[2].title == "The Hobbit" + + # to read more about filtering and ordering + # visit: https://collerek.github.io/ormar/queries/filter-and-sort/ + + +async def subset_of_columns(): + # to exclude some columns from loading when querying the database + # you can use fileds() method + hobbit = await Book.objects.fields(["title"]).get(title="The Hobbit") + # note that fields not included in fields are empty (set to None) + assert hobbit.year is None + assert hobbit.author is None + + # selected field is there + assert hobbit.title == "The Hobbit" + + # alternatively you can provide columns you want to exclude + hobbit = await Book.objects.exclude_fields(["year"]).get(title="The Hobbit") + # year is still not set + assert hobbit.year is None + # but author is back + assert hobbit.author is not None + + # also you cannot exclude primary key column - it's always there + # even if you EXPLICITLY exclude it it will be there + + # note that each model have a shortcut for primary_key column which is pk + # and 
you can filter/access/set the values by this alias like below + assert hobbit.pk is not None + + # note that you cannot exclude fields that are not nullable + # (required) in model definition + try: + await Book.objects.exclude_fields(["title"]).get(title="The Hobbit") + except pydantic.ValidationError: + print("Cannot exclude non nullable field title") + + # to read more about selecting subset of columns + # visit: https://collerek.github.io/ormar/queries/select-columns/ + + +async def pagination(): + # to limit number of returned rows use limit() + books = await Book.objects.limit(1).all() + assert len(books) == 1 + assert books[0].title == "The Hobbit" + + # to offset number of returned rows use offset() + books = await Book.objects.limit(1).offset(1).all() + assert len(books) == 1 + assert books[0].title == "The Lord of the Rings" + + # alternatively use paginate that combines both + books = await Book.objects.paginate(page=2, page_size=2).all() + assert len(books) == 2 + # note that we removed one book of Sapkowski in delete() + # and recreated The Silmarillion - by default when no order_by is set + # ordering sorts by primary_key column + assert books[0].title == "The Witcher" + assert books[1].title == "The Silmarillion" + + # to read more about pagination and number of rows + # visit: https://collerek.github.io/ormar/queries/pagination-and-rows-number/ + + +async def aggregations(): + # count: + assert 2 == await Author.objects.count() + + # exists + assert await Book.objects.filter(title="The Hobbit").exists() + + # maximum + assert 1990 == await Book.objects.max(columns=["year"]) + + # minimum + assert 1937 == await Book.objects.min(columns=["year"]) + + # average + assert 1964.75 == await Book.objects.avg(columns=["year"]) + + # sum + assert 7859 == await Book.objects.sum(columns=["year"]) + + # to read more about aggregated functions + # visit: https://collerek.github.io/ormar/queries/aggregations/ + + +async def raw_data(): + # extract raw data in a 
form of dicts or tuples + # note that this skips the validation(!) as models are + # not created from parsed data + + # get list of objects as dicts + assert await Book.objects.values() == [ + {"id": 1, "author": 1, "title": "The Hobbit", "year": 1937}, + {"id": 2, "author": 1, "title": "The Lord of the Rings", "year": 1955}, + {"id": 4, "author": 2, "title": "The Witcher", "year": 1990}, + {"id": 5, "author": 1, "title": "The Silmarillion", "year": 1977}, + ] + + # get list of objects as tuples + assert await Book.objects.values_list() == [ + (1, 1, "The Hobbit", 1937), + (2, 1, "The Lord of the Rings", 1955), + (4, 2, "The Witcher", 1990), + (5, 1, "The Silmarillion", 1977), + ] + + # filter data - note how you always get a list + assert await Book.objects.filter(title="The Hobbit").values() == [ + {"id": 1, "author": 1, "title": "The Hobbit", "year": 1937} + ] + + # select only wanted fields + assert await Book.objects.filter(title="The Hobbit").values(["id", "title"]) == [ + {"id": 1, "title": "The Hobbit"} + ] + + # if you select only one column you could flatten it with values_list + assert await Book.objects.values_list("title", flatten=True) == [ + "The Hobbit", + "The Lord of the Rings", + "The Witcher", + "The Silmarillion", + ] + + # to read more about extracting raw values + # visit: https://collerek.github.io/ormar/queries/aggregations/ + + +async def with_connect(function): + # note that for any other backend than sqlite you actually need to + # connect to the database to perform db operations + async with database: + await function() + + # note that if you use framework like `fastapi` you shouldn't connect + # in your endpoints but have a global connection pool + # check https://collerek.github.io/ormar/fastapi/ and section with db connection + + +# gather and execute all functions +# note - normally import should be at the beginning of the file +import asyncio + +# note that normally you use gather() function to run several functions +# concurrently 
but we actually modify the data and we rely on the order of functions +for func in [ + create, + read, + update, + delete, + joins, + filter_and_sort, + subset_of_columns, + pagination, + aggregations, + raw_data, +]: + print(f"Executing: {func.__name__}") + asyncio.run(with_connect(func)) + +# drop the database tables +metadata.drop_all(engine) diff --git a/adi/oob_celery/app_monitor/app_db.py b/adi/oob_celery/app_monitor/app_db.py new file mode 100644 index 0000000..c7aee28 --- /dev/null +++ b/adi/oob_celery/app_monitor/app_db.py @@ -0,0 +1,21 @@ +import databases +import pydantic + +import ormar +import sqlalchemy + +DATABASE_URL = 'postgresql://admin:admin@192.168.1.113:5432/target' + +app_engine = sqlalchemy.create_engine(DATABASE_URL) + + +database = databases.Database(DATABASE_URL) +metadata = sqlalchemy.MetaData() + + +# note that this step is optional -> all ormar cares is a internal +# class with name Meta and proper parameters, but this way you do not +# have to repeat the same parameters if you use only one database +class BaseMeta(ormar.ModelMeta): + metadata = metadata + database = database diff --git a/adi/oob_celery/app_monitor/models.py b/adi/oob_celery/app_monitor/models.py new file mode 100644 index 0000000..8315fc3 --- /dev/null +++ b/adi/oob_celery/app_monitor/models.py @@ -0,0 +1,49 @@ +from typing import Optional,Union,Dict + +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) + +from app_monitor.app_db import BaseMeta + + +import ormar +import sqlalchemy +from sqlalchemy import DateTime +import datetime +import pydantic + + +class AdiAllRun(ormar.Model): + class Meta(BaseMeta): + tablename: str = "adi_all_run" + + run_id: int = ormar.Integer(primary_key=True) + completed: bool = ormar.Boolean(default=False) + name: str = ormar.String(max_length=100) + start_run: datetime.datetime = ormar.DateTime(default=datetime.datetime.now) + end_run: datetime.datetime = ormar.DateTime(default=datetime.datetime.now) + 
+ +class AdiCustomer(ormar.Model): + class Meta(BaseMeta): + tablename: str = "adi_customer" + + adi_identifier: int = ormar.Integer(primary_key=True) + customer_id: int = ormar.Integer() + run_id: int = ormar.ForeignKey(AdiAllRun) + status: str = ormar.String(max_length=100) + completed: bool = ormar.Boolean(default=False) + start_run: datetime.datetime = ormar.DateTime(default=datetime.datetime.now) + end_run: datetime.datetime = ormar.DateTime(default=datetime.datetime.now) + +class AdiRule(ormar.Model): + class Meta(BaseMeta): + tablename: str = "adi_rule" + + adi_identifier: int = ormar.Integer(primary_key=True) + rule_id: int + start_run: datetime.datetime = ormar.DateTime(default=datetime.datetime.now) + end_run: datetime.datetime = ormar.DateTime(default=datetime.datetime.now) + status: str = ormar.String(max_length=100) + adi_customer: Optional[Union[AdiCustomer,Dict]] = ormar.ForeignKey(AdiCustomer) diff --git a/adi/oob_celery/app_monitor/sqlal.py b/adi/oob_celery/app_monitor/sqlal.py new file mode 100644 index 0000000..5a647bf --- /dev/null +++ b/adi/oob_celery/app_monitor/sqlal.py @@ -0,0 +1,18 @@ +from sqlalchemy import create_engine ,inspect +from sqlalchemy import Table +from sqlalchemy.orm import declarative_base + +engine = create_engine("postgresql+psycopg2://admin:admin@192.168.1.113:5432/target") + + +inspector = inspect(engine) +schemas = inspector.get_schema_names() + +for schema in schemas: + print("schema: %s" % schema) + for table_name in inspector.get_table_names(schema=schema): + for column in inspector.get_columns(table_name, schema=schema): + print("Column: %s" % column) + + + diff --git a/adi/oob_celery/celery_app/__init__.py b/adi/oob_celery/celery_app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/adi/oob_celery/celery_app/celery_param_base.py b/adi/oob_celery/celery_app/celery_param_base.py new file mode 100644 index 0000000..03ea137 --- /dev/null +++ b/adi/oob_celery/celery_app/celery_param_base.py @@ -0,0 
+1,39 @@ + +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) +from celery_app.worker import app +from celery.utils.log import get_task_logger +#import celery_app.celeryconfig as celeryconfig + + + +logger = get_task_logger(__name__) + +class CeleryParams(app.Task): + # name = "CeleryParams" + # ignore_result = False + # def __call__(self, *args, **kwargs): + # """Set local config file""" + # import json + # # print(__name__ + "CeleryParams") + # f = open('celery_app/config_load.py','w') + # f.write(json.dumps(kwargs.get('db_connections'))) + # f.close() + # logger.info('!!!!!!!!!!!!!!!!!!!!!!!Found addition') + + def run(self,*args, **kwargs): + import json + print(__name__ + "CeleryParams") + f = open('celery_app/config_load.py','w') + f.write(json.dumps(kwargs.get('db_connections'))) + f.close() + logger.info('Found addition') + + def after_return(self, status, retval, task_id, args, kwargs, einfo): + logger.info(__name__ + 'Init completed addition') + + + +app.register_task(CeleryParams()) + diff --git a/adi/oob_celery/celery_app/celeryconfig.py b/adi/oob_celery/celery_app/celeryconfig.py new file mode 100644 index 0000000..039ff66 --- /dev/null +++ b/adi/oob_celery/celery_app/celeryconfig.py @@ -0,0 +1,15 @@ +enable_utc = True +timezone = 'Asia/Jerusalem' +broker='amqp://guest:guest@localhost:5672' +result_backend='db+postgresql://admin:admin@192.168.1.113:5432/celery' +imports=[ 'celery_app.tasks_2' , 'celery_app.celery_param_base'] +broker_pool_limit=0 +task_routes = { + 'CeleryParams': {'queue': 'db'}, + 'proccess_rule': {'queue': 'db'}, + 'init_db_connections2': {'queue': 'db'}, + 'init_db_connections': {'queue': 'db'}, + 'load_from_db': {'queue': 'db'}, + 'route_load_type': {'queue': 'main'}, + 'LoadManager': {'queue': 'main'}, + } diff --git a/adi/oob_celery/celery_app/config_load.py b/adi/oob_celery/celery_app/config_load.py new file mode 100644 index 0000000..b9dc156 --- /dev/null +++ 
b/adi/oob_celery/celery_app/config_load.py @@ -0,0 +1 @@ +{"postgres": {"connection_details": {"DB_TYPE": "postgres", "ENGINE": "postgres", "NAME": "dvdrental", "USER": "admin", "PASSWORD": "admin", "HOST": "192.168.1.113", "PORT": 5432}, "engine": ""}, "target": {"connection_details": {"DB_TYPE": "postgres", "ENGINE": "postgres", "NAME": "target", "USER": "admin", "PASSWORD": "admin", "HOST": "192.168.1.113", "PORT": 5432}, "engine": ""}} \ No newline at end of file diff --git a/adi/oob_celery/celery_app/tasks_2.py b/adi/oob_celery/celery_app/tasks_2.py new file mode 100644 index 0000000..817226a --- /dev/null +++ b/adi/oob_celery/celery_app/tasks_2.py @@ -0,0 +1,214 @@ + +import sys +import logging +import time +import json +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) + +from celery import group,Task +from celery_app.worker import app +from celery.utils.log import get_task_logger +from db_config.config import DBContext + + + + +from celery_app.celery_param_base import CeleryParams + + +logger = get_task_logger(__name__) + + +with open('celery_app/config_load.py') as f: + db_connections = json.load(f) + + +# db_connections = {'postgres': {'connection_details': {'DB_TYPE': 'postgres', 'ENGINE': 'postgres', 'NAME': 'dvdrental', 'USER': 'admin', 'PASSWORD': 'admin', 'HOST': '192.168.1.113', 'PORT': 5432}, 'engine': ''}, 'target': {'connection_details': {'DB_TYPE': 'postgres', 'ENGINE': 'postgres', 'NAME': 'target', 'USER': 'admin', 'PASSWORD': 'admin', 'HOST': '192.168.1.113', 'PORT': 5432}, 'engine': ''}} + +class Test: + _avi='Avi' + + @property + def avi(self): + return self._avi.upper() + + +class DatabaseTask(Test,Task): + _db = {} + _all_customers = [] + _all_state = [] + _all_rules = [] + _init_ind = False + + @property + def is_init(self): + if self._init_ind is False: + self._init_ind = True + return self._init_ind + + + + @property + def all_rules(self): + if self._all_rules == []: + print("Initiate rules list - ONCE") + 
self._all_rules = "Test" + return self._all_rules + + + + @property + def all_customers(self): + if self._all_customers == []: + print("Initiate Customer list - ONCE") + self._all_customers = [1,2,3] + return self._all_customers + + @property + def db(self): + if self._db == {}: + print("Initiate Db connection - ONCE") + for db_name,db_details in db_connections.items(): + # print("Init with",db_name, db_details['connection_details']) + db_engine = DBContext().get_db(db_details['connection_details']) + if db_engine: + self._db[db_name] = db_engine.get_engine() + + return self._db + + + + +@app.task(bind=True, base=DatabaseTask, name='init_db_connections2') +def init_db_connections2(self, **kwargs): + print("sel" ,self.db['postgres']) + + return "ok" + + +@app.task(bind=True, base=DatabaseTask, name='init_db_connections') +def init_db_connections(self, **kwargs): + + import celery_app.utils as utl + utl.init_config(base=self,**kwargs) + logger.info('InitDB Completed ') + return "init completed" + + +@app.task(bind=True ,base=DatabaseTask, name='proccess_rule') +#rule_id=self.rule_id, source_type=self.source_type, + # db_connection_name=self.db_connection_name, target_type=self.target_type, + # order=self.order +def proccess_rule(self, *args, **kwargs): + from celery_app.utils import load_operation,operators,init_config + from sqlalchemy.sql import text + from sqlalchemy.exc import OperationalError,ProgrammingError + import pandas as pd + import time + import random + rule_id = kwargs.get('rule_id') + main_id = kwargs.get('main_id') + source_type = kwargs.get('source_type') + source_name = kwargs.get('source_name') + source_object_name = kwargs.get('source_object_name') + sql = kwargs.get('sql') + target_name = kwargs.get('target_name') + target_object_name = kwargs.get('target_object_name') + target_type = kwargs.get('target_type') + order = kwargs.get('order') + + + df_source = None + db_connection = None + print("type db!!!!!!!" 
, target_type) + if source_type == 'db': + db_connection = self.db[source_name] + df = load_operation.load_table_from_db(conn=db_connection, sql=sql) + print("source",db_connection) + if target_type.strip() == 'db': + print("Back from Load!!!!!!", df_source) + db_connection = self.db[target_name] + print("source",db_connection) + load_operation.df_to_table(conn=db_connection, table_name=target_object_name ,df=df ,if_exists='append') + + + #time.sleep(random.randint(0,7)) + return 1 + + + # customer_id = kwargs.get('customer_id') +# time.sleep(1) + # table_name = kwargs.get('table_name') + # conn = kwargs.get('conn_target') + + + + # conn_source = self.db['postgres'] + # conn_target = self.db['target'] + + # sql = text('SELECT * from customer') + # query = conn_source.execute(sql) + + + # df = pd.DataFrame(query.fetchall()) + # utl.df_to_table(base=self, df=df, table_name='aaaa', conn_target=conn_target ,params="replace") + # print("All good",self._all_state) + # # try: + + # # res = df.to_sql('target_test', conn_source, if_exists= 'replace') + # # print("Trying" , res , self.__name__) + # # conn_source.commit() + + # # except (sqlaclchemy.exc.ProgrammingError, sqlalchemy.exc.OperationalError) as e: + # # logger.Info('Error occured while executing a query {}'.format(e.args)) + + # return "Ok" + + + + + +# init_db_connections2.delay() + + + +# @app.task(bind=True , name='read_input_csv') +# def proccess_rules(self,customers, mapping_rules): +# return group([load_data.delay(customer, mapping_rules) for customer in customers]) + + + + + + + + + +# @app.task(bind=True , name='route_load_type') +# def route_load_type(self,*args,**kwargs): +# curr_customer = args[0] +# mapping_rules = kwargs['mapping_rules'] +# # print("mapping_rules", mapping_rules) +# print("Proccess customer:" , curr_customer ) +# for rule in mapping_rules: + +# if rule['rules']['source_type'] == 'db': +# sql = rule['rules']['sql'] +# sql = sql.replace("&1",str(curr_customer)) +# time.sleep(4) +# 
print("DB proccess for ", sql) +# load_from_db.delay(k="Sending ->" + str(curr_customer)) +# # rule.update({'key':curr_customer}) +# # print(rule['rules']['db_connection_source']) +# time.sleep(3) + + + +# @app.task(bind=True , name='load_from_db') +# def load_from_db(self,*args,**kwargs): +# time.sleep(4) +# print(kwargs['k']) +# return "Last" +# # return group([load_data.delay(customer, mapping_rule) for customer in customers]) + diff --git a/adi/oob_celery/celery_app/utils/__init__.py b/adi/oob_celery/celery_app/utils/__init__.py new file mode 100644 index 0000000..b9aa42c --- /dev/null +++ b/adi/oob_celery/celery_app/utils/__init__.py @@ -0,0 +1,35 @@ +import sys +import logging +import time +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) + +from celery_app.utils.df_func import * +from celery_app.utils.load_operation import * + +from celery.utils.log import get_task_logger +logger = get_task_logger(__name__) + +load_operators = { + "load_table": load_table, + "load_table_from_db": load_table_from_db, +} + +operators = { + "df_to_table": df_to_table, + +} + + +def init_config(*args, **kwargs): + base_init = kwargs.get('base') + db_to_init = kwargs.get('init_db') + try: + base_init.db[db_to_init] + logger.info('InitDB Completed ') + except (RuntimeError, TypeError, NameError) as e: + logger.error('InitDB Error ' , e) + finally: + logger.info('All good , init Completed ') + return + diff --git a/adi/oob_celery/celery_app/utils/df_func.py b/adi/oob_celery/celery_app/utils/df_func.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/adi/oob_celery/celery_app/utils/df_func.py @@ -0,0 +1 @@ + diff --git a/adi/oob_celery/celery_app/utils/load_operation.py b/adi/oob_celery/celery_app/utils/load_operation.py new file mode 100644 index 0000000..db6c4e6 --- /dev/null +++ b/adi/oob_celery/celery_app/utils/load_operation.py @@ -0,0 +1,58 @@ + + +def df_to_table(conn=None, df=None ,table_name=None ,if_exists='append'): + from 
sqlalchemy.sql import text + from sqlalchemy.exc import OperationalError, ProgrammingError + + import pandas as pd + import time + + # dict = {'Name' : ['Martha', 'Tim', 'Rob', 'Georgia'], + # 'Maths' : [87, 91, 97, 95], + # 'Science' : [83, 99, 84, 76]} + # df = pd.DataFrame(dict) + try: + number_of_row = df.to_sql(table_name, conn, if_exists= if_exists) + # print("!!!!!!!!!!result",res) + # conn.commit() + return number_of_row + except (ProgrammingError, OperationalError) as e: + print('Error occured while executing a query {}'.format(e.args)) + return False + # base._all_state.append("Ok") + + +def load_table_from_db(conn= None, sql=None): + from sqlalchemy.sql import text + import pandas as pd + import time + sql = text(sql) + query = conn.execute(sql) + df = pd.DataFrame(query.fetchall()) + return df + + +def load_table(*args ,**kwargs): + from sqlalchemy.sql import text + import pandas as pd + import time + + base = args[0] + + rule_id = kwargs.get('rule_id') + main_id = kwargs.get('main_id') + source_type = kwargs.get('source_type') + source_name = kwargs.get('source_name') + source_object_name = kwargs.get('source_object_name') + sql = kwargs.get('sql') + target_name = kwargs.get('target_name') + target_object_name = kwargs.get('target_object_name') + target_type = kwargs.get('target_type') + order = kwargs.get('order') + print(target_object_name) + return kwargs + # db_connection = base.db[source_name] + # sql = text(sql) + # query = db_connection.execute(sql) + # df = pd.DataFrame(query.fetchall()) + # return df diff --git a/adi/oob_celery/celery_app/worker.py b/adi/oob_celery/celery_app/worker.py new file mode 100644 index 0000000..f8de450 --- /dev/null +++ b/adi/oob_celery/celery_app/worker.py @@ -0,0 +1,41 @@ +import logging +import os +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) +from celery import Celery +from celery.signals import after_setup_logger +import celery_app.celeryconfig as celeryconfig + +app = 
Celery('adi') + +app.config_from_object(celeryconfig) + + +# Optional configuration, see the application user guide. +# app.conf.update( +# result_expires=3600, +# ) + + + +try: + app.broker_connection().ensure_connection(max_retries=3) +except Exception as ex: + raise RuntimeError("Failed to connect to celery broker, {}".format(str(ex))) + + + +for f in ['celery_app/broker/out', 'celery_app/broker/processed']: + if not os.path.exists(f): + os.makedirs(f) + +@after_setup_logger.connect +def setup_loggers(logger, *args, **kwargs): + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + # add filehandler + fh = logging.FileHandler('celery_app/celery.log') + fh.setLevel(logging.DEBUG) + fh.setFormatter(formatter) + logger.addHandler(fh) \ No newline at end of file diff --git a/adi/oob_celery/celery_worket.sh b/adi/oob_celery/celery_worket.sh new file mode 100755 index 0000000..78177ae --- /dev/null +++ b/adi/oob_celery/celery_worket.sh @@ -0,0 +1 @@ +watchmedo auto-restart --directory=./celery_app --pattern=*.py --ignore-patterns="*config*" --recursive -- celery -A celery_app.worker worker --hostname=worker.db@%h --pool=gevent --concurrency=10 --queues=db -l INFO \ No newline at end of file diff --git a/adi/oob_celery/client.py b/adi/oob_celery/client.py new file mode 100644 index 0000000..4894c50 --- /dev/null +++ b/adi/oob_celery/client.py @@ -0,0 +1,119 @@ +from settings import Settings +from pathlib import Path +import json +import sys +from pathlib import Path + +from app_monitor.app_db import metadata,database,app_engine +from app_monitor.models import AdiAllRun,AdiCustomer,AdiRule + +from loader_config import LoadConfig +from customer import Customer +from celery_app.celery_param_base import CeleryParams + +import sqlalchemy + +import asyncio + +config_file = Path('app_config', 'config.yaml') + + +""" setting class holds all required parameters deails as db details , customer list , files path, +it based on dot notation and 
has get method support get nested level as get('param1.param2.param3) """ + +settings = Settings(config_file=config_file) + +""" just a direct path to rules in yaml""" +rules = 'application_conig.rules.' + + +""" LoadConfig load mapping rules (source1.csv) , prepare config (mapping rules based the csv) also db connection +(check all rules and prepare list of required db's) , customer list and etc """ + +config = LoadConfig(settings=settings) +config.initialize_operation() + + +rules = config.load_config['csvdict'] + + +#required db connections +db_connections = config.load_config['db_connections'] + + +# app db + + +# just to be sure we clear the db before +metadata.drop_all(bind=app_engine) +# metadata.create_all(app_engine) + +print("here") +exit() + +# test = {} + +# for db_name,db_details in db_connections.items(): +# # print("Init with",db_name, db_details['connection_details']) +# test[db_name] = db_details + +# print("here " , test) + +# exit() + +# f = open('celery_app/config_load.py','w') +# f.write(json.dumps(db_connections)) +# f.close() + +# print(json.dumps(rules[0])) + +# exit() +# import time +# time.sleep(1) +# a = init_db_connections() +# print("From main" ,a) +# time.sleep(11) + + +async def exec_id(id): + cust = Customer(id=id) + cust.load_tasks(configs=rules ,db_connections=db_connections) + await cust.run() + + for task in cust.executed_tasks: + print(task.task_run.get()) + + +async def main(): + run_id = 'dev_run' + ids = [1,2,3,4,5] + celery_param_init = CeleryParams() + + # celery_param_init.run(db_connections=db_connections) + # a = db_base.delay(config="aaaaaaai") + # + # from celery_app.tasks_2 import init_db_connections,init_db_connections2 + + # b = init_db_connections.delay(init_db='any') + # print(b.get()) + + for id in ids: + await exec_id(id) + + +# cust = Customer(id=1) +# cust.load_tasks(configs=rules ,db_connections=db_connections) +# asyncio.run(cust.run()) +# for task in cust.executed_tasks: +# print(task.task_run.state) + +# 
if __name__ == "__main__": +# main() + +asyncio.run(main()) + + + + + +#######['TimeoutError', '__class__', '__copy__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_args__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_cache', '_get_task_meta', '_ignored', '_iter_meta', '_maybe_reraise_parent_error', '_maybe_set_cache', '_on_fulfilled', '_parents', '_set_cache', '_to_remote_traceback', 'app', 'args', 'as_list', 'as_tuple', 'backend', 'build_graph', 'children', 'collect', 'date_done', 'failed', 'forget', 'get', 'get_leaf', 'graph', 'id', 'ignored', 'info', 'iterdeps', 'kwargs', 'maybe_reraise', 'maybe_throw', 'name', 'on_ready', 'parent', 'queue', 'ready', 'result', 'retries', 'revoke', 'state', 'status', 'successful', 'supports_native_join', 'task_id', 'then', 'throw', 'traceback', 'wait', 'worker'] \ No newline at end of file diff --git a/adi/oob_celery/context_task.py b/adi/oob_celery/context_task.py new file mode 100644 index 0000000..9c38302 --- /dev/null +++ b/adi/oob_celery/context_task.py @@ -0,0 +1,8 @@ +from typing import Dict +from task import Task + + +def load_task(config:Dict) -> Task: + return Task(config=config) + + diff --git a/adi/oob_celery/customer.py b/adi/oob_celery/customer.py new file mode 100644 index 0000000..3a6fa77 --- /dev/null +++ b/adi/oob_celery/customer.py @@ -0,0 +1,96 @@ + + +from typing import Dict, List, Optional +from states import State +from task import Task + +import asyncio + + + + + +class Customer: + + def __init__(self ,id:int ) -> None: + self.id = id + self.tasks: List["Task"] = [] + self.starting_tasks: List["Task"] = [] + self.executed_tasks: List["Task"] = [] + self.result = None + self.state = State.SCHEDULED + + + + + def load_tasks(self , configs: 
List[Dict] ,db_connections:Dict): + + self.tasks = [ Task(config=config, db_connections=db_connections) for config in configs ] + self._initialize_customer_tasks() + + async def run(self): + + execute_task = asyncio.create_task(self.task_exec()) + monitor_progresss = asyncio.create_task(self.monitor_progress()) + + await execute_task + # print(len(self.executed_tasks)) + + await monitor_progresss + + # for task in self.starting_tasks: + # await task.run() + + + async def monitor_progress(self): + import time + print("In Monitor") + failure_flag = False + count_success = 0 + + i = 0 + + while True: + + state = self.executed_tasks[i].task_run.state + + if state == 'SUCCESS': + count_success += 1 + print(f'***Main_ID:{self.id}*****\nRule_ID:{self.executed_tasks[i].rule_id} \ + \nCelery_UUID:{self.executed_tasks[i].task_celery_id}\ + \nstatus - {self.executed_tasks[i].task_run.state}') + i += 1 + + if state == 'FAILURE': + print(f'***Main_ID:{self.id}*****\nRule_ID:{self.executed_tasks[i].rule_id} \ + \nCelery_UUID:{self.executed_tasks[i].task_celery_id}\ + \nstatus - {self.executed_tasks[i].task_run.state}') + failure_flag = True + i += 1 + + if i >= len(self.executed_tasks) -1: + if failure_flag: + self.state = State.FAILED + elif count_success == len(self.executed_tasks) : + self.state = State.FINISHED + break; + + + await asyncio.sleep(1) + + + + + async def task_exec(self): + for task in self.starting_tasks: + task.run() + + def find_task(self, task_id: int) -> Optional[Task]: + return next((task for task in self.tasks if task.id == task_id), None) + + def _initialize_customer_tasks(self): + for task in self.tasks: + task.customer = self + """ tasks passed sorted - case more sorting , or running bulk (group1,group2 ...) 
- to consider""" + self.starting_tasks.append(task) + task.initialize_task() \ No newline at end of file diff --git a/adi/oob_celery/db_config/config.py b/adi/oob_celery/db_config/config.py new file mode 100644 index 0000000..43cf812 --- /dev/null +++ b/adi/oob_celery/db_config/config.py @@ -0,0 +1,121 @@ + +from typing import Dict +from enum import Enum +import sys +from pathlib import Path +sys.path.append(str(Path(__file__).parent.parent)) + + +from sqlalchemy import create_engine + + +class DBType(str, Enum): + POSTGRES = "postgres" + SQLITE = "sqlite" + + + +class PostgresFactory(): + def __init__(self, *args, **kwargs): + + self.db_type = kwargs['DB_TYPE'] + self.name = kwargs['NAME'] + self.user = kwargs['USER'] + self.password = kwargs['PASSWORD'] + self.host = kwargs['HOST'] + self.port = kwargs['PORT'] + self.engine = None + self.postgress_db_string = "postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}".format( + self.user, + self.password, + self.host, + self.port, + self.name ) + + try: + self.engine = create_engine(self.postgress_db_string) + print(f"Connection to the {self.host} for user {self.user} created successfully.") + except Exception as error: + print("Error: Connection not established {}".format(error)) + + # self.Session = sessionmaker(bind=self.engine) + def get_engine(self): + return self.engine.connect() + + + # + # def __enter__(self): + # + # self.connection = create_engine(postgress_db_string) + # return self.connection + # def __exit__(self, exc_type, exc_val, exc_tb): + # if exc_type or exc_tb or exc_tb: + # self.connection.close() + # self.connection.commit() + # self.connection.close() + + def initialize_db(config: Dict): + # note that this can be split into classes or separate methods + # here you can do al preparations, make sure all libraries are imported + # if you want to import some libs only if a given task type is used etc. 
+ pass + + # if config.get('source') == 'csv': + # if not os.path.isfile(config.get('task_params').get('path')): + # raise FileNotExists("File with given path does not exists!") + + def get_db(self): + print("get DB", self.port) + + +class SqlLiteFactory(): + def __init__(self, *args, **kwargs): + self.kwargs = kwargs + @staticmethod + def initialize_db(config: Dict): + pass + + @classmethod + def get_db(self): + print("get DB", self.kwargs) + + # return Task(config=config) + +class DBContext: + available_factories = { + DBType.POSTGRES: PostgresFactory, + DBType.SQLITE: SqlLiteFactory + } + + @staticmethod + def get_db(config: Dict) -> "DbSettings": + db_type = config.get('DB_TYPE') + factory = DBContext.available_factories.get(db_type) + if factory is None: + raise ValueError(f"No factory for task type: {db_type}") + return factory(**config) + + +# # # Test +# db_test = {'DB_TYPE': 'postgres', 'ENGINE': 'postgres', 'NAME': 'dvdrental', 'USER': 'admin', 'PASSWORD': 'admin', 'HOST': '192.168.1.113', 'PORT': 5432} +# test = DBContext.get_db(db_test) + +# sss = test.get_engine() +# print(sss) +# from sqlalchemy import text +# sql = text('SELECT * from customer WHERE customer_id=1') +# results = sss.execute(sql) +# for e in results: +# print(e) + + +# import pandas as pd +# sql = ''' +# SELECT * FROM actor; +# ''' +# with sss.connect().execution_options() as conn: +# query = conn.execute(text(sql)) +# df = pd.DataFrame(query.fetchall()) +# +# print(df.head(1)) +# diff --git a/adi/oob_celery/loader_config.py b/adi/oob_celery/loader_config.py new file mode 100644 index 0000000..238356d --- /dev/null +++ b/adi/oob_celery/loader_config.py @@ -0,0 +1,128 @@ + +import csv +import sys +import json + + + +# from loadCsv.load_manager import LoadManager + +rules = 'application_conig.rules.' 
+ +class LoadConfig: + + def __init__(self , settings) : + + self.settings = settings + self.customers_list = self.settings.get(f'{rules}customers_list') + self.files = self.settings.get(f'{rules}files')[0] + self.files_path = self.settings.get(f'{rules}folder') + self.mapping_rule_file = self.files_path + '/' + self.files + self.load_config:dict = {} + self.operation = None + + self.csv2dict = {} + self.db_connections = [] + self.load_manager = None + + + def __repr__(self): + return json.dumps(self.load_config) + + def run(self): + + # print("Run",json.dumps((self.load_config))) + + # return group([ avi.delay(customer) for customer in self.customers_list]) + #LoadManager().delay(config=(self.load_config)) + return + + + def initialize_operation(self): + self.csv2dict = self._convertcsv2dict(self.mapping_rule_file) + + self.load_config = { 'csvdict' :self.csv2dict, + 'customers_list': self.customers_list } + + + db_connection = {} + + for rule in self.csv2dict: + + if rule is not None: + if rule['rules']['source_type'] == 'db': + # Updating all required db connection + db_name = rule['rules']['source_name'] + db_connection[db_name] = { 'connection_details' : self.settings.get('databases.' 
+ db_name),'engine' : ''} + + + self.db_connections = db_connection + + self.load_config['db_connections'] = db_connection + + + def prepare_celery_config(self): + db_config = self.load_config['db_connections'] + + for db_name , db_details in db_config.items(): + print(db_name ,db_details) + + + def get_db_connections(self): + return self.db_connections + + + @staticmethod + def _convertcsv2dict(file_path): + """ Function will conevert the csv to dict format where each column in csv would be key in the dict + In exampe table_name,connection would be { 'table_name': , 'connection': }""" + + content = [] + rule_id = 1 + with open(file_path) as csvfile: + csv_reader = csv.reader(csvfile) + headers = next(csv_reader) + for row in csv_reader: + row_data = {key: value for key, value in zip(headers, row)} + updated_row = {} + updated_row.update({'rule_id': rule_id, 'rules':row_data}) + content.append(updated_row) + rule_id += 1 + + sorted_mapping_rules = sorted(content, key=lambda d: d['rules']['order']) + + return sorted_mapping_rules + + def load(self): + pass + + #return group([ avi.delay(customer) for customer in self.customers_list]) + + # res = load_csv.delay(files) + # print(res.get()) + + +# load_config.initialize_operation() + +# print(load_config.csv2dict) + +# db_all = {} +# for db_name,db_details in load_config.db_connections.items(): +# print("Here --> \n", db_name ,db_details['connection_details']) +# db_engine = DBContext().get_db(db_details['connection_details']) +# db_all[db_name] = db_engine +# # print("DB connections is",db_engine) +# # load_config.initialize_operation() + + +# if __name__ == "__main__": + + +# settings = {'csvdict': [{'key': '', 'rules': {'source_type': 'db', 'source_name': 'postgres', 'db_connection_name': 'source_ps', 'sql': 'SELECT * FROM customer1 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '1'}}, {'key': '', 'rules': {'source_type': 'db', 'source_name': 'postgres', 'db_connection_name': 
'source_ps', 'sql': 'SELECT * FROM rental1 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '1'}}, {'key': '', 'rules': {'source_type': 'db', 'source_name': 'postgres', 'db_connection_name': 'source_ps', 'sql': 'SELECT * FROM customer2 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '3'}}, {'key': '', 'rules': {'source_type': 'db', 'source_name': 'postgres', 'db_connection_name': 'source_ps', 'sql': 'SELECT * FROM rental2 where customer=&1 ', 'target_type': 'df ', 'db_connection_target': ' file ', 'order': '3'}}], 'customers_list': [1, 2, 3, 4, 5, 6, 7], 'db_connections': {'postgres': {'connection_name': 'source_ps', 'engine': }}} +# # config = settings +# # path = '/csv_files' +# # files = ['source1.csv'] +# x1 = LoadConfig(settings=settings) +# x1.initialize_operation() +# # # x1.set_db_connection() +# # print("here",x1.db_connection) \ No newline at end of file diff --git a/adi/oob_celery/mapping_rules/source1.csv b/adi/oob_celery/mapping_rules/source1.csv new file mode 100644 index 0000000..0ac2445 --- /dev/null +++ b/adi/oob_celery/mapping_rules/source1.csv @@ -0,0 +1,5 @@ +rule_id,source_type,source_name,source_object_name,sql,target_type,target_name,target_object_name,order +1,db,postgres,customer ,SELECT * FROM customer where customer_id=&1,db ,target,customer,1 +2,db,postgres,customer_data,SELECT * FROM customer_data where customer_id=&1 ,db ,target,customer_data,1 +3,db,postgres,customer_binary,SELECT * FROM customer_binary where customer_id=&1 ,db ,target,customer_binary,3 +4,db,postgres,payment,select * from payment where customer_id in (select customer_id from customer where customer_id=&1) , db,target,payment,3 diff --git a/adi/oob_celery/mapping_rules/source1.csv_old b/adi/oob_celery/mapping_rules/source1.csv_old new file mode 100644 index 0000000..868a7a0 --- /dev/null +++ b/adi/oob_celery/mapping_rules/source1.csv_old @@ -0,0 +1,7 @@ 
+source_type,source_name,db_connection_name,sql,target_name,target_type,db_connection_target,order +db,postgres,source_ps,SELECT * FROM customer1 where customer=&1,customer ,df , file ,1 +db,postgres,source_ps,SELECT * FROM rental1 where customer=&1 ,d,rental , file ,1 +db,postgres,source_ps,SELECT * FROM customer2 where customer=&1 ,df, , file ,3 +db,postgres,source_ps,SELECT * FROM rental2 where customer=&1 ,df, , file ,3 +db,target,source_ps,SELECT * FROM customer2 where customer=&1 ,df, , file ,3 +db,target,source_ps,SELECT * FROM rental2 where customer=&1 ,df, , file ,3 \ No newline at end of file diff --git a/adi/oob_celery/requirements.txt b/adi/oob_celery/requirements.txt new file mode 100644 index 0000000..3f68b68 --- /dev/null +++ b/adi/oob_celery/requirements.txt @@ -0,0 +1,14 @@ +pip +setuptools +wheel +sqlalchemy +SQLAlchemy-Utils +numpy +pandas +python-decouple +pyyaml +psycopg2-binary +celery +gevent +watchdog +ormar diff --git a/app_config/settings.py b/adi/oob_celery/settings.py similarity index 69% rename from app_config/settings.py rename to adi/oob_celery/settings.py index 50e16eb..c32d8f5 100644 --- a/app_config/settings.py +++ b/adi/oob_celery/settings.py @@ -1,7 +1,7 @@ import yaml from functools import reduce import operator -from pathlib import Path + class SingletonMeta(type): @@ -27,18 +27,26 @@ def __init__(self, *args, **kwargs): except yaml.YAMLError as exc: print(exc) + def get(self, element): return reduce(operator.getitem, element.split('.'), self.settings) -if __name__ == "__main__": - # The client code. - config_file = Path('.', 'config.yaml') - s1 = Settings(config_file=config_file) - print(s1.get('databases.mongo.ENGINE')) +## adding sys.path.append(str(Path(__file__).parent.parent)) - will include the parent dir so can work directly +# or from main + +# s1 = Settings(config_file='config.yaml') +# print(s1.get('databases.mongo.ENGINE')) + +# if __name__ == "__main__": +# # The client code. 
class Task:
    """A single executable mapping rule bound to one customer.

    The attribute layout mirrors the rule CSV columns:
    rule_id,source_type,source_name,sql,target_type,target_name,order
    """

    def __init__(self, config: Dict, db_connections: Dict):
        rules = config['rules']

        # The owning load manager assigns a customer before initialize_task().
        self.customer = None
        self.rule_id = config.get('rule_id')
        self.source_type = rules.get('source_type')
        self.source_name = rules.get('source_name')
        self.source_object_name = rules.get('source_object_name')
        self.sql = rules.get('sql')
        self.sql_render = None
        self.target_type = rules.get('target_type')
        self.target_object_name = rules.get('target_object_name')
        self.target_name = rules.get('target_name')
        self.order = rules.get('order')
        self.state = State.SCHEDULED
        self.result = None
        self.task_run = None
        self.task_celery_id = None

    def initialize_task(self):
        """Render the SQL template: substitute the customer id for '&1'."""
        rendered = self.sql.replace("&1", str(self.customer.id))
        self.sql_render = rendered.rstrip('\r\n')

    def run(self):
        """Dispatch this rule to Celery and record the async handle."""
        self.task_run = proccess_rule.delay(
            rule_id=self.rule_id,
            main_id=self.customer.id,
            source_type=self.source_type,
            source_name=self.source_name,
            source_object_name=self.source_object_name,
            sql=self.sql_render,
            target_type=self.target_type,
            target_object_name=self.target_object_name,
            target_name=self.target_name,
            order=self.order,
        )
        self.task_celery_id = self.task_run.task_id

        self._update_customer()

    def _update_customer(self):
        # Register this task in the owning customer's execution history.
        self.customer.executed_tasks.append(self)
from databases import Database
import asyncio


async def initalize_connection():
    """Smoke-test the database connection: connect then disconnect.

    Prints the outcome; any connection error is reported rather than
    silently swallowed.
    """
    database = Database('postgresql://username:password@host:5432/database')
    try:
        await database.connect()
        print('Connected to Database')
        await database.disconnect()
        print('Disconnecting from Database')
    # Was a bare `except:`, which also caught SystemExit/KeyboardInterrupt
    # and hid the failure reason; narrow to Exception and show the cause.
    except Exception as exc:
        print(f'Connection to Database Failed: {exc}')


if __name__ == '__main__':
    asyncio.run(initalize_connection())