import pandas as pd
import xarray as xr
from pathlib import Path
from attrs import define, field
import datetime
import dbdreader
import shutil
import gsw
import random
import os
import logging
from .utils import find_nth, setup_logging
from .variable import Variable
from .gridder import Gridder
from .dataset_attrs import get_default_variables, get_global_attrs
[docs]
@define
class Processor:
"""
A class to process glider data
"""
# Required attributes
memory_card_copy_path: Path
working_dir: Path
mission_num: str
# Default attributes
mission_vars: list[Variable] = field(factory=list)
glider_ids: dict = field(default={'199': 'Dora', '307': 'Reveille', '308': 'Howdy', '540': 'Stommel', '541': 'Sverdrup', '1148': 'unit_1148'})
wmo_ids: dict = field(default={'199': 'unknown', '307': '4801938', '308': '4801915', '540': '4801916', '541': '4801924', '1148': '4801915'})
# Optional attributes
mission_start_date: datetime.datetime = field(default=pd.to_datetime('2010-01-01')) # Used to slice the data during processing
mission_end_date: datetime.datetime = field(default=pd.to_datetime(datetime.datetime.today()+datetime.timedelta(days=365))) # Used to slice the data during processing
recopy_files: bool = field(default=False) # If True, always recopy files even if they already exist
include_gridded_data: bool = field(default=True) # If True, include gridded data in the output dataset
_log_level: str = field(default='INFO') # Logging level for the application
# Created attributes
dbd: dbdreader.MultiDBD|None = field(default=None)
_df: pd.DataFrame|None = field(default=None)
ds: xr.Dataset|None = field(default=None)
# Private backing fields
_glider_id: str|None = field(default=None)
_glider_name: str|None = field(default=None)
_wmo_id: str|None = field(default=None)
_mission_year: str|None = field(default=None)
_mission_title: str|None = field(default=None)
_mission_folder_name: str|None = field(default=None)
_mission_folder_path: Path|None = field(default=None)
_netcdf_filename: str|None = field(default=None)
_netcdf_output_path: Path|None = field(default=None)
_dbd_variables: list|None = field(default=None)
_sci_dbd_variables: list|None = field(default=None)
_eng_dbd_variables: list|None = field(default=None)
_sci_df: pd.DataFrame|None = field(default=None)
_eng_df: pd.DataFrame|None = field(default=None)
_sci_ds: xr.Dataset|None = field(default=None)
_eng_ds: xr.Dataset|None = field(default=None)
@property
def dbd_variables(self) -> list:
return self.sci_dbd_vars + self.eng_dbd_vars
@property
def sci_dbd_vars(self) -> list:
"""Get the science DBD variables."""
if self.dbd is None:
self.dbd = self._read_dbd()
return self.dbd.parameterNames['sci']
@property
def eng_dbd_vars(self) -> list:
"""Get the engineering DBD variables."""
if self.dbd is None:
self.dbd = self._read_dbd()
return self.dbd.parameterNames['eng']
@property
def eng_vars(self) -> list:
"""Get engineering variables (non-calculated vars starting with ``m_``)"""
return [var.short_name for var in self.mission_vars
if (not var.calculated) and (var.data_source_name.startswith('m_'))] #type: ignore
@property
def sci_vars(self) -> list:
"""Get science variables (all non-engineering variables)"""
return self.df.columns.drop(self.eng_vars).tolist()
@property
def glider_id(self) -> str|None:
"""Get the glider ID."""
if self._glider_id is None:
self._glider_id = self._get_glider_id()
return self._glider_id
@property
def glider_name(self) -> str|None:
"""Get the glider name."""
if self._glider_name is None:
self._glider_name = self.glider_ids[self.glider_id]
return self._glider_name
@property
def wmo_id(self) -> str|None:
"""Get the WMO ID."""
if self._wmo_id is None:
self._wmo_id = self.wmo_ids[self.glider_id]
return self._wmo_id
@property
def mission_year(self) -> str:
"""Get the mission year."""
if self._mission_year is None:
self._mission_year = self._get_mission_year()
return self._mission_year
@property
def mission_title(self) -> str:
"""Get the mission title."""
if self._mission_title is None:
self._mission_title = f'Mission {self.mission_num}'
return self._mission_title
@property
def mission_folder_name(self) -> str:
"""Get the mission folder name."""
if self._mission_folder_name is None:
self._mission_folder_name = self.mission_title.replace(' ', '_')
return self._mission_folder_name
@property
def mission_folder_path(self) -> Path:
"""Get the mission folder path."""
if self._mission_folder_path is None:
self._mission_folder_path = self.working_dir.joinpath(self.mission_folder_name)
return self._mission_folder_path
@property
def netcdf_filename(self) -> str:
"""Get the NetCDF filename."""
if self._netcdf_filename is None:
self._netcdf_filename = f'M{self.mission_num}_{self.mission_year}_{self.glider_id}.nc'
return self._netcdf_filename
@property
def netcdf_output_path(self) -> Path:
"""Get the NetCDF path."""
if self._netcdf_output_path is None:
self._netcdf_output_path = self.mission_folder_path.joinpath(f'{self.netcdf_filename}')
return self._netcdf_output_path
@property
def df(self) -> pd.DataFrame:
if self._df is None:
self._df = self._convert_dbd_to_dataframe()
return self._df
@property
def sci_df(self) -> pd.DataFrame:
return self.df[self.sci_vars]
@property
def eng_df(self) -> pd.DataFrame:
eng_df = self.df[self.eng_vars].copy()
eng_df.index.name = 'm_time'
return eng_df
@property
def sci_ds(self) -> xr.Dataset:
return xr.Dataset.from_dataframe(self.sci_df)
@property
def eng_ds(self) -> xr.Dataset:
return xr.Dataset.from_dataframe(self.eng_df)
@property
def log_level(self) -> str:
"""Get the current logging level."""
return self._log_level
@log_level.setter
def log_level(self, level: str):
"""Set the logging level and update the logger configuration."""
self._log_level = level.upper()
setup_logging(level=self._log_level)
@property
def logger(self) -> logging.Logger:
"""Get the logger instance for this processor."""
return logging.getLogger('glider_ingest')
def __attrs_post_init__(self):
"""
Post init method to add default variables to the mission_vars list
"""
setup_logging(level=self._log_level)
self.logger.info("Initializing Processor for mission %s", self.mission_num)
self.logger.debug("Memory card path: %s", self.memory_card_copy_path)
self.logger.debug("Working directory: %s", self.working_dir)
self.add_mission_vars(get_default_variables())
self.logger.debug("Added %d default variables", len(get_default_variables()))
def _check_mission_var_duplicates(self):
"""
Check for duplicate variables in the mission_vars list
"""
# Get the variable data_source_names
var_names = self._get_mission_variable_short_names()
if len(set(var_names)) != len(var_names):
print('Duplicate variables in mission_vars list')
[docs]
def add_mission_vars(self, mission_vars: list[Variable]|list[str]|Variable|str):
"""
Add variables to the mission_vars list.
Args:
mission_vars: Can be any of:
- Single Variable object
- Single string
- List of Variable objects
- List of strings
- Mixed list of Variables and strings
"""
# Convert to list if single item
if isinstance(mission_vars, Variable):
mission_vars = [mission_vars]
elif isinstance(mission_vars, str):
mission_vars = [mission_vars]
self.logger.debug("Adding %d variables to mission_vars", len(mission_vars))
# Process each variable
processed_vars = []
for var in mission_vars:
if isinstance(var, str):
processed_vars.append(Variable(data_source_name=var))
self.logger.debug("Added string variable: %s", var)
elif isinstance(var, Variable):
processed_vars.append(var)
self.logger.debug("Added Variable object: %s", var.data_source_name)
self.mission_vars.extend(processed_vars)
self.logger.info("Total mission variables: %d", len(self.mission_vars))
self._check_mission_var_duplicates()
[docs]
def remove_mission_vars(self, vars_to_remove: list[str]|str):
"""
Remove variables from mission_vars list by data source name.
Args:
vars_to_remove: Can be a single string or list of strings representing
data_source_names to remove
"""
# Convert single string to list
if isinstance(vars_to_remove, str):
vars_to_remove = [vars_to_remove]
self.logger.debug("Removing %d variables: %s", len(vars_to_remove), vars_to_remove)
initial_count = len(self.mission_vars)
# Filter out the variables to remove
self.mission_vars = [var for var in self.mission_vars
if var.data_source_name not in vars_to_remove]
removed_count = initial_count - len(self.mission_vars)
self.logger.info("Successfully removed %d variables. Remaining: %d", removed_count, len(self.mission_vars))
def _copy_files(self):
"""
Copy only LOGS and STATE/CACHE folders from memory card copy to working directory
"""
self.logger.info("Starting file copy operation")
original_loc = self.memory_card_copy_path
new_loc = self.mission_folder_path
self.logger.debug("Source: %s", original_loc)
self.logger.debug("Destination: %s", new_loc)
# Define patterns to include
include_patterns = ['**/LOGS', '**/logs', '**/STATE/CACHE', '**/state/cache']
copied_count = 0
skipped_count = 0
for pattern in include_patterns:
self.logger.debug("Processing pattern: %s", pattern)
for source_path in original_loc.glob(pattern):
# Create relative path to maintain directory structure
relative_path = source_path.relative_to(original_loc)
destination_path = new_loc / relative_path
# Skip copying if files already exist and recopy_files is False
if destination_path.exists() and not self.recopy_files:
self.logger.debug("Skipping existing directory: %s", destination_path)
skipped_count += 1
continue
# Create parent directories if they don't exist
destination_path.parent.mkdir(parents=True, exist_ok=True)
# Copy directory
if source_path.is_dir():
self.logger.info('Copying %s to %s', source_path, destination_path)
shutil.copytree(source_path, destination_path, dirs_exist_ok=True)
copied_count += 1
self.logger.info("File copy complete. Copied: %d, Skipped: %d", copied_count, skipped_count)
def _get_files_by_extension(self,directory_path: Path, extensions: list[str], as_string: bool = False) -> list:
"""
Get files from a directory with specified extensions.
Args:
directory_path (Path): Directory to search for files
extensions (list[str]): List of file extensions to match (e.g. ['.dbd', '.DBD'])
as_string (bool): Whether to return paths as strings
Returns:
list: List of matching files as Path objects or strings
"""
files = [p for p in directory_path.rglob('*') if p.suffix in extensions]
if as_string:
files = [str(p) for p in files]
return files
def _get_cache_files(self,as_string:bool=False):
"""
Get the cache files from the memory card copy
"""
extensions = ['.cac','.CAC']
directory_path = self.memory_card_copy_path
cac_files = self._get_files_by_extension(directory_path=directory_path,extensions=extensions,as_string=as_string)
return cac_files
def _get_cache_files_path(self):
"""
Get the cache file path from the memory card copy
"""
return self.mission_folder_path.joinpath('cache')
def _copy_cache_files(self):
"""
Move the cache files to the working directory from both flight and science cards
"""
self.logger.info("Starting cache file copy operation")
# Define cache source locations for both flight and science cards
cache_sources = [
self.memory_card_copy_path / 'Flight_card' / 'STATE' / 'CACHE',
self.memory_card_copy_path / 'Science_card' / 'STATE' / 'CACHE'
]
cache_dest = self.mission_folder_path / 'CACHE'
# Create destination directory if it doesn't exist
if not cache_dest.exists():
cache_dest.mkdir(parents=True, exist_ok=True)
self.logger.debug("Created cache destination directory: %s", cache_dest)
copied_count = 0
skipped_count = 0
for cache_source in cache_sources:
if not cache_source.exists():
self.logger.debug("Cache source does not exist: %s", cache_source)
continue
self.logger.debug("Copying cache files from: %s", cache_source)
for cache_file in cache_source.iterdir():
if cache_file.is_file():
dest_file = cache_dest / cache_file.name
# check if cache file already exists
if dest_file.exists() and not self.recopy_files:
self.logger.debug("Skipping existing cache file: %s", cache_file.name)
skipped_count += 1
continue
# copy cache file
self.logger.debug('Copying cache file %s to %s', cache_file.name, cache_dest)
shutil.copy2(cache_file, dest_file)
copied_count += 1
self.logger.debug("Cache file copy complete. Copied: %d, Skipped: %d", copied_count, skipped_count)
def _get_dbd_files(self,as_string=False):
"""
Get the dbd files from the memory card copy
"""
directory_path = self.mission_folder_path
extensions = ['.dbd','.DBD','.ebd','.EBD']
dbd_files = self._get_files_by_extension(directory_path=directory_path,extensions=extensions,as_string=as_string)
return dbd_files
def _read_dbd(self) -> dbdreader.MultiDBD:
"""
Read the files from the memory card copy
"""
self.logger.info("Reading DBD files")
self._copy_files()
self._copy_cache_files()
filenames = self._get_dbd_files(as_string=True)
self.logger.debug("Found %d DBD files", len(filenames))
self.logger.debug("DBD files: %s%s", [Path(f).name for f in filenames[:5]], '...' if len(filenames) > 5 else '')
cacheDir = self._get_cache_files_path()
self.logger.debug("Cache directory: %s", cacheDir)
dbd = dbdreader.MultiDBD(filenames=filenames,cacheDir=cacheDir)
self.logger.info("Successfully initialized MultiDBD reader")
return dbd
def _get_mission_variables(self,filter_out_none=False):
"""
Get the mission variables from the mission_vars list. Filter out None data_source_name values if desired.
"""
if filter_out_none:
return [var for var in self.mission_vars if var.data_source_name is not None]
else:
return self.mission_vars
def _get_mission_variable_short_names(self,filter_out_none=False):
"""
Get the mission variable data source names from the mission_vars list
"""
return [var.short_name for var in self._get_mission_variables(filter_out_none=filter_out_none)]
def _get_mission_variable_data_source_names(self,filter_out_none=False):
"""
Get the mission variable data source names from the mission_vars list
"""
return [var.data_source_name for var in self._get_mission_variables(filter_out_none=filter_out_none)]
def _check_default_variables(self,variables_to_get:list):
"""
Check that the default variables are in the dbd variables and remove missing ones from both the list and mission_vars
"""
self.logger.debug("Checking variable availability in DBD files")
dbd_vars = self.dbd_variables
self.logger.info(f"DBD contains {len(dbd_vars)} total variables")
missing_vars = [var for var in variables_to_get if var not in dbd_vars]
if missing_vars:
self.logger.warning(f'Missing variables in DBD files: {missing_vars}')
self.logger.info('Removing missing variables from processing list')
variables_to_get = [var for var in variables_to_get if var not in missing_vars]
# Also remove missing variables from mission_vars to maintain consistency
self.mission_vars = [var for var in self.mission_vars
if var.data_source_name not in missing_vars]
else:
self.logger.info("All requested variables found in DBD files")
self.logger.info("Final variable count for processing: %d", len(variables_to_get))
return variables_to_get
def _get_sci_files(self):
"""
Get the sci files from the memory card copy
"""
directory_path = self.memory_card_copy_path
extensions = ['.dbd','.DBD']
sci_files = self._get_files_by_extension(directory_path=directory_path,extensions=extensions,as_string=True)
return sci_files
def _get_random_sci_file(self):
"""
Get a random sci file from the mission folder
"""
sci_files = self._get_sci_files()
random_file = random.choice(sci_files)
# Pick a new file if the file is empty
while os.stat(random_file).st_size == 0:
random_file = random.choice(sci_files)
return random_file
def _get_full_filename(self):
"""
Get the full filename from the file
Args:
file (str): Path to the file to read.
Returns:
str: The extracted full filename, or None if not found.
"""
file = self._get_random_sci_file()
with open(file, errors="ignore") as fp:
for line in fp:
if 'full_filename' in line.strip():
return line.replace('full_filename:', '').strip()
return None
def _get_mission_year(self):
"""
Get the mission year from the filename.
Extracts and validates the mission year from the filename, converting between
mission names and IDs as needed using the mission_ids mapping.
Returns
-------
str
The validated mission year
"""
full_filename = self._get_full_filename()
if full_filename is None:
self.logger.error("Could not extract full filename from DBD files")
return "unknown"
mission_year = full_filename[full_filename.find('-') + 1: find_nth(full_filename, '-', 2)].strip()
return mission_year
def _get_glider_id(self) -> str|None:
"""
Get the glider id from the filename.
Extracts and validates the glider identifier from the filename, converting between
glider names and IDs as needed using the glider_ids mapping.
Returns
-------
str
The validated glider ID
"""
full_filename = self._get_full_filename()
if full_filename is None:
self.logger.error("Could not extract full filename from DBD files")
return None
glider_identifier = full_filename.split('-')[0].replace('unit_', '').strip()
# Create reverse mapping from names to IDs
inverted_glider_ids = {v: k for k, v in self.glider_ids.items()}
# Check if identifier is a valid ID
if glider_identifier in self.glider_ids:
return glider_identifier
# Check if identifier is a valid name
if glider_identifier in inverted_glider_ids:
return inverted_glider_ids[glider_identifier]
valid_options = list(self.glider_ids.keys()) + list(self.glider_ids.values())
print(f'Invalid glider identifier: {glider_identifier}. Must be one of: {valid_options}')
return None
def _get_dbd_data(self):
self.logger.info("Extracting data from DBD files")
self.dbd = self._read_dbd()
variables_to_get = self._get_mission_variable_data_source_names(filter_out_none=True)
self.logger.info("Requesting %d variables", len(variables_to_get))
variables_to_get = self._check_default_variables(variables_to_get)
self.logger.info("Synchronizing data extraction...")
data = self.dbd.get_sync(*variables_to_get)
self.logger.info("Successfully extracted data with variables of: %s", variables_to_get)
self.dbd.close()
return data, variables_to_get
def _format_time(self,df:pd.DataFrame):
self.logger.debug("Formatting time data")
initial_rows = len(df)
# Convert time to datetime format and filter valid dates
df['time'] = pd.to_datetime(df['time'],unit='s', errors='coerce')
df = df.dropna(how='all')
after_dropna = len(df)
valid_dates_mask = (df['time'] >= self.mission_start_date) & \
(df['time'] <= self.mission_end_date)
df = df.loc[valid_dates_mask]
final_rows = len(df)
self.logger.info("Time filtering: %d -> %d -> %d rows", initial_rows, after_dropna, final_rows)
if final_rows > 0:
self.logger.info("Time range: %s to %s", df['time'].min(), df['time'].max())
else:
self.logger.warning("No data remaining after time filtering")
return df
def _calculate_vars(self,df):
self.logger.info("Performing variable calculations and conversions")
# Perform variable conversions and calculations
if 'm_pressure' in df.columns:
df['m_pressure'] *= 10 # Convert pressure from db to dbar
self.logger.debug("Converted m_pressure from db to dbar")
if 'sci_water_pressure' in df.columns:
df['sci_water_pressure'] *= 10 # Convert pressure from db to dbar
self.logger.debug("Converted sci_water_pressure from db to dbar")
if 'sci_water_cond' in df.columns:
df['sci_water_cond'] *= 1000 # Convert conductivity from mS/cm to S/m
self.logger.debug("Converted sci_water_cond from mS/cm to S/m")
vars_for_salinity_and_density = {'sci_water_cond', 'sci_water_temp', 'sci_water_pressure'}
if vars_for_salinity_and_density.issubset(df.columns):
self.logger.info("Calculating salinity and density from CTD data")
df['salinity'] = gsw.SP_from_C(df['sci_water_cond'] * 10, df['sci_water_temp'], df['sci_water_pressure'])
CT = gsw.CT_from_t(df['salinity'], df['sci_water_temp'], df['sci_water_pressure'])
df['density'] = gsw.rho_t_exact(df['salinity'], CT, df['sci_water_pressure'])
self.logger.debug("Calculated salinity range: %.2f - %.2f", df['salinity'].min(), df['salinity'].max())
self.logger.debug("Calculated density range: %.2f - %.2f", df['density'].min(), df['density'].max())
else:
self.logger.warning("Cannot calculate salinity/density - missing required CTD variables")
return df
def _update_dataframe_columns(self,df):
"""
Update the dataframe columns with the mission variables.
Adjusting the current column names, which are data source names, to their short_name values.
"""
column_map = {value.data_source_name: value.short_name for value in self.mission_vars}
df = df.rename(columns=column_map)
return df
def _convert_dbd_to_dataframe(self):
"""
Get the dbd data as a dataframe
"""
data, variables_retrieved = self._get_dbd_data()
df = pd.DataFrame(data).T
new_column_names = ['time']
new_column_names.extend(variables_retrieved)
if len(df.columns) != len(new_column_names):
print(f'The number of columns in the dataframe does not match the number of mission variables, {df.columns} vs {new_column_names}')
# Add names to the dataframe columns
df.columns = new_column_names
# Format time
df = self._format_time(df)
# Calculate variables
df = self._calculate_vars(df)
# Set time as index
df = df.set_index('time')
df = self._update_dataframe_columns(df)
return df
def _generate_ds(self):
"""
Generate a xarray dataset from the dataframe
"""
self.logger.info("Generating xarray dataset")
# self.ds = xr.Dataset.from_dataframe(self.df)
self.logger.debug("Merging science and engineering datasets")
self.ds = xr.merge([self.sci_ds, self.eng_ds])
self.logger.info("Created dataset with %d variables and %d coordinates", len(self.ds.data_vars), len(self.ds.coords))
self.logger.debug("Adding global attributes")
self._add_global_attrs()
self.logger.debug("Adding variable attributes")
self._add_variable_attrs()
self.logger.info("Dataset generation complete")
return self.ds
def _get_longitude(self):
if self.ds is None:
self.logger.error("Dataset not generated yet, run process() first")
raise ValueError("Dataset not generated yet, run process() first")
return self.ds.longitude.values
def _get_latitude(self):
if self.ds is None:
self.logger.error("Dataset not generated yet, run process() first")
raise ValueError("Dataset not generated yet, run process() first")
return self.ds.latitude.values
def _get_depth(self):
if self.ds is None:
self.logger.error("Dataset not generated yet, run process() first")
raise ValueError("Dataset not generated yet, run process() first")
return self.ds.depth.values
def _get_time(self):
if self.ds is None:
self.logger.error("Dataset not generated yet, run process() first")
raise ValueError("Dataset not generated yet, run process() first")
return self.ds.time.values
def _add_global_attrs(self):
if self.wmo_id is None:
self.logger.error("WMO ID is None, cannot add global attributes")
raise ValueError("WMO ID is None, cannot add global attributes")
global_attrs = get_global_attrs(wmo_id = self.wmo_id,mission_title=self.mission_title,
longitude=self._get_longitude(),latitude=self._get_latitude(),
depth=self._get_depth(),time=self._get_time())
if self.ds is None:
self.logger.error("Dataset not generated yet, run process() first")
raise ValueError("Dataset not generated yet, run process() first")
self.ds.attrs = global_attrs
def _add_variable_attrs(self):
if self.ds is None:
self.logger.error("Dataset not generated yet, run process() first")
raise ValueError("Dataset not generated yet, run process() first")
for var in self.mission_vars:
self.ds[var.short_name].attrs = var.to_dict()
def _add_gridded_data(self):
'''Add gridded data to the dataset, must be called after adding attrs'''
if self.ds is None:
self.logger.error("Dataset not generated yet, run process() first")
raise ValueError("Dataset not generated yet, run process() first")
ds_gridded = Gridder(self.ds).create_gridded_dataset()
self.ds.update(ds_gridded)
[docs]
def process(self,return_ds=True):
self.logger.info("=== Starting data processing ===")
start_time = pd.Timestamp.now()
self._generate_ds()
if self.include_gridded_data:
self.logger.info("Adding gridded data to dataset")
self._add_gridded_data()
else:
self.logger.info("Skipping gridded data (disabled)")
processing_time = pd.Timestamp.now() - start_time
self.logger.info("=== Processing complete in %.2f seconds ===", processing_time.total_seconds())
if return_ds:
return self.ds
[docs]
def save(self,save_path=None):
if self.ds is None:
self.logger.info("Dataset not generated yet, running process()")
self.process()
if save_path is None:
save_path = self.netcdf_output_path
self.logger.info("Saving dataset to: %s", save_path)
# Create directory if it doesn't exist
save_path.parent.mkdir(parents=True, exist_ok=True)
start_time = pd.Timestamp.now()
if self.ds is None:
self.logger.error("Dataset not generated yet, run process() first")
raise ValueError("Dataset not generated yet, run process() first")
self.ds.to_netcdf(save_path)
save_time = pd.Timestamp.now() - start_time
file_size_mb = save_path.stat().st_size / (1024 * 1024)
self.logger.info("Dataset saved successfully (%.2f MB) in %.2f seconds", file_size_mb, save_time.total_seconds())
return self.ds