diff --git a/docs/pipelineconfiguration.rst b/docs/pipelineconfiguration.rst
index 83a6f4ec..56555215 100644
--- a/docs/pipelineconfiguration.rst
+++ b/docs/pipelineconfiguration.rst
@@ -175,37 +175,6 @@ without provided arguments the default values from ``stixcore.ini`` are used
 
     0 5 * * * cd /home/stixcore/STIXCore/ && /home/stixcore/STIXCore/venv/bin/python /home/stixcore/STIXCore/stixcore/processing/publish.py --update_rid_lut
 
-Run the pipeline monitor
-************************
-
-The event based pipeline (observing incoming telemetry files) gets stuck from time to time. There is a process observing the number of open to process files. If the number of open files is constantly increasing over a longer period a notification mail is send out:
-
-.. code-block::
-
-    # run pipeline monitor task to check for pipeline not stuck
-    0 */3 * * * cd /home/stixcore/STIXCore/ && /home/stixcore/STIXCore/venv/bin/python /home/stixcore/STIXCore/stixcore/processing/pipeline_monitor.py -s /home/stixcore/monitor_status.json
-
-
-In case of a pipeline stuck restart the event based processing pipeline.
-
-.. code-block::
-
-    # stop the system.d process
-    sudo systemctl stop stix-pipeline.service
-
-    # wait 20sec so that all open sockets also gets closed
-    # start the process again
-
-    sudo systemctl start stix-pipeline.service
-
-In order to process all tm files that have not been processed so fare the config parameter start_with_unprocessed should be set to true:
-
-.. code-block::
-
-    [Pipeline]
-    start_with_unprocessed = True
-
-
 Run the 'daily' pipeline
 ************************
diff --git a/stixcore/__init__.py b/stixcore/__init__.py
index 3b9e5cb6..cb4e8b88 100644
--- a/stixcore/__init__.py
+++ b/stixcore/__init__.py
@@ -3,4 +3,7 @@
 from .version import version as __version__
-from .version_conf import __version_conf__
+try:
+    from .version_conf import __version_conf__
+except ImportError:
+    __version_conf__ = "unknown"
 
 logger = get_logger(__name__)
diff --git a/stixcore/calibration/ecc_post_fit.py b/stixcore/calibration/ecc_post_fit.py
new file mode 100644
index 00000000..2bdc5991
--- /dev/null
+++ b/stixcore/calibration/ecc_post_fit.py
@@ -0,0 +1,228 @@
+"""
+    Author: O. Limousin, CEA
+    Date: Oct 23, 2024
+    This script does the following:
+
+    #%%%%%%%%%%%%%% FOR fit of ECC_ULTRA_FINE %%%%%%%%%%%%%%
+    ../00_CALIBRATION_MONITORING_ULTRA_FINE/02_MONITOR_ECC_SPECTRA ==> XX_erg.fits
+    ../00_CALIBRATION_MONITORING_ULTRA_FINE/01_MONITOR_ECC_PARA/ ==> ECC_para_XXXX.fits
+
+    + read the log (.xlsx file)
+    + open the ECC calibrated spectra XX_erg.fits
+    + fit the 31 and 81 keV lines (Fit_Ba_robust) (fit right hand side, and baseline)
+    + register the results in a Pandas DataFrame:
+      date, Run Number, Fit Goodness flags, DET, PIX,
+      P31, err_P31, dE31, err_dE31, H31,
+      P81, err_P81, dE81, err_dE81, H81
+    + compute gain/offset corrections
+    + fill the DataFrame with the new values
+    + include errors
+    + include a Flag (Flag31 and Flag81) which is True if the fit was OK;
+      if it was not OK, the ECC value is NOT corrected
+    + store the results in pkl
+    + generate updated ECC_para files to recalibrate uncalibrated files
+"""
+
+import numpy as np
+import pandas as pd
+from lmfit import Model
+
+from astropy.io import fits
+
+from stixcore.util.logging import get_logger
+
+logger = get_logger(__name__)
+
+
+def open_fits_tables(fits_path):
+    # Get the data from .fits
+    data = []
+    with fits.open(fits_path, memmap=False) as hdul:
+        header = [hdul[0].header]
+        for i in range(1, len(hdul)):
+            header.append(hdul[i].header)
+            data.append(hdul[i].data)
+
+    # Get the fields of each data table and sort the data in lists
+    data_names = [data_i.columns.names for data_i in data]
+    data_format = [data_i.columns.formats for data_i in data]
+    data_unit = [data_i.columns.units for data_i in data]
+    data_list = [[list(data[i][data_names[i][j]]) for j in range(len(data_names[i]))] for i in
+                 range(len(data))]
+    return header, data, data_list, data_names, data_format, data_unit
+
+
+def Read_fits_STIX_One_Pixel(data, PIX=0, DETECTOR_ID=1, Nbin=2024, NRebin=1, NSigma=1):
+    data = data[0]
+    Pix = PIX
+    Pix = Pix + (DETECTOR_ID-1)*12
+    obs = [0] * Nbin
+    erg_c = data.ERG_center  # data.field(0)
+    obs = data.field(3+Pix)
+    nbin = Nbin
+    Nbin = [Nbin/NRebin]
+    # TODO: check if we really need to rebin with congrid here
+    # erg_c = congrid(erg_c[:nbin], Nbin)
+    # obs = congrid(obs[:nbin], Nbin)
+    erg_c = erg_c[:nbin]
+    obs = obs[:nbin]
+
+    yerr = NSigma*np.sqrt(obs)
+    return erg_c, obs, yerr
+
+
+def line(x, slope, intercept):
+    """a line"""
+    return (slope*x + intercept)
+
+
+def poly(x, degree, slope, intercept):
+    """a second-order polynomial"""
+    return (degree*x*x + slope*x + intercept)
+
+
+def gaussian(x, amp, cen, wid):
+    """1-d gaussian: gaussian(x, amp, cen, wid)"""
+    return (amp*np.exp(-(x-cen)**2 / (2*wid**2)))
+
+
+def Fit_Ba_Lines_Robust(erg_c, obs, PLOT_VERBOSE=1, LogScale=1):
+    """
+    OL, oct 22, 2024
+    Robust fit procedure to adjust the peak positions after ECC.
+    The idea is exactly the same as in the previous function, but it returns
+    + 0's in case the fit fails
+    + a flag saying whether the fit was successful or not
+    """
+    # Select Energy range for the 81 keV Ba-133 line
+    pipo = ((erg_c > 80.5) & (erg_c < 90.)).nonzero()
+    y = (obs[pipo])
+    x = (erg_c[pipo])
+    x = np.array(x, dtype='float64')
+    y = np.array(y, dtype='float64')
+    mod = Model(gaussian) + Model(line)
+    pars = mod.make_params(amp=10, cen=81, wid=0.5, slope=0, intercept=0)
+    result = mod.fit(y, pars, x=x)
+
+    if ((result.params['wid'].stderr is not None) &
+            (result.params['cen'].stderr is not None) &
+            (np.abs(result.best_values['cen'] - 81.) < 1.)):
+        dE81 = result.best_values['wid']*2.35
+        err_dE81 = result.params['wid'].stderr*2.35
+        P81 = result.best_values['cen']
+        err_P81 = result.params['cen'].stderr
+        H81 = result.best_values['amp']
+        Goodness_Flag_81 = True
+    else:
+        dE81, err_dE81, P81, err_P81, H81 = 0, 0, 0, 0, 0
+        Goodness_Flag_81 = False
+
+    # Select Energy range for the 31 keV Ba-133 line
+    # THE FOLLOWING SECTION IS QUITE ROBUST BUT MIGHT OVERESTIMATE THE ENERGY RESOLUTION AT 32 keV
+    # THIS ALLOWS TO FORCE THE BASELINE TO ADJUST in the 40-45 keV range while a simple Gaussian
+    # is used to fit the right hand side of the 32 keV Line
+    pipo = (((erg_c > 30.2) & (erg_c < 33.0)) | ((erg_c > 40) & (erg_c < 45))).nonzero()
+    y = obs[pipo]
+    x = erg_c[pipo]
+    x = np.array(x, dtype='float64')
+    y = np.array(y, dtype='float64')
+
+    mod = Model(gaussian, prefix='g1_') + Model(poly)
+    pars = mod.make_params(g1_amp=10, g1_cen=30.6, g1_wid=0.4, degree=0., slope=0, intercept=0.)
+
+    result = mod.fit(y, pars, x=x)
+
+    if ((result.params['g1_wid'].stderr is not None) &
+            (result.params['g1_cen'].stderr is not None) &
+            ((np.abs(result.best_values['g1_cen'] - 31.) < 1.) & (Goodness_Flag_81))):
+        dE31 = result.best_values['g1_wid']*2.35
+        err_dE31 = result.params['g1_wid'].stderr*2.35
+        P31 = result.best_values['g1_cen']
+        err_P31 = result.params['g1_cen'].stderr
+        H31 = result.best_values['g1_amp']
+        Goodness_Flag_31 = True
+    else:
+        dE31, err_dE31, P31, err_P31, H31 = 0, 0, 0, 0, 0
+        Goodness_Flag_31 = False
+
+    return P31, P81, dE31, dE81, err_P31, err_P81, err_dE31, err_dE81, \
+        H31, H81, Goodness_Flag_31, Goodness_Flag_81
+
+
+def ecc_post_fit(erg_file, para_file, livetime):
+
+    DETs = np.arange(32)+1
+    LARGEs = [0, 1, 2, 3, 4, 5, 6, 7]
+    SMALLs = [8, 9, 10, 11]
+    PIXELs = LARGEs+SMALLs
+
+    # Prep Pandas DataFrame
+    df = pd.DataFrame()  # the columns are defined later during concatenation with the appropriate keys
+    accumulator = []
+
+    # Proceed to fit individually each pixel spectrum in the list of files
+    data = open_fits_tables(erg_file)[1]  # only need data
+    for DET in DETs:
+        for PIX in PIXELs:  # or in LARGEs, SMALLs
+            erg_c, obs, yerr = Read_fits_STIX_One_Pixel(data, PIX=PIX, DETECTOR_ID=DET,
+                                                        Nbin=2024, NRebin=1, NSigma=1)
+
+            P31, P81, dE31, dE81, err_P31, err_P81, err_dE31, err_dE81, H31, H81, Goodness_Flag31, \
+                Goodness_Flag81 = Fit_Ba_Lines_Robust(erg_c, obs/livetime,
+                                                      PLOT_VERBOSE=0, LogScale=0)
+
+            row = {'DET': [DET],
+                   'PIX': [PIX],
+                   'P31': [P31],
+                   'err_P31': [err_P31],
+                   'dE31': [dE31],
+                   'err_dE31': [err_dE31],
+                   'Flag31': [Goodness_Flag31],
+                   'P81': [P81],
+                   'err_P81': [err_P81],
+                   'dE81': [dE81],
+                   'err_dE81': [err_dE81],
+                   'Flag81': [Goodness_Flag81]}
+            logger.debug(row)
+
+            # NB: it is faster to append a list of DataFrames and concatenate once at the end
+            # than to concatenate the DataFrame row by row, which slows down progressively
+            accumulator.append(pd.DataFrame(row))
+
+    df = pd.concat(accumulator)
+    df = df.reset_index(drop=True)
+
+    # 3- gain and offset correction factors of ECC pre-calibrated data
+    G_prime = (df['P81']-df['P31'])/(80.9979-(30.6254*33.8 + 30.9731 * 62.4)/(62.4+33.8))
+    O_prime = df['P31'] - G_prime*(30.6254*33.8 + 30.9731 * 62.4)/(62.4+33.8)
+
+    # 4- add correction factors to the DataFrame
+    df['Gain_Prime'] = G_prime
+    df['Offset_Prime'] = O_prime
+
+    # check Run number and pixel number prior to assigning Gain and Offset for further correction
+    data_para = open_fits_tables(para_file)[1]  # only need data
+
+    df['Gain_ECC'] = np.float32(data_para[0].gain)
+    df['Offset_ECC'] = np.float32(data_para[0].off)
+    df['goc'] = np.float32(data_para[0].goc)
+
+    # 7 - Compute corrected Gain and Offset and fill df
+    # 7.2 - Now assign the corrected Gain and Offset values
+    #       except when ECC works better
+    df['Gain_Cor'] = 0.0
+    df['Offset_Cor'] = 0.0
+
+    # apply correction to gain and offset when fit is ok
+    idx = df['Flag31'] & df['Flag81']
+    df.loc[idx, 'Gain_Cor'] = df['Gain_ECC'][idx] * df['Gain_Prime'][idx]
+    df.loc[idx, 'Offset_Cor'] = df['Gain_ECC'][idx] * df['Offset_Prime'][idx] +\
+        df['Offset_ECC'][idx]
+    # otherwise keep uncorrected ECC Values
+    idx = ~idx
+    df.loc[idx, 'Gain_Cor'] = df['Gain_ECC'][idx]
+    df.loc[idx, 'Offset_Cor'] = df['Offset_ECC'][idx]
+
+    return df, idx
diff --git a/stixcore/calibration/elut_manager.py b/stixcore/calibration/elut_manager.py
new file mode 100644
index 00000000..1f50b6de
--- /dev/null
+++ b/stixcore/calibration/elut_manager.py
@@ -0,0 +1,194 @@
+import sys
+from pathlib import Path
+
+from stixpy.calibration.detector import get_sci_channels
+from stixpy.io.readers import read_elut, read_elut_index
+
+from astropy.table import QTable
+
+from stixcore.data.test import test_data
+from stixcore.util.logging import get_logger
+from stixcore.util.singleton import Singleton
+
+__all__ = ['ELUTManager']
+
+ELUT_DATA_DIR = Path(__file__).parent.parent / "config" / "data" / "common" / "elut"
+
+logger = get_logger(__name__)
+
+
+class ELUTManager(metaclass=Singleton):
+    """Manages ELUT (Energy Look-Up Table) data and provides date-based access to ELUT tables."""
+
+    def __init__(self, data_root=None):
+        """Create the manager for ELUT data.
+
+        Parameters
+        ----------
+        data_root : `str` | `pathlib.Path`, optional
+            Path to the directory with ELUT data. If None, the default path is used.
+        """
+        self.elut_cache = {}
+        if data_root is None:
+            data_root = ELUT_DATA_DIR
+        self.data_root = Path(data_root)
+        self.elut_index_file = self.data_root / "elut_index.csv"
+        self._load_index()
+
+    def _load_index(self):
+        """Load the ELUT index file."""
+        try:
+            self.elut_index = read_elut_index(self.elut_index_file)
+            logger.info(f"Loaded {len(self.elut_index)} ELUT entries from index")
+
+        except FileNotFoundError:
+            logger.warning(f'No ELUT index found at: {self.elut_index_file}')
+            self.elut_index = []
+        except Exception as e:
+            logger.error(f'Error loading ELUT index: {e}')
+            self.elut_index = []
+
+    def _find_elut_file(self, date):
+        """Find the appropriate ELUT file for a given date via the interval index.
+
+        Parameters
+        ----------
+        date : `datetime`
+            The date for which to find the ELUT
+
+        Returns
+        -------
+        `pathlib.Path` or `None`
+            The file of the appropriate ELUT, or None if no index is loaded
+        """
+        if not self.elut_index:
+            logger.warning("No ELUT index loaded")
+            return None
+        elut_info = self.elut_index.at(date)
+        if len(elut_info) == 0:
+            raise ValueError(f"No ELUT for date {date}")
+        elif len(elut_info) > 1:
+            raise ValueError(f"Multiple ELUTs for date {date}")
+        start_date, end_date, elut_file = list(elut_info)[0]
+
+        return elut_file
+
+    def read_elut(self, elut_file, sci_channels):
+        """Read an ELUT file and return it as an astropy QTable.
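+
+        The path is resolved relative to ``data_root`` and the science
+        channel definition is expected to come from
+        `~stixpy.calibration.detector.get_sci_channels`. A minimal usage
+        sketch (the file name is a placeholder)::
+
+            >>> from datetime import datetime
+            >>> from stixpy.calibration.detector import get_sci_channels
+            >>> sci = get_sci_channels(datetime(2022, 8, 1))  # doctest: +SKIP
+            >>> elut = ELUTManager.instance.read_elut("elut_table_20210625.csv", sci)  # doctest: +SKIP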
+ + Parameters + ---------- + elut_file : `str` + The filename of the ELUT file to read + + Returns + ------- + `stixpy.io.readers.ELUT` + The ELUT data with appropriate units + + Raises + ------ + FileNotFoundError + If the ELUT file doesn't exist + """ + elut_path = self.data_root / elut_file + + if not elut_path.exists(): + raise FileNotFoundError(f"ELUT file not found: {elut_path}") + + try: + elut_table = read_elut(elut_path, sci_channels) + + logger.info(f"Successfully read ELUT file: {elut_file}") + return elut_table + + except Exception as e: + logger.error(f"Error reading ELUT file {elut_file}: {e}") + raise + + def get_elut(self, date) -> tuple[object, QTable]: + """Get the ELUT table for a given date. + + Parameters + ---------- + date : `datetime` + The date for which to get the ELUT + + Returns + ------- + `tuple` + A tuple containing: + `stixpy.io.readers.ELUT` The ELUT data + `QTable` The science channel definition + + Raises + ------ + ValueError + If no ELUT is found for the given date + FileNotFoundError + If the ELUT file doesn't exist + """ + # Find the appropriate ELUT file + elut_file = self._find_elut_file(date) + if not elut_file: + raise ValueError(f"No ELUT found for date: {date}") + + # Check cache first + if elut_file in self.elut_cache: + logger.debug(f"Using cached ELUT: {elut_file}") + return self.elut_cache[elut_file] + + # Read ELUT and cache it + sci_channels = get_sci_channels(date) + elut_table = self.read_elut(elut_file, sci_channels) + self.elut_cache[elut_file] = (elut_table, sci_channels) + + return self.elut_cache[elut_file] + + def get_available_eluts(self): + """Get list of all available ELUT files with their date ranges. + + Returns + ------- + `list` + List of dictionaries containing ELUT information + """ + return self.elut_index + + def clear_cache(self): + """Clear the ELUT cache.""" + self.elut_cache.clear() + logger.info("ELUT cache cleared") + + @property + def cache_size(self): + """Get the current size of the ELUT cache. 
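+
+        A small sketch of the expected behaviour::
+
+            >>> ELUTManager.instance.clear_cache()  # doctest: +SKIP
+            >>> ELUTManager.instance.cache_size  # doctest: +SKIP
+            0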
+ + Returns + ------- + `int` + Number of cached ELUT tables + """ + return len(self.elut_cache) + + +# Create singleton instance +if 'pytest' in sys.modules: + ELUTManager.instance = ELUTManager(test_data.elut if hasattr(test_data, 'elut') else None) +else: + ELUTManager.instance = ELUTManager() diff --git a/stixcore/calibration/tests/test_elut_manager.py b/stixcore/calibration/tests/test_elut_manager.py new file mode 100644 index 00000000..188f090f --- /dev/null +++ b/stixcore/calibration/tests/test_elut_manager.py @@ -0,0 +1,189 @@ +from pathlib import Path +from datetime import datetime + +import numpy as np +import pytest +from intervaltree import IntervalTree +from stixpy.calibration.detector import get_sci_channels +from stixpy.io.readers import read_elut + +from astropy.table import QTable + +from stixcore.calibration.elut_manager import ELUTManager + + +class TestELUTManagerBasics: + """Basic integration tests for ELUTManager using real data.""" + + def setup_method(self): + """Set up test fixtures before each test method.""" + # Reset singleton instance for each test + ELUTManager._instances = {} + + def test_initialization_default_path(self): + """Test ELUTManager initialization with default data path.""" + manager = ELUTManager() + + assert manager.data_root.name == "elut" + assert manager.data_root.exists() + assert manager.elut_index_file.exists() + assert isinstance(manager.elut_index, IntervalTree) + assert len(manager.elut_index) > 0 + + def test_elut_index_loading(self): + """Test that ELUT index is loaded correctly.""" + manager = ELUTManager() + + assert hasattr(manager, 'elut_index') + assert len(manager.elut_index) > 0 + + # Check that index contains expected structure + # Each entry should have start_date, end_date, elut_file + start, end, elut = list(manager.elut_index.at(manager.elut_index.begin()))[0] + assert isinstance(start, datetime) + assert isinstance(end, datetime) + assert isinstance(elut, Path) + + def test_get_available_eluts(self): + """Test getting list of available ELUTs.""" + manager = ELUTManager() + + available_eluts = manager.get_available_eluts() + + assert isinstance(available_eluts, IntervalTree) + assert len(available_eluts) > 0 + + def test_find_elut_file_known_date(self): + """Test finding ELUT file for a known date.""" + manager = ELUTManager() + + # Use a date that should be covered by existing ELUTs + test_date = datetime(2022, 8, 1) + + elut_file = manager._find_elut_file(test_date) + + assert elut_file is not None + assert isinstance(elut_file, Path) + assert elut_file.suffix == '.csv' + + # Verify the file exists + elut_path = manager.data_root / elut_file + assert elut_path.exists() + + def test_get_elut_known_date(self): + """Test getting ELUT for a known date.""" + manager = ELUTManager() + + # Use a date that should be covered by existing ELUTs + test_date = datetime(2022, 8, 1) + + elut_table, sci_channels = manager.get_elut(test_date) + + assert elut_table is not None + assert sci_channels is not None + + # Basic validation of returned objects + assert isinstance(elut_table, object) + assert isinstance(sci_channels, QTable) + + def test_cache_functionality(self): + """Test that caching works correctly.""" + manager = ELUTManager() + + test_date = datetime(2022, 8, 1) + + # First call should populate cache + initial_cache_size = manager.cache_size + result1 = manager.get_elut(test_date) + assert manager.cache_size == initial_cache_size + 1 + + # Second call should use cache + result2 = manager.get_elut(test_date) + assert 
manager.cache_size == initial_cache_size + 1 + + # Results should be identical (cached) + assert result1 is result2 + + def test_clear_cache(self): + """Test cache clearing functionality.""" + manager = ELUTManager() + + # Load an ELUT to populate cache + test_date = datetime(2022, 8, 1) + manager.get_elut(test_date) + + assert manager.cache_size > 0 + + # Clear cache + manager.clear_cache() + + assert manager.cache_size == 0 + + def test_cache_size_property(self): + """Test cache size property.""" + manager = ELUTManager() + + initial_size = manager.cache_size + assert isinstance(initial_size, int) + assert initial_size >= 0 + + # Load an ELUT + test_date = datetime(2022, 8, 1) + manager.get_elut(test_date) + + assert manager.cache_size == initial_size + 1 + + def test_different_dates_different_eluts(self): + """Test that different dates can return different ELUTs.""" + manager = ELUTManager() + + # Use dates that should map to different ELUT files + date1 = datetime(2021, 6, 1) # Should use one ELUT + date2 = datetime(2024, 8, 1) # Should use different ELUT + + elut_file1 = manager._find_elut_file(date1) + elut_file2 = manager._find_elut_file(date2) + + assert elut_file1 is not None + assert elut_file2 is not None + assert elut_file1 != elut_file2 + + def test_read_elut_directly(self): + """Test reading ELUT file directly.""" + manager = ELUTManager() + + date1 = datetime(2021, 6, 1) # Should use one ELUT + + sci_channels_o = get_sci_channels(date1) + elut_file1 = manager._find_elut_file(date1) + elut_table_o = read_elut(elut_file1, sci_channels_o) + + elut_table_m, sci_channels_m = manager.get_elut(date1) + # Ensure it can be retrieved via manager + assert np.all(elut_table_o.e_actual == elut_table_m.e_actual) + assert np.all(sci_channels_o == sci_channels_m) + + def test_error_handling_invalid_date(self): + """Test error handling for dates outside available range.""" + manager = ELUTManager() + + # Use a date far in the past that shouldn't have ELUT data + very_old_date = datetime(1990, 1, 1) + + with pytest.raises(ValueError, match="No ELUT"): + manager.get_elut(very_old_date) + + def test_error_handling_nonexistent_file(self): + """Test error handling when trying to read non-existent ELUT file.""" + manager = ELUTManager() + + from stixpy.calibration.detector import get_sci_channels + sci_channels = get_sci_channels(datetime(2022, 8, 1)) + + with pytest.raises(FileNotFoundError, match="ELUT file not found"): + manager.read_elut("nonexistent_file.csv", sci_channels) + + def test_instance_attribute_exists(self): + """Test that singleton instance is accessible.""" + assert hasattr(ELUTManager, 'instance') + assert isinstance(ELUTManager.instance, ELUTManager) diff --git a/stixcore/data/stixcore.ini b/stixcore/data/stixcore.ini index 38aba050..cdc8b347 100644 --- a/stixcore/data/stixcore.ini +++ b/stixcore/data/stixcore.ini @@ -32,5 +32,7 @@ endpoint = https://solarorbiter.esac.esa.int/soopkitchen/api user = smaloney password = set_in_user_ini soop_files_download = ./stixcore/data/soop +[ECC] +ecc_path = /opt/stix_det_cal/bin/ [Processing] flarelist_sdc_min_count = 1000 diff --git a/stixcore/data/test.py b/stixcore/data/test.py index 82d6bcff..3ef4999f 100644 --- a/stixcore/data/test.py +++ b/stixcore/data/test.py @@ -84,6 +84,7 @@ def __init__(self, data_dir): self.io = IOTestData(data_dir) self.soop = SOOPTestData(data_dir) self.rid_lut = RidLutTestData(data_dir) + self.ecc = data_dir / "ecc" self.__doc__ = "\n".join([f"{k}\n******************\n\n{v.__doc__}\n\n\n" for k, v in 
self.__dict__.items()]) diff --git a/stixcore/data/test/ecc/ecc_cfg_1/test.txt b/stixcore/data/test/ecc/ecc_cfg_1/test.txt new file mode 100644 index 00000000..e69de29b diff --git a/stixcore/data/test/ecc/ecc_conf_index.json b/stixcore/data/test/ecc/ecc_conf_index.json new file mode 100644 index 00000000..d269699a --- /dev/null +++ b/stixcore/data/test/ecc/ecc_conf_index.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e69980269bb0734d0e7885221e2dba39faeeb245574cdd01c6b31510d8ff4de9 +size 246 diff --git a/stixcore/ecc/manager.py b/stixcore/ecc/manager.py new file mode 100644 index 00000000..36274a14 --- /dev/null +++ b/stixcore/ecc/manager.py @@ -0,0 +1,276 @@ +import sys +import json +import shutil +import tempfile +from types import SimpleNamespace +from pathlib import Path +from datetime import datetime +from contextlib import contextmanager +from configparser import ConfigParser + +from stixcore.data.test import test_data +from stixcore.util.logging import get_logger +from stixcore.util.singleton import Singleton + +__all__ = ["ECCManager"] + +ECC_CONF_INDEX_FILE = Path(__file__).parent.parent / "config" / "data" / "common" / "ecc" / "ecc_conf_index.json" + +logger = get_logger(__name__) + + +class ECCManager(metaclass=Singleton): + """Manages ECC configurations and provides access to configuration data.""" + + def __init__(self, data_root=None): + """Create the manager for ECC configurations. + + Parameters + ---------- + data_root : `str` | `pathlib.Path`, optional + Path to the directory with all ECC configurations. If None, uses default path. + """ + self.config_cache = dict() + if data_root is None: + data_root = ECC_CONF_INDEX_FILE.parent + self.data_root = data_root + self._load_index() + + @property + def data_root(self): + """Get the data path root directory. + + Returns + ------- + `pathlib.Path` + path of the root directory + """ + return self._data_root + + @data_root.setter + def data_root(self, value): + """Set the data path root. + + Parameters + ---------- + data_root : `str` or `pathlib.Path` + Path to the directory with all ECC configuration versions + """ + path = Path(value) + if not path.exists(): + raise FileNotFoundError(f"Data root path does not exist: {path}") + + self._data_root = path + + def _load_index(self): + """Load the ECC configuration index file.""" + try: + with open(ECC_CONF_INDEX_FILE) as f: + self.configurations = json.load(f) + logger.info(f"Loaded {len(self.configurations)} ECC configurations from index") + except FileNotFoundError: + logger.warning(f"No ECC configuration index found at: {ECC_CONF_INDEX_FILE}") + self.configurations = [] + except json.JSONDecodeError as e: + logger.error(f"Error parsing ECC configuration index: {e}") + self.configurations = [] + + def find_configuration(self, date=None): + """Find ECC configuration valid for a given date. 
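+
+        A minimal usage sketch (the returned name depends on the deployed
+        index, so ``ecc_cfg_1`` is a placeholder)::
+
+            >>> from datetime import datetime
+            >>> ECCManager.instance.find_configuration(datetime(2021, 6, 15))  # doctest: +SKIP
+            'ecc_cfg_1'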
+ + Parameters + ---------- + date : `datetime`, optional + the date for which to find the configuration, by default None (uses first available) + + Returns + ------- + `str` + configuration name/identifier + """ + if not self.configurations: + logger.warning("No ECC configurations available") + return None + + if date is None: + return self.configurations[0]["configuration"] + + # Convert date to string for comparison if it's a datetime object + if isinstance(date, datetime): + date_str = date.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "+00:00" + else: + date_str = str(date) + + for config in self.configurations: + validity_period = config.get("validityPeriodUTC", []) + if len(validity_period) == 2: + start_date, end_date = validity_period + if start_date <= date_str <= end_date: + return config["configuration"] + + logger.warning(f"No ECC configuration found for date: {date}") + return None + + def get_configurations(self): + """Get all available ECC configurations. + + Returns + ------- + `list` + List of available configuration dictionaries + """ + # make a copy of the configurations to avoid modifying the original + return json.loads(json.dumps(self.configurations)) + + def has_configuration(self, configuration_name): + """Test if the ECC configuration is available. + + Parameters + ---------- + configuration_name : `str` + configuration identifier + + Returns + ------- + `bool` + does the configuration exist + """ + config_path = self._data_root / configuration_name + return config_path.exists() and config_path.is_dir() + + def get_configuration_path(self, configuration_name): + """Get the path to a specific ECC configuration. + + Parameters + ---------- + configuration_name : `str` + configuration identifier + + Returns + ------- + `pathlib.Path` + path to the configuration directory + """ + return self._data_root / configuration_name + + def create_context(self, date=None): + """Create a temporary folder with ECC configuration files for a given date. 
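+
+        Callers are responsible for removing the returned directory again,
+        e.g. via `cleanup_context`; prefer the `context` context manager
+        for automatic cleanup. A manual sketch (the value assumes the
+        ``post_ecc.ini`` fallback defaults)::
+
+            >>> path, cfg = ECCManager.instance.create_context()  # doctest: +SKIP
+            >>> cfg.Max_Gain_Prime  # doctest: +SKIP
+            1.4
+            >>> ECCManager.instance.cleanup_context((path, cfg))  # doctest: +SKIP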
+
+        Parameters
+        ----------
+        date : `datetime`, optional
+            the date for which to create the context, by default None
+
+        Returns
+        -------
+        `pathlib.Path`
+            path to the temporary directory containing the configuration files
+        `SimpleNamespace`
+            config read from post_ecc.ini
+
+        Raises
+        ------
+        ValueError
+            if no configuration is found for the given date
+        FileNotFoundError
+            if the configuration directory doesn't exist
+        """
+        configuration_name = self.find_configuration(date)
+        if not configuration_name:
+            raise ValueError(f"No ECC configuration found for date: {date}")
+
+        if not self.has_configuration(configuration_name):
+            raise FileNotFoundError(f"Configuration directory not found: {configuration_name}")
+
+        # Create temporary directory
+        temp_dir = Path(tempfile.mkdtemp(prefix=f"ecc_context_{configuration_name}_"))
+
+        try:
+            # Copy configuration files to temporary directory
+            config_source = self.get_configuration_path(configuration_name)
+            shutil.copytree(config_source, temp_dir, dirs_exist_ok=True)
+
+            logger.info(f"Created ECC context in: {temp_dir}")
+
+            config = ConfigParser()
+            config.read(temp_dir / "post_ecc.ini")
+
+            ECC_Config = SimpleNamespace(
+                Max_Gain_Prime=config.getfloat("DEFAULT", "Max_Gain_Prime", fallback=1.4),
+                Min_Gain_Prime=config.getfloat("DEFAULT", "Min_Gain_Prime", fallback=0.4),
+                Min_Gain=config.getfloat("DEFAULT", "Min_Gain", fallback=0.4),
+                Ignore_Max_Gain_Prime_Det_Pix_List=json.loads(
+                    config.get("DEFAULT", "Ignore_Max_Gain_Prime_Det_Pix_List", fallback="[]")
+                ),
+                Ignore_Min_Gain_Prime_Det_Pix_List=json.loads(
+                    config.get("DEFAULT", "Ignore_Min_Gain_Prime_Det_Pix_List", fallback="[]")
+                ),
+                Ignore_Min_Gain_Det_Pix_List=json.loads(
+                    config.get("DEFAULT", "Ignore_Min_Gain_Det_Pix_List", fallback="[]")
+                ),
+            )
+
+            logger.info(f"Read config from: {temp_dir / 'post_ecc.ini'}")
+            return temp_dir, ECC_Config
+
+        except Exception:
+            # Clean up on error
+            if temp_dir.exists():
+                shutil.rmtree(temp_dir)
+            raise
+
+    def cleanup_context(self, context):
+        """Clean up a temporary context directory.
+
+        Parameters
+        ----------
+        context : `tuple`
+            the ``(path, config)`` tuple returned by `create_context`
+        """
+        try:
+            context_path, _ = context
+            if context_path.exists():
+                shutil.rmtree(context_path)
+                logger.info(f"Cleaned up ECC context: {context_path}")
+        except Exception as e:
+            logger.warning(f"Error cleaning up context {context}: {e}")
+
+    @contextmanager
+    def context(self, date=None):
+        """Context manager for ECC configuration context.
+
+        This provides a convenient way to use ECC configurations with automatic
+        cleanup using Python's 'with' statement.
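+
+        The yielded value is the ``(path, config)`` tuple returned by
+        `create_context`. A minimal sketch (the date and the printed
+        values are assumptions)::
+
+            >>> from datetime import datetime
+            >>> with ECCManager.instance.context(datetime(2021, 6, 15)) as (path, cfg):  # doctest: +SKIP
+            ...     print(path.exists(), cfg.Max_Gain_Prime)  # doctest: +SKIP
+            True 1.4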
+ + Parameters + ---------- + date : `datetime`, optional + the date for which to create the context, by default None + + Yields + ------ + `pathlib.Path` + path to the temporary directory containing the configuration files + + Raises + ------ + ValueError + if no configuration is found for the given date + FileNotFoundError + if the configuration directory doesn't exist + + """ + context = None + try: + context = self.create_context(date) + yield context + finally: + if context is not None: + self.cleanup_context(context) + + +# Create singleton instance +if "pytest" in sys.modules: + ECCManager.instance = ECCManager(test_data.ecc) +else: + ECCManager.instance = ECCManager() diff --git a/stixcore/ecc/tests/test_ecc_manager.py b/stixcore/ecc/tests/test_ecc_manager.py new file mode 100644 index 00000000..c48fb786 --- /dev/null +++ b/stixcore/ecc/tests/test_ecc_manager.py @@ -0,0 +1,379 @@ +import json +import shutil +import tempfile +from types import SimpleNamespace +from pathlib import Path +from datetime import datetime +from unittest.mock import patch, mock_open + +import pytest + +from stixcore.ecc.manager import ECCManager + + +class TestECCManager: + """Test cases for the ECCManager class.""" + + def setup_method(self): + """Set up test fixtures before each test method.""" + # Reset singleton instance for each test + ECCManager._instances = {} + + # Create temporary directory for test data + self.temp_dir = Path(tempfile.mkdtemp()) + self.ecc_dir = self.temp_dir / "ecc" + self.ecc_dir.mkdir() + + # Create test configuration directories + self.config1_dir = self.ecc_dir / "ecc_cfg_1" + self.config2_dir = self.ecc_dir / "ecc_cfg_2" + self.config1_dir.mkdir() + self.config2_dir.mkdir() + + # Create test files in configuration directories + (self.config1_dir / "config.json").write_text('{"test": "config1"}') + (self.config2_dir / "config.json").write_text('{"test": "config2"}') + (self.config1_dir / "params.txt").write_text("test parameters 1") + (self.config2_dir / "params.txt").write_text("test parameters 2") + + # Test configuration index data + self.test_index = [ + { + "configuration": "ecc_cfg_1", + "description": "Test ECC configuration 1", + "validityPeriodUTC": ["2020-01-01T00:00:00.000+00:00", "2022-01-01T00:00:00.000+00:00"], + }, + { + "configuration": "ecc_cfg_2", + "description": "Test ECC configuration 2", + "validityPeriodUTC": ["2022-01-01T00:00:00.000+00:00", "2024-01-01T00:00:00.000+00:00"], + }, + ] + + def teardown_method(self): + """Clean up after each test method.""" + # Clean up temporary directory + if self.temp_dir.exists(): + shutil.rmtree(self.temp_dir) + + def test_initialization_with_data_root(self): + """Test ECCManager initialization with custom data root.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + assert manager.data_root == self.ecc_dir + assert len(manager.configurations) == 2 + assert manager.configurations[0]["configuration"] == "ecc_cfg_1" + + def test_initialization_without_data_root(self): + """Test ECCManager initialization with default data root.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager() + + # Should use default path + expected_path = Path(__file__).parent.parent.parent / "config" / "data" / "common" / "ecc" + assert manager.data_root == expected_path + + def test_data_root_setter_valid_path(self): + """Test setting data_root with valid path.""" + with 
patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + new_dir = self.temp_dir / "new_ecc" + new_dir.mkdir() + + manager.data_root = new_dir + assert manager.data_root == new_dir + + def test_data_root_setter_invalid_path(self): + """Test setting data_root with invalid path.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + invalid_path = self.temp_dir / "nonexistent" + + with pytest.raises(FileNotFoundError): + manager.data_root = invalid_path + + def test_load_index_success(self): + """Test successful loading of index file.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + assert len(manager.configurations) == 2 + assert manager.configurations[0]["configuration"] == "ecc_cfg_1" + assert manager.configurations[1]["configuration"] == "ecc_cfg_2" + + def test_load_index_file_not_found(self): + """Test handling of missing index file.""" + with patch("stixcore.ecc.manager.open", side_effect=FileNotFoundError): + manager = ECCManager(data_root=self.ecc_dir) + + assert manager.configurations == [] + + def test_load_index_json_decode_error(self): + """Test handling of malformed JSON in index file.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data='{"invalid": json}')): + manager = ECCManager(data_root=self.ecc_dir) + + assert manager.configurations == [] + + def test_find_configuration_no_date(self): + """Test finding configuration without specifying date.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + config = manager.find_configuration() + assert config == "ecc_cfg_1" + + def test_find_configuration_with_date(self): + """Test finding configuration with specific date.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + # Date within first configuration validity period + date1 = datetime(2021, 6, 15) + config1 = manager.find_configuration(date1) + assert config1 == "ecc_cfg_1" + + # Date within second configuration validity period + date2 = datetime(2023, 6, 15) + config2 = manager.find_configuration(date2) + assert config2 == "ecc_cfg_2" + + def test_find_configuration_no_match(self): + """Test finding configuration with date outside all validity periods.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + # Date outside all validity periods + date = datetime(2025, 6, 15) + config = manager.find_configuration(date) + assert config is None + + def test_find_configuration_empty_configurations(self): + """Test finding configuration when no configurations are loaded.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data="[]")): + manager = ECCManager(data_root=self.ecc_dir) + + config = manager.find_configuration() + assert config is None + + def test_get_configurations(self): + """Test getting all configurations.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + configs = manager.get_configurations() + assert len(configs) == 2 + assert configs[0]["configuration"] == "ecc_cfg_1" + assert configs[1]["configuration"] == 
"ecc_cfg_2" + + # Ensure it returns a copy + configs[0]["configuration"] = "modified" + assert manager.get_configurations()[0]["configuration"] == "ecc_cfg_1" + + def test_has_configuration_exists(self): + """Test checking if configuration exists.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + assert manager.has_configuration("ecc_cfg_1") is True + assert manager.has_configuration("ecc_cfg_2") is True + + def test_has_configuration_not_exists(self): + """Test checking if non-existent configuration exists.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + assert manager.has_configuration("ecc_cfg_nonexistent") is False + + def test_get_configuration_path(self): + """Test getting configuration path.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + path = manager.get_configuration_path("ecc_cfg_1") + assert path == self.ecc_dir / "ecc_cfg_1" + + def test_create_context_success(self): + """Test successful context creation.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + date = datetime(2021, 6, 15) + context_path, config = manager.create_context(date) + + try: + assert context_path.exists() + assert context_path.is_dir() + + # Check that configuration files were copied + config_dir = context_path + assert config_dir.exists() + assert (config_dir / "config.json").exists() + assert (config_dir / "params.txt").exists() + + # Verify file contents + config_content = (config_dir / "config.json").read_text() + assert json.loads(config_content)["test"] == "config1" + + # Verify returned config + assert isinstance(config, SimpleNamespace) + assert hasattr(config, "Max_Gain_Prime") + + finally: + # Clean up + if context_path.exists(): + shutil.rmtree(context_path) + + def test_create_context_no_configuration_found(self): + """Test context creation when no configuration is found.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + # Date outside all validity periods + date = datetime(2025, 6, 15) + + with pytest.raises(ValueError, match="No ECC configuration found for date"): + manager.create_context(date) + + def test_create_context_configuration_directory_not_found(self): + """Test context creation when configuration directory doesn't exist.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + # Remove the configuration directory + shutil.rmtree(self.config1_dir) + + date = datetime(2021, 6, 15) + + with pytest.raises(FileNotFoundError, match="Configuration directory not found"): + manager.create_context(date) + + def test_cleanup_context(self): + """Test context cleanup.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + date = datetime(2021, 6, 15) + context_path, config = manager.create_context(date) + + assert context_path.exists() + + manager.cleanup_context((context_path, config)) + + assert not context_path.exists() + + def test_cleanup_context_nonexistent(self): + """Test cleanup of non-existent context.""" + with 
patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + nonexistent_path = self.temp_dir / "nonexistent" + + # Should not raise an exception + manager.cleanup_context((nonexistent_path, None)) + + def test_singleton_instance_attribute(self): + """Test that singleton instance is accessible via class attribute.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + # The instance should be created automatically + assert hasattr(ECCManager, "instance") + assert isinstance(ECCManager.instance, ECCManager) + + def test_context_manager_success(self): + """Test successful context manager usage.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + date = datetime(2021, 6, 15) + + with manager.context(date) as context: + context_path, config = context + # Context should be created successfully + assert context_path.exists() + assert context_path.is_dir() + + # Check that configuration files were copied + config_dir = context_path + assert config_dir.exists() + assert (config_dir / "config.json").exists() + assert (config_dir / "params.txt").exists() + + # Verify file contents + config_content = (config_dir / "config.json").read_text() + assert json.loads(config_content)["test"] == "config1" + + # Store path for later verification + temp_path = context_path + + # After exiting context, directory should be cleaned up + assert not temp_path.exists() + + def test_context_manager_no_date(self): + """Test context manager without specifying date.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + with manager.context() as context: + context_path, config = context + # Should use first configuration + assert context_path.exists() + config_dir = context_path + assert config_dir.exists() + assert (config_dir / "params.txt").exists() + + temp_path = context_path + + # Cleanup should happen automatically + assert not temp_path.exists() + + def test_context_manager_exception_during_usage(self): + """Test context manager cleanup when exception occurs during usage.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + date = datetime(2021, 6, 15) + temp_path = None + + try: + with manager.context(date) as context: + context_path, config = context + temp_path = context_path + assert context_path.exists() + # Simulate an exception during usage + raise ValueError("Test exception") + except ValueError: + # Exception should be propagated + pass + + # Cleanup should still happen despite exception + assert temp_path is not None + assert not temp_path.exists() + + def test_context_manager_no_configuration_found(self): + """Test context manager when no configuration is found.""" + with patch("stixcore.ecc.manager.open", mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + # Date outside all validity periods + date = datetime(2025, 6, 15) + + with pytest.raises(ValueError, match="No ECC configuration found for date"): + with manager.context(date): + pass + + def test_context_manager_configuration_directory_not_found(self): + """Test context manager when configuration directory doesn't exist.""" + with patch("stixcore.ecc.manager.open", 
mock_open(read_data=json.dumps(self.test_index))): + manager = ECCManager(data_root=self.ecc_dir) + + # Remove the configuration directory + shutil.rmtree(self.config1_dir) + + date = datetime(2021, 6, 15) + + with pytest.raises(FileNotFoundError, match="Configuration directory not found"): + with manager.context(date): + pass diff --git a/stixcore/io/ProcessingHistoryStorage.py b/stixcore/io/ProcessingHistoryStorage.py index 4d593971..8d0dba22 100644 --- a/stixcore/io/ProcessingHistoryStorage.py +++ b/stixcore/io/ProcessingHistoryStorage.py @@ -63,6 +63,22 @@ def _migrate_database(self): processed_fits_products (name, level, type, version, fits_in_path)""") if curent_DB_version < 2: + # TODO reactivate later + # self.cur.execute('''CREATE TABLE if not exists processed_flare_products ( + # id INTEGER PRIMARY KEY AUTOINCREMENT, + # flareid TEXT NOT NULL, + # flarelist TEXT NOT NULL, + # version INTEGER NOT NULL, + # name TEXT NOT NULL, + # level TEXT NOT NULL, + # type TEXT NOT NULL, + # fitspath TEXT NOT NULL, + # p_date FLOAT NOT NULL + # ) + # ''') + # self.cur.execute('''CREATE INDEX if not exists processed_flare_products_idx ON + # processed_flare_products (flareid, flarelist, version, name, + # level, type)''') # future migrations here pass self.cur.execute(f"PRAGMA user_version = {self.DB_VERSION};") diff --git a/stixcore/processing/FLtoL3.py b/stixcore/processing/FLtoL3.py new file mode 100644 index 00000000..499f4c33 --- /dev/null +++ b/stixcore/processing/FLtoL3.py @@ -0,0 +1,89 @@ +from pathlib import Path +from collections import defaultdict +from concurrent.futures import ProcessPoolExecutor + +from stixcore.config.config import CONFIG +from stixcore.ephemeris.manager import Spice +from stixcore.io.fits.processors import FitsL1Processor, FitsL3Processor +from stixcore.soop.manager import SOOPManager +from stixcore.util.logging import get_logger +from stixcore.util.util import get_complete_file_name + +logger = get_logger(__name__) + + +class FLLevel3: + """Processing step from a flare list entry to L3. 
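+    A minimal usage sketch (the paths, database file and input file list
+    are placeholders)::
+
+        >>> proc = FLLevel3('fits/', 'fits_l3/', 'processing.sqlite')  # doctest: +SKIP
+        >>> l3_files = proc.process_fits_files(l1_fits_paths)  # doctest: +SKIP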
+ """ + def __init__(self, source_dir, output_dir, dbfile): + self.source_dir = Path(source_dir) + self.output_dir = Path(output_dir) + self.dbfile = dbfile + self.processor = FitsL3Processor(self.output_dir) + + def process_fits_files(self, files): + all_files = list() + if files is None: + files = self.level0_files + product_types = defaultdict(list) + product_types_batch = defaultdict(int) + batch_size = CONFIG.getint('Pipeline', 'parallel_batchsize_L1', fallback=150) + + for file in files: + # group by service,subservice, ssid example: 'L0/21/6/30' as default + # or (prio, service, subservice, [SSID], [BATCH]) if all data is available + batch = 0 + prio = 3 + product_type = str(file.parent) + if 'L0' in file._parts: + product_type = tuple(map(int, file._parts[file._parts.index('L0')+1:-1])) + if (product_type[0] == 21 and + product_type[-1] in {20, 21, 22, 23, 24, 42}): # sci data + product_types_batch[product_type] += 1 + prio = 2 + elif product_type[0] == 21: # ql data + prio = 1 + batch = product_types_batch[product_type] // batch_size + product_types[(prio, ) + product_type + (batch, )].append(file) + + jobs = [] + with ProcessPoolExecutor() as executor: + # simple heuristic that the daily QL data takes longest so we start early + for pt, files in sorted(product_types.items()): + jobs.append(executor.submit(process_type, files, + processor=FitsL1Processor(self.output_dir), + soopmanager=SOOPManager.instance, + spice_kernel_path=Spice.instance.meta_kernel_path, + config=CONFIG)) + + for job in jobs: + try: + new_files = job.result() + all_files.extend(new_files) + except Exception: + logger.error('error', exc_info=True) + + return list(set(all_files)) + + +def process_type(timeranges, productcls, flarelistparent, *, processor, + soopmanager, spice_kernel_path, config): + SOOPManager.instance = soopmanager + all_files = list() + Spice.instance = Spice(spice_kernel_path) + CONFIG = config + file = 1 + prod = productcls() + for tr in timeranges: + logger.info(f"processing timerange: {timeranges}") + try: + # see https://github.com/i4Ds/STIXCore/issues/350 + get_complete_file_name(file.name) + l3 = prod.from_timerange(tr, flarelistparent=flarelistparent) + all_files.extend(processor.write_fits(l3)) + except Exception as e: + logger.error('Error processing timerange %s', tr, exc_info=True) + logger.error('%s', e) + if CONFIG.getboolean('Logging', 'stop_on_error', fallback=False): + raise e + return all_files diff --git a/stixcore/processing/LBtoL0.py b/stixcore/processing/LBtoL0.py index 251ef7b5..d6983875 100644 --- a/stixcore/processing/LBtoL0.py +++ b/stixcore/processing/LBtoL0.py @@ -85,6 +85,10 @@ def process_tm_type(files, tm_type, processor, spice_kernel_path, config, idbm): RidLutManager.instance = RidLutManager(Path(CONFIG.get("Publish", "rid_lut_file")), update=False) + logger.info(f"Start Processing TM type: {tm_type} with {len(files)} files") + + RidLutManager.instance = RidLutManager(Path(CONFIG.get("Publish", "rid_lut_file")), update=False) + # Stand alone packet data if (tm_type[0] == 21 and tm_type[-2] not in {20, 21, 22, 23, 24, 42}) or tm_type[0] != 21: for file in files: diff --git a/stixcore/processing/pipeline_cron.py b/stixcore/processing/pipeline_cron.py index 1019ee13..f0a5d690 100644 --- a/stixcore/processing/pipeline_cron.py +++ b/stixcore/processing/pipeline_cron.py @@ -21,6 +21,7 @@ from stixcore.io.RidLutManager import RidLutManager from stixcore.io.soc.manager import SOCPacketFile from stixcore.processing.L0toL1 import Level1 +from stixcore.processing.L1toL2 
import Level2 from stixcore.processing.LBtoL0 import Level0 from stixcore.processing.TMTCtoLB import process_tmtc_to_levelbinary from stixcore.products import Product @@ -153,9 +154,9 @@ def process_tm(path, **args): l1_files = l1_proc.process_fits_files(files=l0_files) logger.info(f"generated L1 files: \n{pformat(l1_files)}") - # l2_proc = Level2(CONFIG.get('Paths', 'tm_archive'), CONFIG.get('Paths', 'fits_archive')) - # l2_files = l2_proc.process_fits_files(files=l1_files) - # logger.info(f"generated L2 files: \n{pformat(l2_files)}") + l2_proc = Level2(CONFIG.get('Paths', 'tm_archive'), CONFIG.get('Paths', 'fits_archive')) + l2_files = l2_proc.process_fits_files(files=l1_files) + logger.info(f"generated L2 files: \n{pformat(l2_files)}") l2_files = [] error_report.log_result([list(lb_files), l0_files, l1_files, l2_files]) diff --git a/stixcore/products/__init__.py b/stixcore/products/__init__.py index 31eae23f..b356f006 100644 --- a/stixcore/products/__init__.py +++ b/stixcore/products/__init__.py @@ -8,5 +8,6 @@ from stixcore.products.level1.scienceL1 import * from stixcore.products.level2.housekeepingL2 import * from stixcore.products.level2.quicklookL2 import * +from stixcore.products.level2.scienceL2 import * from stixcore.products.levelb.binary import LevelB from stixcore.products.lowlatency.quicklookLL import * diff --git a/stixcore/products/level0/scienceL0.py b/stixcore/products/level0/scienceL0.py index d6e0349e..6877de2b 100644 --- a/stixcore/products/level0/scienceL0.py +++ b/stixcore/products/level0/scienceL0.py @@ -158,17 +158,21 @@ def split_to_files(self): for control in self.control.group_by(key_cols).groups: data = self.data[np.isin(self.data["control_index"], control["index"])] - yield type(self)( + file_chunk = type(self)( service_type=self.service_type, service_subtype=self.service_subtype, - ssid=self.ssid, - control=control, - data=data, - idb_versions=self.idb_versions, - comment=self.comment, - history=self.history, + ssid=self.ssid, control=control, data=data, + idb_versions=self.idb_versions, comment=self.comment, + history=self.history ) + if hasattr(self, 'get_additional_extensions'): + for ext, name in self.get_additional_extensions(): + # Copy all extension data tables to the new product + if ext is not None: + setattr(file_chunk, name, getattr(self, name)[:]) + yield file_chunk + @classmethod def from_levelb(cls, levelb, *, parent="", keep_parse_tree=True): """Converts level binary science packets to a L1 product. diff --git a/stixcore/products/level1/scienceL1.py b/stixcore/products/level1/scienceL1.py index 08448553..a31c8f7e 100644 --- a/stixcore/products/level1/scienceL1.py +++ b/stixcore/products/level1/scienceL1.py @@ -134,6 +134,21 @@ def bunit(self): # TODO define columns for dmin/max return " " + @property + def dmin(self): + # TODO define columns for dmin/max + return 0.0 + + @property + def dmax(self): + # TODO define columns for dmin/max + return 0.0 + + @property + def bunit(self): + # TODO define columns for dmin/max + return ' ' + @classmethod def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): return kwargs["level"] == Visibility.LEVEL and service_type == 21 and service_subtype == 6 and ssid == 23 @@ -145,6 +160,7 @@ class Spectrogram(ScienceProduct, L1Mixin): In level 1 format. 
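+
+    A minimal reading sketch (the file name is a placeholder)::
+
+        >>> from stixcore.products import Product
+        >>> spec = Product('solo_L1_stix-sci-xray-spec_20220801_V02.fits')  # doctest: +SKIP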
""" + PRODUCT_PROCESSING_VERSION = 4 NAME = "xray-spec" @@ -219,6 +235,24 @@ def dmax(self): def bunit(self): return " " + @property + def dmin(self): + return np.nanmin([self.data['cha_diode0'].min(), + self.data['cha_diode1'].min(), + self.data['chb_diode0'].min(), + self.data['chb_diode1'].min()]) + + @property + def dmax(self): + return np.nanmax([self.data['cha_diode0'].max(), + self.data['cha_diode1'].max(), + self.data['chb_diode0'].max(), + self.data['chb_diode1'].max()]) + + @property + def bunit(self): + return ' ' + @classmethod def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): return kwargs["level"] == Aspect.LEVEL and service_type == 21 and service_subtype == 6 and ssid == 42 diff --git a/stixcore/products/level2/quicklookL2.py b/stixcore/products/level2/quicklookL2.py index 44e9bb92..ef095302 100644 --- a/stixcore/products/level2/quicklookL2.py +++ b/stixcore/products/level2/quicklookL2.py @@ -2,10 +2,24 @@ . """ +import re +import subprocess +from pathlib import Path from collections import defaultdict +import numpy as np + +import astropy.units as u +from astropy.io import fits +from astropy.io.fits import table_to_hdu +from astropy.table import Column, QTable + +from stixcore.calibration.ecc_post_fit import ecc_post_fit +from stixcore.calibration.elut_manager import ELUTManager +from stixcore.config.config import CONFIG +from stixcore.ecc.manager import ECCManager from stixcore.products.level0.quicklookL0 import QLProduct -from stixcore.products.product import L2Mixin +from stixcore.products.product import EnergyChannelsMixin, GenericProduct, L2Mixin from stixcore.time import SCETimeRange from stixcore.util.logging import get_logger @@ -149,8 +163,8 @@ def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): return kwargs["level"] == "L2" and service_type == 21 and service_subtype == 6 and ssid == 34 -class EnergyCalibration(QLProduct, L2Mixin): - """Quick Look energy calibration data product. +class TMStatusFlareList(QLProduct, L2Mixin): + """Quick Look TM Management status and Flare list data product. In level 2 format. """ @@ -168,21 +182,24 @@ def __init__( **kwargs, ) - self.name = "energy" + self.name = "ql-tmstatusflarelist" self.level = "L2" - self.type = "cal" @classmethod def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): - return kwargs["level"] == "L2" and service_type == 21 and service_subtype == 6 and ssid == 41 + return kwargs["level"] == "L2" and service_type == 21 and service_subtype == 6 and ssid == 43 -class TMStatusFlareList(QLProduct, L2Mixin): - """Quick Look TM Management status and Flare list data product. +class EnergyCalibration(GenericProduct, EnergyChannelsMixin, L2Mixin): + """Quick Look energy calibration data product. In level 2 format. 
""" + NAME = "energy" + LEVEL = "L2" + TYPE = "cal" + def __init__( self, *, service_type, service_subtype, ssid, control, data, idb_versions=defaultdict(SCETimeRange), **kwargs ): @@ -196,9 +213,314 @@ def __init__( **kwargs, ) - self.name = "ql-tmstatusflarelist" - self.level = "L2" + self.name = EnergyCalibration.NAME + self.level = EnergyCalibration.LEVEL + self.type = EnergyCalibration.TYPE + + @property + def fits_daily_file(self): + return True + + @property + def dmin(self): + # default for FITS HEADER + return 4 + + @property + def dmax(self): + # default for FITS HEADER + return 150 + + @property + def bunit(self): + # default for FITS HEADER + return "kEV" + + @property + def exposure(self): + # default for FITS HEADER + return self.control["integration_time"].min().to_value(u.s) + + @property + def max_exposure(self): + # default for FITS HEADER + return self.control["integration_time"].max().to_value(u.s) @classmethod def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs): - return kwargs["level"] == "L2" and service_type == 21 and service_subtype == 6 and ssid == 43 + return kwargs["level"] == EnergyCalibration.LEVEL and service_type == 21 and service_subtype == 6 and ssid == 41 + + def get_additional_extensions(self): + return [] + + @classmethod + def from_level1(cls, l1product, parent="", idlprocessor=None): + l2 = super().from_level1(l1product, parent=parent)[0] + + date = l2.utc_timerange.start.datetime + ob_elut, sci_channels = ELUTManager.instance.get_elut(date) + + e_actual_list = [] + off_gain_list = [] + + ecc_only_e_actual_list = [] + ecc_only_off_gain_list = [] + + ecc_err_list = [] + gain_range_ok_list = [] + + l2.control.add_column( + Column( + name="ob_elut_name", + data=np.repeat("_" * 50, len(l2.control)), + description="Name of the ELUT active on instrument", + ) + ) + + for spec_idx, spec in enumerate(l2.data["counts"]): + if spec.shape != (32, 12, 1024): + raise ValueError(f"Unexpected shape {spec.shape} for counts in {l1product.name}") + + all_spec = spec.sum(axis=1).sum(axis=0) + all_spec_table_total_rate = QTable( + [[np.int16(len(all_spec))], np.int32([all_spec])], names=["NUM_POINTS", "COUNTS"] + ) + all_spec_rate = spec.reshape(12 * 32, 1024) + all_spec_table_rate = QTable( + [ + np.repeat(np.arange(32), 12).astype(np.uint8), + np.tile(np.arange(12), 32).astype(np.uint8), + np.zeros(12 * 32, dtype=int).astype(np.uint8), + np.full(12 * 32, 1024, dtype=int).astype(np.int16), + all_spec_rate.astype(np.int32), + ], + names=["DETECTOR_ID", "PIXEL_ID", "SUBSPEC_ID", "NUM_POINTS", "COUNTS"], + ) + + spec_ecc = spec.reshape(12 * 32, 1024).T.astype(np.float32) + + spec_ecc_table = QTable(spec_ecc, names=[f"ch{i:03d}" for i in range(12 * 32)]) + spec_ecc_table.add_column(np.arange(1024, dtype=np.float32) * u.Unit("adu"), index=0, name="PHA_A") + spec_ecc_table.add_column( + (np.arange(1024, dtype=np.float32) + 0.5) * u.Unit("adu"), index=1, name="PHA_center" + ) + spec_ecc_table.add_column((np.arange(1024, dtype=np.float32) + 1) * u.Unit("adu"), index=2, name="PHA_B") + + spec_filename = l1product.fits_header["FILENAME"] + + if not re.match(r"^solo_L1_stix-cal-energy_\d+_V.*.fits$", spec_filename): + raise ValueError(f"Invalid filename {spec_filename} for energy calibration data") + + spec_filename = spec_filename.replace(".fits", "_ecc_in.fits") + ecc_install_path = Path(CONFIG.get("ECC", "ecc_path")) + + with ECCManager.instance.context(date) as ecc_run_context: + ecc_run_context_path, ecc_run_cfg = ecc_run_context + + spec_file = 
ecc_run_context_path / spec_filename + all_file = ecc_run_context_path / "spec_all.fits" + spec_all_erg = ecc_run_context_path / "spec_all_erg.fits" + erg_path = ecc_run_context_path / "ECC_para.fits" + bash_script = f"""#!/bin/bash + cd {ecc_run_context_path} + + {ecc_install_path}/Bkg + {ecc_install_path}/ECC --f_obs "{spec_filename}" + {ecc_install_path}/STX_Calib spec_all.fits ECC_para.fits[1] + """ + primary_hdu = fits.PrimaryHDU() + + all_spec_enc = fits.connect._encode_mixins(all_spec_table_rate) + all_spec = table_to_hdu(all_spec_enc) + all_spec.name = "RATE" + + all_spec_total_enc = fits.connect._encode_mixins(all_spec_table_total_rate) + all_spec_total = table_to_hdu(all_spec_total_enc) + all_spec_total.name = "TOTAL_RATE" + + hdul = [primary_hdu, all_spec, all_spec_total] + hdul = fits.HDUList(hdul) + hdul.writeto(all_file, overwrite=True, checksum=True) + + primary_hdu = fits.PrimaryHDU() + spec_enc = fits.connect._encode_mixins(spec_ecc_table) + spec_hdu = table_to_hdu(spec_enc) + spec_hdu.name = "SPEC_ECC" + hdul = [primary_hdu, spec_hdu] + hdul = fits.HDUList(hdul) + hdul.writeto(spec_file, overwrite=True, checksum=True) + + if not spec_file.exists(): + raise FileNotFoundError(f"Failed to write energy calibration data in ECC format to {spec_file}") + logger.info(f"Energy calibration data in ECC format written to {spec_file}") + + # Run bash script directly + process = subprocess.run(["bash"], input=bash_script, text=True, capture_output=True) + if process.returncode != 0: + raise RuntimeError(f"ECC Bash script failed: {process.stderr}") + + logger.info("ECC bash script executed successfully: \n%s", process.stdout) + + if not erg_path.exists(): + raise FileNotFoundError(f"Failed to read ECC result file {erg_path}") + + control_idx = l2.data["control_index"][spec_idx] + livetime = l1product.control["live_time"][control_idx].to_value(u.s) + ecc_pf_df, idx_ecc = ecc_post_fit(spec_all_erg, erg_path, livetime) + logger.info( + "Run ecc post fit: replaced [%s %%] gain offset pairs with 'better fits'", + round((len(idx_ecc) - idx_ecc.sum()) / max(1, len(idx_ecc)) * 100, ndigits=1), + ) + + off_gain = np.array( + [ + 4.0 * ecc_pf_df["Offset_Cor"].values.reshape(32, 12), + 1.0 / (4.0 * ecc_pf_df["Gain_Cor"].values.reshape(32, 12)), + ecc_pf_df["goc"].values.reshape(32, 12), + ] + ) + off_gain_list.append(off_gain) + + off_gain_ecc = np.array( + [ + 4.0 * ecc_pf_df["Offset_ECC"].values.reshape(32, 12), + 1.0 / (4.0 * ecc_pf_df["Gain_ECC"].values.reshape(32, 12)), + ecc_pf_df["goc"].values.reshape(32, 12), + ] + ) + ecc_only_off_gain_list.append(off_gain_ecc) + + l2.control["ob_elut_name"][l2.data["control_index"][spec_idx]] = ob_elut.file + + ecc_err_list.append( + np.array( + [ + ecc_pf_df["err_P31"].values.reshape(32, 12), + ecc_pf_df["err_dE31"].values.reshape(32, 12), + ecc_pf_df["err_P81"].values.reshape(32, 12), + ecc_pf_df["err_dE81"].values.reshape(32, 12), + ] + ) + ) + + gain_range_ok = True + for h in ecc_pf_df.index[ecc_pf_df["Gain_Prime"] > ecc_run_cfg.Max_Gain_Prime]: + det_pix_can = [ecc_pf_df["DET"][h], ecc_pf_df["PIX"][h]] + if det_pix_can not in ecc_run_cfg.Ignore_Max_Gain_Prime_Det_Pix_List: + logger.warning( + f"ECC result Gain_Prime {ecc_pf_df['Gain_Prime'][h]} " + f"for DET {det_pix_can[0]} PIX {det_pix_can[1]} exceeds " + f"Max_Gain {ecc_run_cfg.Max_Gain_Prime}, " + "but not in ignore list" + ) + gain_range_ok = False + + for h in ecc_pf_df.index[ecc_pf_df["Gain_Prime"] < ecc_run_cfg.Min_Gain_Prime]: + det_pix_can = [ecc_pf_df["DET"][h], ecc_pf_df["PIX"][h]] + if 
det_pix_can not in ecc_run_cfg.Ignore_Min_Gain_Prime_Det_Pix_List: + logger.warning( + f"ECC result Gain_Prime {ecc_pf_df['Gain_Prime'][h]} " + f"for DET {det_pix_can[0]} PIX {det_pix_can[1]} falls below " + f"Min_Gain_Prime {ecc_run_cfg.Min_Gain_Prime}, " + "but not in ignore list" + ) + gain_range_ok = False + + for h in ecc_pf_df.index[ecc_pf_df["Gain_Cor"] < ecc_run_cfg.Min_Gain]: + det_pix_can = [ecc_pf_df["DET"][h], ecc_pf_df["PIX"][h]] + if det_pix_can not in ecc_run_cfg.Ignore_Min_Gain_Det_Pix_List: + logger.warning( + f"ECC result Gain_Cor {ecc_pf_df['Gain_Cor'][h]} " + f"for DET {det_pix_can[0]} PIX {det_pix_can[1]} falls below " + f"Min_Gain {ecc_run_cfg.Min_Gain}, " + "but not in ignore list" + ) + gain_range_ok = False + + gain_range_ok_list.append(gain_range_ok) + + gain = off_gain[1, :, :] + offset = off_gain[0, :, :] + + # calculate the actual energy edges taking the applied ELUT into + # account for calibration of data recorded with the ELUT + e_actual = (ob_elut.adc - offset[..., None]) * gain[..., None] + + e_actual_ext = np.pad( + e_actual, + # pad last axis by 1 on both sides + pad_width=((0, 0), (0, 0), (1, 1)), + mode="constant", + # first pad with 0, last pad with inf + constant_values=(0, np.inf), + ) + e_actual_list.append(e_actual_ext) + + gain_ecc = off_gain_ecc[1, :, :] + offset_ecc = off_gain_ecc[0, :, :] + + # calculate the actual energy edges taking the applied ELUT into + # account for calibration of data recorded with the ELUT + e_actual_ecc = (ob_elut.adc - offset_ecc[..., None]) * gain_ecc[..., None] + + e_actual_ext_ecc = np.pad( + e_actual_ecc, + # pad last axis by 1 on both sides + pad_width=((0, 0), (0, 0), (1, 1)), + mode="constant", + # first pad with 0, last pad with inf + constant_values=(0, np.inf), + ) + ecc_only_e_actual_list.append(e_actual_ext_ecc) + + # end of ECC context block + # end of for each spectrum + l2.data.add_column( + Column( + name="e_edges_actual", + data=e_actual_list, + description="actual energy edges fitted by ECC and post fitting", + ) + ) # noqa + l2.data["e_edges_actual"].unit = u.keV + + l2.data.add_column( + Column( + name="offset_gain_goc", + data=off_gain_list, + description="result of the ecc fitting: offset, gain, goc and post fitting", + ) + ) # noqa + + l2.data.add_column( + Column( + name="ecc_only_e_edges_actual", + data=ecc_only_e_actual_list, + description="actual energy edges fitted by ECC only", + ) + ) # noqa + l2.data["ecc_only_e_edges_actual"].unit = u.keV + + l2.data.add_column( + Column( + name="ecc_only_offset_gain_goc", + data=ecc_only_off_gain_list, + description="result of the ecc fitting only: offset, gain, goc", + ) + ) # noqa + + l2.data.add_column( + Column( + name="ecc_error", + data=ecc_err_list, + description="error estimate from ECC: err_P31, err_dE31, err_P81, err_dE81", + ) + ) # noqa + l2.data.add_column( + Column(name="gain_range_ok", data=gain_range_ok_list, description="is gain in expected range") + ) + + del l2.data["counts_comp_err"] + del l2.data["counts"] + + return [l2] diff --git a/stixcore/products/level2/scienceL2.py b/stixcore/products/level2/scienceL2.py new file mode 100644 index 00000000..5c89452b --- /dev/null +++ b/stixcore/products/level2/scienceL2.py @@ -0,0 +1,43 @@ +from collections import defaultdict + +from stixcore.products.level0.scienceL0 import ScienceProduct +from stixcore.products.product import L2Mixin +from stixcore.time import SCETimeRange + +__all__ = ['Spectrogram'] + + +class Spectrogram(ScienceProduct, L2Mixin): + """ + X-ray Spectrogram data product. 
+
+    In level 2 format.
+    """
+    PRODUCT_PROCESSING_VERSION = 2
+
+    def __init__(self, *, service_type, service_subtype, ssid, control,
+                 data, idb_versions=defaultdict(SCETimeRange), **kwargs):
+        super().__init__(service_type=service_type, service_subtype=service_subtype,
+                         ssid=ssid, control=control, data=data, idb_versions=idb_versions, **kwargs)
+        self.name = 'xray-spec'
+        self.level = 'L2'
+
+    @classmethod
+    def is_datasource_for(cls, *, service_type, service_subtype, ssid, **kwargs):
+        return (kwargs['level'] == 'L2' and service_type == 21
+                and service_subtype == 6 and ssid == 24)
+
+    def get_additional_extensions(self):
+        return [(self.data_corrected if (hasattr(self, "data_corrected") and
+                                         len(self.data_corrected) > 0) else None,
+                 "data_corrected")]
+
+    @classmethod
+    def from_level1(cls, l1product, parent='', idlprocessor=None):
+        l2 = super().from_level1(l1product, parent=parent)[0]
+
+        # interpolate counts if RCR > 0 at any time point
+        # TODO: the interpolation itself is not implemented yet; for now the
+        # corrected table starts as a plain copy of the raw data
+        if l2.data['rcr'].sum() > 0:
+            l2.data_corrected = l2.data[:]
+        return [l2]
diff --git a/stixcore/products/product.py b/stixcore/products/product.py
index b810f910..7ad02baa 100644
--- a/stixcore/products/product.py
+++ b/stixcore/products/product.py
@@ -315,6 +315,17 @@ def __call__(self, *args, **kwargs):
                 month=month,
             )
 
+            if hasattr(p, "get_additional_extensions") and data is not None:
+                for _, name in p.get_additional_extensions():
+                    # read the additional extension data
+                    if name.upper() in hdul:
+                        data_ext = read_qtable(file_path, hdu=name.upper(), hdul=hdul)
+                        if "timedel" in data_ext.colnames:
+                            data_ext["timedel"] = SCETimeDelta(data_ext["timedel"])
+                        if "time" in data_ext.colnames:
+                            data_ext["time"] = offset + data_ext["time"]
+                        setattr(p, name, data_ext)
+
             # store the old fits header for later reuse
             if isinstance(p, (L1Mixin, L2Mixin)):
                 p.fits_header = pri_header
@@ -963,6 +974,18 @@ class L2Mixin(FitsHeaderMixin):
     def utc_timerange(self):
         return self.scet_timerange.to_timerange()
 
+    @classmethod
+    def get_additional_extensions(cls):
+        """
+        Get the additional extensions that should be added to the L2 fits file product.
+
+        Returns
+        -------
+        list
+            List of additional extensions to add to the L2 product.
+        """
+        return []
+
     @classmethod
     def from_level1(cls, l1product, parent="", idlprocessor=None):
         l2 = cls(
diff --git a/stixcore/util/scripts/ddpd.py b/stixcore/util/scripts/ddpd.py
index 67403123..cbee88b4 100644
--- a/stixcore/util/scripts/ddpd.py
+++ b/stixcore/util/scripts/ddpd.py
@@ -9,6 +9,7 @@
 run the script (best on pub099). It will generate output in
 stixcore/util/scripts/ddpd.html
 
+   replace V02U.fit with V02.fits
 Descriptions are derived from different sources doc strings, idb, fits files
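
For reference, the ECC input table built in ``EnergyCalibration.from_level1`` flattens the ``(32, 12, 1024)`` counts cube into one spectral column per detector/pixel pair plus three PHA bin-edge columns. Below is a minimal sketch of that layout with toy Poisson counts; all variable names here are illustrative, not taken from the patch:

.. code-block:: python

    import numpy as np

    import astropy.units as u
    from astropy.table import QTable

    # toy counts cube: 32 detectors x 12 pixels x 1024 PHA channels
    spec = np.random.poisson(5, size=(32, 12, 1024))

    # one float32 column per detector/pixel pair -> shape (1024, 384)
    cols = spec.reshape(12 * 32, 1024).T.astype(np.float32)
    table = QTable(cols, names=[f"ch{i:03d}" for i in range(12 * 32)])

    # PHA bin edges and centres in ADC units, as in the patch
    table.add_column(np.arange(1024, dtype=np.float32) * u.adu, index=0, name="PHA_A")
    table.add_column((np.arange(1024, dtype=np.float32) + 0.5) * u.adu, index=1, name="PHA_center")
    table.add_column((np.arange(1024, dtype=np.float32) + 1) * u.adu, index=2, name="PHA_B")

    print(len(table.colnames))  # 387 = 3 edge columns + 384 spectra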
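The external ECC binaries are driven by generating a short shell script and piping it to ``bash`` on stdin, which keeps the working directory and tool sequence in one place. A self-contained sketch of the same pattern, with placeholder paths and commands instead of the real ``Bkg``/``ECC``/``STX_Calib`` chain:

.. code-block:: python

    import subprocess

    # placeholder script; the real one cd's into the ECC run context and
    # calls the Bkg, ECC and STX_Calib binaries in sequence
    script = """#!/bin/bash
    set -e
    cd /tmp
    echo "ECC chain would run here"
    """

    # feed the script to bash on stdin and capture both output streams
    process = subprocess.run(["bash"], input=script, text=True, capture_output=True)
    if process.returncode != 0:
        raise RuntimeError(f"script failed: {process.stderr}")
    print(process.stdout)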
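The padding step in ``from_level1`` prepends ``0`` keV and appends ``inf`` to each per-pixel array of fitted energy edges, so the lowest and highest science channels remain open-ended. A toy-valued sketch, assuming the ELUT ADC edges broadcast against the per-pixel offset and gain as in the patch:

.. code-block:: python

    import numpy as np

    # toy inputs: a shared 1-D list of ELUT ADC edges plus per-pixel
    # calibration; the real code uses the on-board ELUT and fitted values
    adc = np.arange(300.0, 1620.0, 40.0)   # 33 toy ADC edges
    offset = np.full((32, 12), 250.0)      # toy offsets
    gain = np.full((32, 12), 0.1)          # toy gains

    # broadcast to (32, 12, 33): E = (ADC - offset) * gain
    e_actual = (adc - offset[..., None]) * gain[..., None]

    # pad only the last axis: 0 keV below the first edge, inf above the last
    e_actual_ext = np.pad(
        e_actual,
        pad_width=((0, 0), (0, 0), (1, 1)),
        mode="constant",
        constant_values=(0, np.inf),
    )
    print(e_actual_ext.shape)  # (32, 12, 35)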
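Finally, a rough mock of how the ``get_additional_extensions`` hook is intended to round-trip: a product reports ``(data, extension-name)`` pairs, the writer is expected to store each non-empty table as its own HDU, and the reader hook in ``product.py`` re-attaches a matching HDU as an attribute. ``DummyProduct`` and the ``corrected`` extension are hypothetical names used only for illustration; the writer side is not shown in this patch and is assumed here:

.. code-block:: python

    from astropy.table import QTable


    class DummyProduct:
        """Stand-in for an L2 product that carries one extra table."""

        def __init__(self):
            self.corrected = QTable({"counts": [1, 2, 3]})

        def get_additional_extensions(self):
            # one (data, extension-name) pair per extra HDU; None data
            # signals "nothing to write" for this product instance
            data = self.corrected if len(self.corrected) > 0 else None
            return [(data, "corrected")]


    p = DummyProduct()
    # mirrors the reader hook, which only uses the extension name to look
    # up the HDU and then re-attaches the table via setattr(p, name, ...)
    for data_ext, name in p.get_additional_extensions():
        if data_ext is not None:
            print(f"extension {name.upper()}: {len(data_ext)} rows")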