Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-tool/.corps.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ VERIFY_SUMMARY_PATH=results.csv
# colin freeze script
FREEZE_BATCHES=1
FREEZE_BATCH_SIZE=300
FREEZE_ORACLE_CHUNK_SIZE=1000

FREEZE_COLIN_CORPS=True
FREEZE_ADD_EARLY_ADOPTER=True
Expand Down
102 changes: 65 additions & 37 deletions data-tool/flows/colin_freeze_flow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import math
from prefect import flow, task
from common.init_utils import colin_extract_init, colin_oracle_init, get_config
from common.colin_utils import colin_oracle_chunks, colin_oracle_corp_num_list_format
from sqlalchemy import Engine, text
from prefect.task_runners import ConcurrentTaskRunner

from prefect.cache_policies import NO_CACHE
from prefect.context import get_run_context
Expand All @@ -11,12 +13,13 @@
import ExtractTrackingService as ColinTrackingService, ProcessingStatuses

FLOW_NAME = 'colin-freeze-flow'

ORACLE_IN_LIMIT = 1000
DEFAULT_ORACLE_CHUNK_SIZE = 1000

colin_freeze_query = """
UPDATE corporation c
SET corp_frozen_typ_cd = 'C'
WHERE c.corp_num = :corp_num
WHERE c.corp_num IN {corp_nums}
"""

colin_add_early_adopters_query = """
Expand Down Expand Up @@ -131,41 +134,66 @@


@task(cache_policy=NO_CACHE)
def update_colin_oracle(config, colin_oracle_engine: Engine, corp_num: str):
def update_colin_oracle(config, colin_oracle_engine: Engine, corp_nums: list[str]):
if not corp_nums:
return []

if len(corp_nums) > ORACLE_IN_LIMIT:
error = ValueError(f'Chunk size {len(corp_nums)} exceeds ORACLE_IN_LIMIT {ORACLE_IN_LIMIT}')
return [(corp_num, False, False, error) for corp_num in corp_nums]

if not config.FREEZE_COLIN_CORPS and not config.FREEZE_ADD_EARLY_ADOPTER:
return [(corp_num, False, False, None) for corp_num in corp_nums]

colin_corp_num_list = [convert_to_colin_format(corp_num) for corp_num in corp_nums]
with colin_oracle_engine.connect() as conn:
transaction = conn.begin()
try:
res1, res2 = None, None
colin_corp_num = convert_to_colin_format(corp_num)
if config.FREEZE_COLIN_CORPS:
res1 = conn.execute(
text(colin_freeze_query),
{'corp_num': colin_corp_num}
)
frozen_colin_nums = set()
if config.FREEZE_COLIN_CORPS and colin_corp_num_list:
conn.execute(
text(colin_freeze_query.format(corp_nums=colin_oracle_corp_num_list_format(colin_corp_num_list)))
)
frozen_colin_nums = set(colin_corp_num_list)

if config.FREEZE_ADD_EARLY_ADOPTER:
res2 = conn.execute(
conn.execute(
text(colin_add_early_adopters_query),
{'corp_num': colin_corp_num}
[{'corp_num': corp_num} for corp_num in colin_corp_num_list]
)
frozen = res1.rowcount > 0 if res1 else False
in_early_adopter = res2.rowcount > 0 if res2 else False
transaction.commit()
return corp_num, frozen, in_early_adopter, None
except Exception as e:
transaction.rollback()
print(f'❌ Error updating {corp_num} in colin: {repr(e)}')
return corp_num, False, False, e

print(f'❌ Chunk statement error for {len(corp_nums)} corps ({corp_nums[:5]}): {repr(e)}')
return [
(
corp_num,
False,
False,
e,
)
for corp_num in corp_nums
]
results = []
for original_corp_num, colin_corp_num in zip(corp_nums, colin_corp_num_list):
frozen = colin_corp_num in frozen_colin_nums if config.FREEZE_COLIN_CORPS else False
in_early_adopter = config.FREEZE_ADD_EARLY_ADOPTER
results.append((original_corp_num, frozen, in_early_adopter, None))
return results

@flow(
name='Colin-Freeze-Flow',
task_runner=ConcurrentTaskRunner(max_workers=10),
log_prints=True,
)
def colin_freeze_flow():

Check failure on line 189 in data-tool/flows/colin_freeze_flow.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 21 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=bcgov_lear&issues=AZ301-Jy87P6yfrIkQvL&open=AZ301-Jy87P6yfrIkQvL&pullRequest=4355
try:
config = get_config()
colin_oracle_engine = colin_oracle_init(config)
colin_extract_engine = colin_extract_init(config)
oracle_chunk_size = getattr(config, 'FREEZE_ORACLE_CHUNK_SIZE', DEFAULT_ORACLE_CHUNK_SIZE)
if oracle_chunk_size < 1 or oracle_chunk_size > ORACLE_IN_LIMIT:
raise ValueError(f'FREEZE_ORACLE_CHUNK_SIZE must be between 1 and {ORACLE_IN_LIMIT}')

total = get_incomplete_count(config, colin_extract_engine)
print(f'👷 Statistics: {total} incomplete corps (unprocessed or failed)')
Expand Down Expand Up @@ -205,35 +233,35 @@
print(f'👷 Start processing {len(corp_nums)} corps: {", ".join(corp_nums[:5])}...')

futures = []
for corp_num in corp_nums:
for corp_chunk in colin_oracle_chunks(corp_nums, oracle_chunk_size):
futures.append(
update_colin_oracle.submit(
config, colin_oracle_engine, corp_num)
config, colin_oracle_engine, corp_chunk)
)
complete = 0
failed = 0
for f in futures:
corp_num, frozen, in_early_adopter, error = f.result()
if error:
failed += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.FAILED,
repr(error),
frozen=frozen,
in_early_adopter=in_early_adopter
)
else:
complete += 1
colin_tracking_service.update_corp_status(
for corp_num, frozen, in_early_adopter, error in f.result():
if error:
failed += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.COMPLETED,
error=None,
ProcessingStatuses.FAILED,
repr(error),
frozen=frozen,
in_early_adopter=in_early_adopter
)
)
else:
complete += 1
colin_tracking_service.update_corp_status(
flow_run_id,
corp_num,
ProcessingStatuses.COMPLETED,
error=None,
frozen=frozen,
in_early_adopter=in_early_adopter
)

total_failed += failed
cnt += 1
Expand Down
7 changes: 7 additions & 0 deletions data-tool/flows/common/colin_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
def colin_oracle_chunks(values: list[str], size: int) -> list[list[str]]:
    """Split ``values`` into consecutive sub-lists of at most ``size`` items.

    Order is preserved and only the final chunk may be shorter than ``size``.
    An empty ``values`` yields an empty list.

    Args:
        values: the items to partition.
        size: maximum number of items per chunk; must be at least 1.

    Returns:
        A list of slices of ``values``, each holding up to ``size`` items.

    Raises:
        ValueError: if ``size`` is less than 1.
    """
    # A negative step makes range() empty, which would silently drop every
    # value, and a zero step makes range() fail with an unrelated message,
    # so reject both up front with a clear error.
    if size < 1:
        raise ValueError(f'size must be a positive integer, got {size}')
    return [values[i:i + size] for i in range(0, len(values), size)]

def colin_oracle_corp_num_list_format(corp_nums: list[str]) -> str:
    """Format corp numbers as a parenthesised list of SQL string literals.

    Each value is converted with ``str()``, has embedded single quotes
    doubled, and is wrapped in single quotes. The literals are joined with
    commas (no spaces) and enclosed in parentheses, e.g.
    ``['A', "B'C"]`` becomes ``('A','B''C')``.

    Args:
        corp_nums: the corp numbers to render; must not be empty.

    Returns:
        The formatted list, suitable for interpolation after ``IN``.

    Raises:
        ValueError: if ``corp_nums`` is empty, because ``()`` would produce
            an invalid ``IN ()`` clause.
    """
    def quote(value: str) -> str:
        # Doubling single quotes keeps the value inside its string literal.
        return "'" + str(value).replace("'", "''") + "'"

    literals = [quote(corp_num) for corp_num in corp_nums]
    if not literals:
        raise ValueError('corp_nums must contain at least one corp number')
    return '(' + ','.join(literals) + ')'
1 change: 1 addition & 0 deletions data-tool/flows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class _Config(): # pylint: disable=too-few-public-methods
# freeze flow
FREEZE_BATCHES = _get_int('FREEZE_BATCHES', 0)
FREEZE_BATCH_SIZE = _get_int('FREEZE_BATCH_SIZE', 0)
FREEZE_ORACLE_CHUNK_SIZE = _get_int('FREEZE_ORACLE_CHUNK_SIZE', 1000)

# ORACLE COLIN DB
DB_USER_COLIN_ORACLE = os.getenv('DATABASE_USERNAME_COLIN_ORACLE', '')
Expand Down