Source code for DiadFit.densimeters

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import patches
import lmfit
from lmfit.models import GaussianModel, VoigtModel, LinearModel, ConstantModel
from scipy.signal import find_peaks
import os
import re
from os import listdir
from os.path import isfile, join
import pickle
from pathlib import Path
from DiadFit.ne_lines import *
from scipy.stats import t
from DiadFit.importing_data_files import *
encode="ISO-8859-1"

## Cornell densimeters
[docs] def calculate_density_cornell_old(*, temp='SupCrit', Split, split_err=None): """ This function converts Diad Splitting into CO$_2$ density using the densimeters of DeVitre et al. (2021) This should only be used for the Cornell Raman, not other Ramans at present. This is an older version of the function calculate_density_cornell - it does not have the same error propagation capabilities, but we keep it here for backwards compatability. Parameters ------------- temp: str 'SupCrit' if measurements done at 37C 'RoomT' if measurements done at 24C Split: int, float, pd.Series, np.array Corrected splitting in cm-1 Split_err: int, float, pd.Series (Optional) Error on corrected splitting in cm-1 Returns -------------- pd.DataFrame Prefered Density (based on different equations being merged), and intermediate calculations """ #if temp is "RoomT": LowD_RT=-38.34631 + 0.3732578*Split HighD_RT=-41.64784 + 0.4058777*Split- 0.1460339*(Split-104.653)**2 # IF temp is 37 LowD_SC=-38.62718 + 0.3760427*Split MedD_SC=-47.2609 + 0.4596005*Split+ 0.0374189*(Split-103.733)**2-0.0187173*(Split-103.733)**3 HighD_SC=-42.52782 + 0.4144277*Split- 0.1514429*(Split-104.566)**2 if isinstance(Split, pd.Series) or isinstance(Split, np.ndarray): df=pd.DataFrame(data={'Preferred D': 0, 'in range': 'Y', 'Corrected_Splitting': Split, 'Notes': 'not in range', 'LowD_RT': LowD_RT, 'HighD_RT': HighD_RT, 'LowD_SC': LowD_SC, 'MedD_SC': MedD_SC, 'HighD_SC': HighD_SC, 'Temperature': temp, 'Splitting': Split, }) else: df=pd.DataFrame(data={'Preferred D': 0, 'in range': 'Y', 'Corrected_Splitting': Split, 'Notes': 'not in range', 'LowD_RT': LowD_RT, 'HighD_RT': HighD_RT, 'LowD_SC': LowD_SC, 'MedD_SC': MedD_SC, 'HighD_SC': HighD_SC, 'Temperature': temp, 'Splitting': Split, }, index=[0]) roomT=df['Temperature']=="RoomT" SupCrit=df['Temperature']=="SupCrit" # If splitting is 0 zero=df['Corrected_Splitting']==0 # Range for SC low density min_lowD_SC_Split=df['Corrected_Splitting']>=102.72 max_lowD_SC_Split=df['Corrected_Splitting']<=103.16 # Range for SC med density min_MD_SC_Split=df['Corrected_Splitting']>103.16 max_MD_SC_Split=df['Corrected_Splitting']<=104.28 # Range for SC high density min_HD_SC_Split=df['Corrected_Splitting']>=104.28 max_HD_SC_Split=df['Corrected_Splitting']<=104.95 # Range for Room T low density min_lowD_RoomT_Split=df['Corrected_Splitting']>=102.734115670188 max_lowD_RoomT_Split=df['Corrected_Splitting']<=103.350311768435 # Range for Room T high density min_HD_RoomT_Split=df['Corrected_Splitting']>=104.407308904012 max_HD_RoomT_Split=df['Corrected_Splitting']<=105.1 # Impossible densities, room T Imposs_lower_end=(df['Corrected_Splitting']>103.350311768435) & (df['Splitting']<103.88) # Impossible densities, room T Imposs_upper_end=(df['Corrected_Splitting']<104.407308904012) & (df['Splitting']>103.88) # Too low density Too_Low_SC=df['Corrected_Splitting']<102.72 Too_Low_RT=df['Corrected_Splitting']<102.734115670188 df.loc[zero, 'Preferred D']=0 df.loc[zero, 'Notes']=pd.NA # If room T, low density, set as low density df.loc[roomT&(min_lowD_RoomT_Split&max_lowD_RoomT_Split), 'Preferred D'] = LowD_RT df.loc[roomT&(min_lowD_RoomT_Split&max_lowD_RoomT_Split), 'Notes']='Room T, low density' # If room T, high density df.loc[roomT&(min_HD_RoomT_Split&max_HD_RoomT_Split), 'Preferred D'] = HighD_RT df.loc[roomT&(min_HD_RoomT_Split&max_HD_RoomT_Split), 'Notes']='Room T, high density' # If SupCrit, high density df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D'] = HighD_SC df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Notes']='SupCrit, high density' # If SupCrit, Med density df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Preferred D'] = MedD_SC df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Notes']='SupCrit, Med density' # If SupCrit, low density df.loc[ SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D'] = LowD_SC df.loc[SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Notes']='SupCrit, low density' # If Supcritical, and too low df.loc[SupCrit&(Too_Low_SC), 'Preferred D']=LowD_SC df.loc[SupCrit&(Too_Low_SC), 'Notes']='Below lower calibration limit' df.loc[SupCrit&(Too_Low_SC), 'in range']='N' # If RoomT, and too low df.loc[roomT&(Too_Low_RT), 'Preferred D']=LowD_RT df.loc[roomT&(Too_Low_RT), 'Notes']='Below lower calibration limit' df.loc[roomT&(Too_Low_RT), 'in range']='N' #if splitting is zero SplitZero=df['Corrected_Splitting']==0 df.loc[SupCrit&(SplitZero), 'Preferred D']=np.nan df.loc[SupCrit&(SplitZero), 'Notes']='Splitting=0' df.loc[SupCrit&(SplitZero), 'in range']='N' df.loc[roomT&(SplitZero), 'Preferred D']=np.nan df.loc[roomT&(SplitZero), 'Notes']='Splitting=0' df.loc[roomT&(SplitZero), 'in range']='N' # If impossible density, lower end df.loc[roomT&Imposs_lower_end, 'Preferred D'] = LowD_RT df.loc[roomT&Imposs_lower_end, 'Notes']='Impossible Density, low density' df.loc[roomT&Imposs_lower_end, 'in range']='N' # If impossible density, lower end df.loc[roomT&Imposs_upper_end, 'Preferred D'] = HighD_RT df.loc[roomT&Imposs_upper_end, 'Notes']='Impossible Density, high density' df.loc[roomT&Imposs_upper_end, 'in range']='N' #df.loc[zero, 'in range']='Y' # If high densiy, and beyond the upper calibration limit Upper_Cal_RT=df['Corrected_Splitting']>105.1 Upper_Cal_SC=df['Corrected_Splitting']>104.95 df.loc[roomT&Upper_Cal_RT, 'Preferred D'] = HighD_RT df.loc[roomT&Upper_Cal_RT, 'Notes']='Above upper Cali Limit' df.loc[roomT&Upper_Cal_RT, 'in range']='N' df.loc[SupCrit&Upper_Cal_SC, 'Preferred D'] = HighD_SC df.loc[SupCrit&Upper_Cal_SC, 'Notes']='Above upper Cali Limit' df.loc[SupCrit&Upper_Cal_SC, 'in range']='N' if split_err is not None: df2=calculate_dens_error(temp, Split, split_err) df.insert(1, 'dens+1σ', df2['max_dens']) df.insert(1, 'dens-1σ', df2['min_dens']) df.insert(3, '1σ', (df2['max_dens']-df2['min_dens'])/2 ) return df
[docs] def calculate_dens_error(temp, Split, split_err): """ This function is used by the function calculate_density_cornell_old to calculate a max and min density corresponding to splitting values + - 1 sigma Parameters ------------- temp: str 'SupCrit' if measurements done at 37C 'RoomT' if measurements done at 24C Split: int, float, pd.Series, np.array Corrected splitting in cm-1 Split_err: int, float, pd.Series (Optional) Error on corrected splitting in cm-1 Returns --------- pd.DataFrame Dataframe with column for max calculated density and min calculated density. """ max_dens=calculate_density_cornell(temp=temp, Split=Split+split_err) min_dens=calculate_density_cornell(temp=temp, Split=Split-split_err) df=pd.DataFrame(data={ 'max_dens': max_dens['Preferred D'], 'min_dens': min_dens['Preferred D']}) return df
[docs] def propagate_error_split_neon_peakfit(*, df_fits, Ne_corr=None, Ne_err=None, pref_Ne=None): """ This function propagates errors in your Ne correction model and peak fits by quadrature. Parameters ----------------- df_fits: pd.DataFrame Dataframe of peak fitting parameters. Must contain columns for 'Diad1_cent_err', 'Diad2_cent_err', 'Splitting' Choose either: Ne_corr: pd.DataFrame (Optional) Dataframe with columns for 'upper_values' and 'lower values', e.g. the upper and lower bounds of the error on the Ne correction model Or pref_Ne and Ne_err: float, int, pd.Series, np.array A preferred value of the Ne correction factor and the error (e.g. pref_Ne=0.998, Ne_err=0.001). Used for rapid peak fitting before developing Ne lines. Returns ----------------- two pd.Series, the error on the splitting, and the combined error from the splitting and the Ne correction model. """ # Get the error on Neon things if isinstance(Ne_corr, pd.DataFrame): Ne_err=(Ne_corr['upper_values']-Ne_corr['lower_values'])/2 print(np.mean(Ne_err)) pref_Ne=Ne_corr['preferred_values'] elif pref_Ne is not None and Ne_err is not None: print('using fixed values for Ne error and Ne factor') else: raise TypeError('you either ne Ne_corr as a dataframe, or to give a value for pref_Ne and Ne_err') # Get the peak fit errors Diad1_err=df_fits['Diad1_cent_err'].fillna(0).infer_objects() Diad2_err=df_fits['Diad2_cent_err'].fillna(0).infer_objects() split_err=(Diad1_err**2 + Diad2_err**2)**0.5 Combo_err= (((df_fits['Splitting']* (Ne_err))**2) + (pref_Ne *split_err )**2 )**0.5 return Combo_err, split_err
## Error for densimeters
[docs] def calculate_Densimeter_std_err_values(*, pickle_str, corrected_split, corrected_split_err, CI_dens=0.67, CI_split=0.67, str_d='LowD') : """ This function propagates uncertainty from the densimeter polynomial fit and the overall error on the corrected splitting (from both the peak fitting and the Ne line correction). Parameters ----------------- pickle_str: str Name of Pickle with regression model for a specific part of the densimeter. Need to be in the same path as the notebook you are calling this in. corrected_split: pd.Series panda series of corrected splitting (cm-1) corrected_split_err: pd. Series panda series of error on corrected splitting (contributions from both peak fitting and the Ne correction model if relevant) str_d: str string of what density equation it came from, appended onto column headings. CI_split: float confidence interval for splitting propagation. Should be set the same as CI_ CI_split: float Returns ----------------- pd.DataFrame Dataframe of preferred density, and error from each source of uncertainty (Density_σ_dens=error from densimeter, Density_σ_split = error from spliting). """ # Corrected splitting new_x=corrected_split new_x_uncertainty=corrected_split_err # This gets the uncertainty from the densimeter DiadFit_dir=Path(__file__).parent with open(DiadFit_dir/pickle_str, 'rb') as f: data = pickle.load(f) model = data['model'] N_poly = model.order - 1 Pf = data['model'] x = data['x'] y = data['y'] # Calculate the residuals residuals = y - Pf(x) # Calculate the standard deviation of the residuals residual_std = np.std(residuals) # Calculate the standard errors for the new x values mean_x = np.nanmean(x) n = len(x) standard_errors = residual_std * np.sqrt(1 + 1 / n + (new_x - mean_x) ** 2 / np.sum((x - mean_x) ** 2)) # Calculate the degrees of freedom df = len(x) - (N_poly + 1) # Calculate the t value for the given confidence level t_value_split = t.ppf((1 + CI_split) / 2, df) t_value_dens = t.ppf((1 + CI_dens) / 2, df) # Calculate the prediction intervals from the densimeter preferred_values = Pf(new_x) lower_values = preferred_values - t_value_dens * standard_errors upper_values = preferred_values + t_value_dens * standard_errors uncertainty_from_dens=(upper_values-lower_values)/2 # Calculate the propagated uncertainty in splitting max_split=new_x + new_x_uncertainty min_split=new_x - new_x_uncertainty max_density= Pf(max_split) min_density= Pf(min_split) uncertainty_split=(max_density-min_density)/2 # Calculate the total uncertainty in the density estimation total_uncertainty = np.sqrt(uncertainty_split ** 2 + uncertainty_from_dens ** 2) df=pd.DataFrame(data={ str_d+'_Density': preferred_values, str_d + '_Density_σ': total_uncertainty, str_d+'_Density+1σ': preferred_values-total_uncertainty, str_d+'_Density-1σ': preferred_values+total_uncertainty, str_d+'_Density_σ_dens': (uncertainty_from_dens), str_d+'_Density_σ_split': (uncertainty_split), }) return df
## Function for if we dont have a densimeter yet
[docs] def calculate_errors_no_densimeter(*, df_combo, Ne_pickle_str='polyfit_data.pkl', temp='SupCrit', split_err=0, CI_split=0.67, CI_neon=0.67): """ This function calculates the error from just the Ne line correction method. It is largely redundant, still in use for some older projects using an old workflow """ time=df_combo['sec since midnight'] Ne_corr=calculate_Ne_corr_std_err_values(pickle_str=Ne_pickle_str, new_x=time, CI=CI_neon) # Lets calculate corrected splitting and the error on this. Split=df_combo['Splitting']*Ne_corr['preferred_values'] df_combo['Corrected_Splitting']=Split Split_err, pk_err=propagate_error_split_neon_peakfit(Ne_corr=Ne_corr, df_fits=df_combo) df_combo['Corrected_Splitting_σ']=Split_err df_combo['Corrected_Splitting_σ_Ne']=(Ne_corr['upper_values']*df_combo['Splitting']-Ne_corr['lower_values']*df_combo['Splitting'])/2 df_combo['Corrected_Splitting_σ_peak_fit']=pk_err cols_to_move = ['filename', 'Corrected_Splitting', 'Corrected_Splitting_σ', 'Corrected_Splitting_σ_Ne', 'Corrected_Splitting_σ_peak_fit'] df_merge = df_combo[cols_to_move + [ col for col in df_combo.columns if col not in cols_to_move]] return df_combo
[docs] def calculate_density_cornell(*, lab='CMASS', df_combo=None, temp='SupCrit', CI_split=0.67, CI_neon=0.67, Ne_pickle_str=None, pref_Ne=None, Ne_err=None, corrected_split=None, split_err=None): """ This function converts Diad Splitting into CO$_2$ density using the Cornell densimeters. Use lab='CCMR' for CCMR and lab='CMASS' for Esteban Gazels lab. Parameters ------------- lab: str. 'CMASS' or 'CCMR' Name of the lab where the analy Either: df_combo: pandas DataFrame data frame of peak fitting information Or: corrected_split: pd.Series Corrected splitting (cm-1) Split_err: float, int Error on corrected splitting temp: str 'SupCrit' if measurements done at 37C 'RoomT' if measurements done at 24C - Not supported yet but could be added if needed. CI_neon: float Default 0.67. Confidence interval to use, e.g. 0.67 returns 1 sigma uncertainties. If you use another number, note the column headings will still say sigma. CI_split: float Default 0.67. Confidence interval to use, e.g. 0.67 returns 1 sigma uncertainties. If you use another number, note the column headings will still say sigma. Either Ne_pickle_str: str Name of Ne correction model OR pref_Ne, Ne_err: float, int For quick and dirty fitting can pass a preferred value for your instrument before you have a chance to regress the Ne lines (useful when first analysing new samples. ) Returns -------------- pd.DataFrame Prefered Density (based on different equatoins being merged), and intermediate calculations """ if corrected_split is not None: Split=corrected_split if df_combo is not None: df_combo_c=df_combo.copy() time=df_combo_c['sec since midnight'] if Ne_pickle_str is not None: # Calculating the upper and lower values for Ne to get that error Ne_corr=calculate_Ne_corr_std_err_values(pickle_str=Ne_pickle_str, new_x=time, CI=CI_neon) # Extracting preferred correction values pref_Ne=Ne_corr['preferred_values'] Split_err, pk_err=propagate_error_split_neon_peakfit(Ne_corr=Ne_corr, df_fits=df_combo_c) df_combo_c['Corrected_Splitting_σ']=Split_err df_combo_c['Corrected_Splitting_σ_Ne']=(Ne_corr['upper_values']*df_combo_c['Splitting']-Ne_corr['lower_values']*df_combo_c['Splitting'])/2 df_combo_c['Corrected_Splitting_σ_peak_fit']=pk_err # If using a single value for quick dirty fitting else: Split_err, pk_err=propagate_error_split_neon_peakfit(df_fits=df_combo_c, Ne_err=Ne_err, pref_Ne=pref_Ne) df_combo_c['Corrected_Splitting_σ']=Split_err df_combo_c['Corrected_Splitting_σ_Ne']=((Ne_err+pref_Ne)*df_combo_c['Splitting']-(Ne_err-pref_Ne)*df_combo_c['Splitting'])/2 df_combo_c['Corrected_Splitting_σ_peak_fit']=pk_err Split=df_combo_c['Splitting']*pref_Ne else: Split_err=(split_err*Split).astype(float) if isinstance(Split, float) or isinstance(Split, int): Split=pd.Series(Split) # #if temp is "RoomT": DiadFit_dir=Path(__file__).parent LowD_RT=-38.34631 + 0.3732578*Split HighD_RT=-41.64784 + 0.4058777*Split- 0.1460339*(Split-104.653)**2 # IF temp is 37 if lab=='CMASS' and temp=='SupCrit': print('yes') # This gets the densimeter at low density pickle_str_lowr='Lowrho_polyfit_data_CMASS.pkl' with open(DiadFit_dir/pickle_str_lowr, 'rb') as f: lowrho_pickle_data = pickle.load(f) # This gets the densimeter at medium density pickle_str_medr='Mediumrho_polyfit_data_CMASS.pkl' with open(DiadFit_dir/pickle_str_medr, 'rb') as f: medrho_pickle_data = pickle.load(f) # This gets the densimeter at high density. pickle_str_highr='Highrho_polyfit_data_CMASS.pkl' with open(DiadFit_dir/pickle_str_highr, 'rb') as f: highrho_pickle_data = pickle.load(f) elif lab=='CMASS' and temp=='RoomT': # This gets the densimeter at low density pickle_str_lowr='Lowrho_polyfit_data_CMASS_24C.pkl' with open(DiadFit_dir/pickle_str_lowr, 'rb') as f: lowrho_pickle_data = pickle.load(f) # This gets the densimeter at high density. pickle_str_highr='Highrho_polyfit_data_CMASS_24C.pkl' with open(DiadFit_dir/pickle_str_highr, 'rb') as f: highrho_pickle_data = pickle.load(f) elif lab=='CCMR' and temp=='SupCrit': pickle_str_lowr='Lowrho_polyfit_data_CCMR.pkl' with open(DiadFit_dir/pickle_str_lowr, 'rb') as f: lowrho_pickle_data = pickle.load(f) # This gets the densimeter at medium density pickle_str_medr='Mediumrho_polyfit_data_CCMR.pkl' with open(DiadFit_dir/pickle_str_medr, 'rb') as f: medrho_pickle_data = pickle.load(f) # This gets the densimeter at high density. pickle_str_highr='Highrho_polyfit_data_CCMR.pkl' with open(DiadFit_dir/pickle_str_highr, 'rb') as f: highrho_pickle_data = pickle.load(f) else: raise TypeError('Lab name not recognised. enter CCMR SupCrit, CMASS SupCrit, CMASS roomT (CCMR room T can be added on request to Penny)') # this allocates the model lowrho_model = lowrho_pickle_data['model'] if temp=='SupCrit': medrho_model = medrho_pickle_data['model'] MedD_SC = pd.Series(medrho_model(Split), index=Split.index) medD_error=calculate_Densimeter_std_err_values(corrected_split=Split, corrected_split_err=Split_err, pickle_str=pickle_str_medr, CI_dens=CI_neon, CI_split=CI_split, str_d='MedD') highrho_model = highrho_pickle_data['model'] # Each of these lines get the density, and then the error on that density. LowD_SC = pd.Series(lowrho_model(Split), index=Split.index) lowD_error=calculate_Densimeter_std_err_values(corrected_split=Split, corrected_split_err=Split_err, pickle_str=pickle_str_lowr, CI_dens=CI_neon, CI_split=CI_split, str_d='LowD') HighD_SC = pd.Series(highrho_model(Split), index=Split.index) highD_error=calculate_Densimeter_std_err_values(corrected_split=Split, corrected_split_err=Split_err, pickle_str=pickle_str_highr, CI_dens=CI_neon, CI_split=CI_split, str_d='HighD') if temp=='RoomT': MedD_SC=np.nan MedD_err=np.nan else: MedD_err=medD_error['MedD_Density_σ'] df=pd.DataFrame(data={'Preferred D': 0, 'Corrected_Splitting': Split, 'Preferred D_σ': 0, 'Preferred D_σ_split': 0, 'Preferred D_σ_Ne': 0, 'Preferred D_σ_pkfit': 0, 'Preferred D_σ_dens': 0, 'in range': 'Y', 'Notes': 'not in range', 'LowD_RT':np.nan, 'HighD_RT': np.nan, 'LowD_SC': LowD_SC, 'LowD_SC_σ': lowD_error['LowD_Density_σ'], 'MedD_SC': MedD_SC, 'MedD_SC_σ': MedD_err, 'HighD_SC': HighD_SC, 'HighD_SC_σ': highD_error['HighD_Density_σ'], 'Temperature': temp, }) roomT=df['Temperature']=="RoomT" SupCrit=df['Temperature']=="SupCrit" # If splitting is 0 zero=df['Corrected_Splitting']==0 # Cut off values ------------------------------------------- # Range for SC low density min_lowD_SC_Split=df['Corrected_Splitting']>=102.7623598753032 max_lowD_SC_Split=df['Corrected_Splitting']<=103.1741034592534 # Range for SC med density min_MD_SC_Split=df['Corrected_Splitting']>103.0608505403591 max_MD_SC_Split=df['Corrected_Splitting']<=104.3836704771313 # Range for SC high density min_HD_SC_Split=df['Corrected_Splitting']>=104.2538992302499 max_HD_SC_Split=df['Corrected_Splitting']<=105.3438707618937 # Range for Room T low density min_lowD_RoomT_Split=df['Corrected_Splitting']>=102.734115670188 max_lowD_RoomT_Split=df['Corrected_Splitting']<=103.350311768435 # Range for Room T high density min_HD_RoomT_Split=df['Corrected_Splitting']>=104.407308904012 max_HD_RoomT_Split=df['Corrected_Splitting']<=105.1 # Impossible densities, room T Imposs_lower_end=(df['Corrected_Splitting']>103.350311768435) & (df['Corrected_Splitting']<103.88) # Impossible densities, room T Imposs_upper_end=(df['Corrected_Splitting']<104.407308904012) & (df['Corrected_Splitting']>103.88) # Too low density Too_Low_SC=df['Corrected_Splitting']<102.7623598753032 Too_Low_RT=df['Corrected_Splitting']<102.734115670188 df.loc[zero, 'Preferred D']=0 df.loc[zero, 'Notes']=pd.NA # Get rid of pandas 2 issue with warning of setting item of incompatible dtype df['Preferred D'] = df['Preferred D'].astype('float64', errors='ignore') df['Preferred D_σ'] = df['Preferred D_σ'].astype('float64', errors='ignore') df['Preferred D_σ_split'] = df['Preferred D_σ_split'].astype('float64', errors='ignore') df['Preferred D_σ_dens'] = df['Preferred D_σ_dens'].astype('float64', errors='ignore') # If SupCrit, high density df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D'] = HighD_SC df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D_σ'] = highD_error['HighD_Density_σ'] df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D_σ_split'] = highD_error['HighD_Density_σ_split'] df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D_σ_dens'] = highD_error['HighD_Density_σ_dens'] df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Notes']='SupCrit, high density' if temp!='RoomT': df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Preferred D'] = MedD_SC df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Preferred D_σ'] = medD_error['MedD_Density_σ'] df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Preferred D_σ_split'] = medD_error['MedD_Density_σ_split'] df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Preferred D_σ_dens'] = medD_error['MedD_Density_σ_dens'] df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Notes']='SupCrit, Med density' # If SupCrit, low density df.loc[ SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D'] = LowD_SC df.loc[ SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D_σ'] = lowD_error['LowD_Density_σ'] df.loc[ SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D_σ_split'] = lowD_error['LowD_Density_σ_split'] df.loc[ SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D_σ_dens'] = lowD_error['LowD_Density_σ_dens'] df.loc[SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Notes']='SupCrit, low density' # If Supcritical, and too low df.loc[SupCrit&(Too_Low_SC), 'Preferred D']=LowD_SC df.loc[SupCrit&(Too_Low_SC), 'Notes']='Below lower calibration limit' df.loc[SupCrit&(Too_Low_SC), 'in range']='N' # now lets do Room T --------------------------------- df.loc[ roomT&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D'] = HighD_SC df.loc[ roomT&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D_σ'] = highD_error['HighD_Density_σ'] df.loc[ roomT&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D_σ_split'] = highD_error['HighD_Density_σ_split'] df.loc[ roomT&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D_σ_dens'] = highD_error['HighD_Density_σ_dens'] df.loc[ roomT&(min_HD_SC_Split&max_HD_SC_Split), 'Notes']='roomT, high density' # If roomT, low density df.loc[ roomT&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D'] = LowD_SC df.loc[ roomT&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D_σ'] = lowD_error['LowD_Density_σ'] df.loc[ roomT&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D_σ_split'] = lowD_error['LowD_Density_σ_split'] df.loc[ roomT&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D_σ_dens'] = lowD_error['LowD_Density_σ_dens'] df.loc[roomT&(min_lowD_SC_Split&max_lowD_SC_Split), 'Notes']='roomT, low density' # If RoomT, and too low df.loc[roomT&(Too_Low_RT), 'Preferred D']=LowD_RT df.loc[roomT&(Too_Low_RT), 'Notes']='Below lower calibration limit' df.loc[roomT&(Too_Low_RT), 'in range']='N' #if splitting is zero SplitZero=df['Corrected_Splitting']==0 df.loc[SupCrit&(SplitZero), 'Preferred D']=np.nan df.loc[SupCrit&(SplitZero), 'Notes']='Splitting=0' df.loc[SupCrit&(SplitZero), 'in range']='N' df.loc[roomT&(SplitZero), 'Preferred D']=np.nan df.loc[roomT&(SplitZero), 'Notes']='Splitting=0' df.loc[roomT&(SplitZero), 'in range']='N' # If impossible density, lower end df.loc[roomT&Imposs_lower_end, 'Preferred D'] = LowD_RT df.loc[roomT&Imposs_lower_end, 'Notes']='Impossible Density, low density' df.loc[roomT&Imposs_lower_end, 'in range']='N' # If impossible density, lower end df.loc[roomT&Imposs_upper_end, 'Preferred D'] = HighD_RT df.loc[roomT&Imposs_upper_end, 'Notes']='Impossible Density, high density' df.loc[roomT&Imposs_upper_end, 'in range']='N' #df.loc[zero, 'in range']='Y' # If high densiy, and beyond the upper calibration limit Upper_Cal_RT=df['Corrected_Splitting']>105.1 Upper_Cal_SC=df['Corrected_Splitting']>105.3438707618937 df.loc[roomT&Upper_Cal_RT, 'Preferred D'] = HighD_RT df.loc[roomT&Upper_Cal_RT, 'Notes']='Above upper Cali Limit' df.loc[roomT&Upper_Cal_RT, 'in range']='N' df.loc[SupCrit&Upper_Cal_SC, 'Preferred D'] = HighD_SC df.loc[SupCrit&Upper_Cal_SC, 'Notes']='Above upper Cali Limit' df.loc[SupCrit&Upper_Cal_SC, 'in range']='N' # if Ne_pickle_str is not None: # # df_merge1=pd.concat([df_combo_c, Ne_corr], axis=1).reset_index(drop=True) # else: # df_merge1=df # print('df') # print(df['Preferred D']) # print('df_merge1') # print(df_merge1['Preferred D']) if Ne_pickle_str is not None: df_merge1=pd.concat([df_combo_c, Ne_corr], axis=1).reset_index(drop=True) df_merge=pd.concat([df, df_merge1], axis=1).reset_index(drop=True) elif Ne_pickle_str is None and df_combo is not None: df_merge=pd.concat([df, df_combo_c], axis=1).reset_index(drop=True) else: df_merge=df #df_merge=pd.concat([df, df_merge1], axis=1).reset_index(drop=True) df_merge = df_merge.rename(columns={'Preferred D': 'Density g/cm3'}) df_merge = df_merge.rename(columns={'Preferred D_σ': 'σ Density g/cm3'}) df_merge = df_merge.rename(columns={'Preferred D_σ_split': 'σ Density g/cm3 (from Ne+peakfit)'}) df_merge = df_merge.rename(columns={'Preferred D_σ_dens': 'σ Density g/cm3 (from densimeter)'}) df_merge = df_merge.rename(columns={'filename_x': 'filename'}) if Ne_pickle_str is not None: # If its not none, have all the columns for Ne cols_to_move = ['filename', 'Density g/cm3', 'σ Density g/cm3','σ Density g/cm3 (from Ne+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting', 'Corrected_Splitting_σ', 'Corrected_Splitting_σ_Ne', 'Corrected_Splitting_σ_peak_fit', 'power (mW)', 'Spectral Center'] df_merge = df_merge[cols_to_move + [ col for col in df_merge.columns if col not in cols_to_move]] # If pref Ne is not none and you dont have a dataframe elif pref_Ne is not None and df_combo is not None: #If Pref Ne, cols_to_move = ['filename', 'Density g/cm3', 'σ Density g/cm3','σ Density g/cm3 (from Ne+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting', 'Corrected_Splitting_σ', 'Corrected_Splitting_σ_Ne', 'Corrected_Splitting_σ_peak_fit'] df_merge = df_merge[cols_to_move + [ col for col in df_merge.columns if col not in cols_to_move]] elif df_combo is None: cols_to_move = ['Density g/cm3', 'σ Density g/cm3','σ Density g/cm3 (from Ne+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting'] df_merge = df_merge[cols_to_move + [ col for col in df_merge.columns if col not in cols_to_move]] return df_merge
## functions to neatly merge secondary phases in import os.path from os import path
[docs] def merge_in_carb_SO2(df_combo, file1_name='Carb_Peak_fits.xlsx', file2_name='SO2_Peak_fits.xlsx', prefix=False, str_prefix=" ", file_ext='.txt'): """ This function checks for .xlsx files with secondary phases in the path with names file1_name and file2_name, if they are there it will merge them together into a dataframe. It then merges this with the dataframe 'df_combo' of your other fits Parameters ----------------- df_combo: pd.DataFrame Dataframe of peak fitting parameters for diads file1_name: str Name of first excel spreadsheet of secondary phases to merge in file2_name: str Name of second excel spreadsheet of secondary phases to merge in prefix: bool If True, removes prefix followed by str_prefix from file name, e.g. if your spectra are called 01 SO2_acquisition, the file name would become SO2_acquisition for easier merging. file_ext: str Removes from file name for merging dataframes (as above). Returns ---------------- pd.DataFrame Dataframe of combined peak parameters from secondary phases and diads. """ if path.exists(file1_name): Carb=pd.read_excel(file1_name) else: Carb=None if path.exists(file2_name): SO2=pd.read_excel(file2_name) else: SO2=None if SO2 is not None and Carb is not None: Sec_Phases=pd.merge(SO2, Carb, on='filename', how='outer').reset_index(drop=True) elif SO2 is not None and Carb is None: Sec_Phases=SO2 elif SO2 is None and Carb is not None: Sec_Phases=Carb else: Sec_Phases=None if Sec_Phases is not None: print('Made a df!') # Remove these to get the pure file name if Sec_Phases is not None: file_sec_phase=extracting_filenames_generic( prefix=prefix, str_prefix=str_prefix, file_ext=file_ext, names=Sec_Phases['filename'].reset_index(drop=True)) else: df_combo_sec_phase=df_combo if Sec_Phases is not None: Sec_Phases['filename']=file_sec_phase df_combo_sec_phase=df_combo.merge(Sec_Phases, on='filename', how='outer').reset_index(drop=True) else: df_combo_sec_phase=df_combo if 'Peak_Area_Carb' in df_combo_sec_phase.columns: df_combo_sec_phase['Carb_Diad_Ratio']=(df_combo_sec_phase['Peak_Area_Carb']/(df_combo_sec_phase['Diad1_Voigt_Area'] +df_combo_sec_phase['Diad2_Voigt_Area'])) if 'Peak_Area_SO2' in df_combo_sec_phase.columns: df_combo_sec_phase['SO2_Diad_Ratio']=(df_combo_sec_phase['Peak_Area_SO2']/(df_combo_sec_phase['Diad1_Voigt_Area'] +df_combo_sec_phase['Diad2_Voigt_Area'])) return df_combo_sec_phase
## Merge peak fits together
[docs] def merge_fit_files(path): """ This function merges the files Discarded_df.xlsx, Weak_Diads.xlsx, Medium_Diads.xlsx, Strong_Diads.xlsx if they exist into one combined dataframe. Parameters ----------------- path: str Path on computer where these files are stored Returns ------------ pd.DataFrame pandas dataframe where 3 sets of peak fits are merged. """ import os import pandas as pd if os.path.exists(os.path.join(path, 'Discarded_df.xlsx')): discard = pd.read_excel(os.path.join(path, 'Discarded_df.xlsx')) else: discard = None if os.path.exists(os.path.join(path, 'Weak_Diads.xlsx')): grp1 = pd.read_excel(os.path.join(path, 'Weak_Diads.xlsx')) grp1['Standard']='No' else: grp1 = None if os.path.exists(os.path.join(path, 'Medium_Diads.xlsx')): grp2 = pd.read_excel(os.path.join(path, 'Medium_Diads.xlsx')) grp2['Standard']='No' else: grp2 = None if os.path.exists(os.path.join(path, 'Strong_Diads.xlsx')): grp3 = pd.read_excel(os.path.join(path, 'Strong_Diads.xlsx')) grp3['Standard']='No' else: grp3 = None if os.path.exists(os.path.join(path, 'Std_Diads.xlsx')): grp4 = pd.read_excel(os.path.join(path, 'Std_Diads.xlsx')) grp4['Standard']='Yes' else: grp4 = None df2 = pd.concat([grp1, grp2, grp3, grp4], axis=0).reset_index(drop=True) if discard is not None: discard_cols=discard[discard.columns.intersection(df2.columns)] df2=pd.concat([df2, discard_cols]).reset_index(drop=True) return df2
## New UC Berkeley using 1220
[docs] def calculate_density_ucb(*, Ne_line_combo='1117_1447', df_combo=None, temp='SupCrit', CI_split=0.67, CI_neon=0.67, Ne_pickle_str=None, Ar_pickle_str=None, pref_Ne=None, Ne_err=None, corrected_split=None, split_err=None): """ This function converts Diad Splitting into CO$_2$ density using the UC Berkeley calibration line developed by DeVitre and Wieser in 2023. Parameters ------------- Ne_line_combo: str, '1117_1447', '1117_1400', '1220_1447', '1220_1400', '1220_1567' Combination of Ne lines used for drift correction Either: df_combo: pandas DataFrame data frame of peak fitting information Or: corrected_split: pd.Series Corrected splitting (cm-1) split_err: float, int Error on corrected splitting temp: str 'SupCrit' if measurements done at 37C 'RoomT' if measurements done at 24C - Not supported yet but could be added if needed. CI_neon: float Default 0.67. Confidence interval to use, e.g. 0.67 returns 1 sigma uncertainties. If you use another number, note the column headings will still say sigma. CI_split: float Default 0.67. Confidence interval to use, e.g. 0.67 returns 1 sigma uncertainties. If you use another number, note the column headings will still say sigma. Either Ne_pickle_str: str Name of Ne correction model OR pref_Ne, Ne_err: float, int For quick and dirty fitting can pass a preferred value for your instrument before you have a chance to regress the Ne lines (useful when first analysing new samples. ) Returns -------------- pd.DataFrame Prefered Density (based on different equatoins being merged), and intermediate calculations """ if corrected_split is not None: Split=corrected_split if df_combo is not None: df_combo_c=df_combo.copy() time=df_combo_c['sec since midnight'] if Ne_pickle_str is not None: # Calculating the upper and lower values for Ne to get that error Ne_corr=calculate_Ne_corr_std_err_values(pickle_str=Ne_pickle_str, new_x=time, CI=CI_neon) # Extracting preferred correction values pref_Ne=Ne_corr['preferred_values'] Split_err, pk_err=propagate_error_split_neon_peakfit(Ne_corr=Ne_corr, df_fits=df_combo_c) df_combo_c['Corrected_Splitting_σ']=Split_err df_combo_c['Corrected_Splitting_σ_Ne']=(Ne_corr['upper_values']*df_combo_c['Splitting']-Ne_corr['lower_values']*df_combo_c['Splitting'])/2 df_combo_c['Corrected_Splitting_σ_peak_fit']=pk_err # If using a single value for quick dirty fitting else: Split_err, pk_err=propagate_error_split_neon_peakfit(df_fits=df_combo_c, Ne_err=Ne_err, pref_Ne=pref_Ne) df_combo_c['Corrected_Splitting_σ']=Split_err df_combo_c['Corrected_Splitting_σ_Ne']=((Ne_err+pref_Ne)*df_combo_c['Splitting']-(Ne_err-pref_Ne)*df_combo_c['Splitting'])/2 df_combo_c['Corrected_Splitting_σ_peak_fit']=pk_err Split=df_combo_c['Splitting']*pref_Ne else: Split_err=split_err # This is for if you just have splitting # This propgates the uncertainty in the splitting from peak fitting, and the Ne correction model if temp=='RoomT': raise TypeError('Sorry, no UC Berkeley calibration at 24C, please enter temp=SupCrit') if isinstance(Split, float) or isinstance(Split, int): Split=pd.Series(Split) # #if temp is "RoomT": DiadFit_dir=Path(__file__).parent LowD_RT=-38.34631 + 0.3732578*Split HighD_RT=-41.64784 + 0.4058777*Split- 0.1460339*(Split-104.653)**2 # IF temp is 37 if Ne_line_combo=='1220_1447': # This gets the densimeter at low density pickle_str_lowr='Lowrho_polyfit_dataUCB_1220_1447.pkl' with open(DiadFit_dir/pickle_str_lowr, 'rb') as f: lowrho_pickle_data = pickle.load(f) # This gets the densimeter at medium density pickle_str_medr='Mediumrho_polyfit_dataUCB_1220_1447.pkl' with open(DiadFit_dir/pickle_str_medr, 'rb') as f: medrho_pickle_data = pickle.load(f) # This gets the densimeter at high density. pickle_str_highr='Highrho_polyfit_dataUCB_1220_1447.pkl' with open(DiadFit_dir/pickle_str_highr, 'rb') as f: highrho_pickle_data = pickle.load(f) if Ne_line_combo=='1220_1400': pickle_str_lowr='Lowrho_polyfit_dataUCB_1220_1400.pkl' with open(DiadFit_dir/pickle_str_lowr, 'rb') as f: lowrho_pickle_data = pickle.load(f) # This gets the densimeter at medium density pickle_str_medr='Mediumrho_polyfit_dataUCB_1220_1400.pkl' with open(DiadFit_dir/pickle_str_medr, 'rb') as f: medrho_pickle_data = pickle.load(f) # This gets the densimeter at high density. pickle_str_highr='Highrho_polyfit_dataUCB_1220_1400.pkl' with open(DiadFit_dir/pickle_str_highr, 'rb') as f: highrho_pickle_data = pickle.load(f) if Ne_line_combo=='1220_1567': pickle_str_lowr='Lowrho_polyfit_dataUCB_1220_1567.pkl' with open(DiadFit_dir/pickle_str_lowr, 'rb') as f: lowrho_pickle_data = pickle.load(f) # This gets the densimeter at medium density pickle_str_medr='Mediumrho_polyfit_dataUCB_1220_1567.pkl' with open(DiadFit_dir/pickle_str_medr, 'rb') as f: medrho_pickle_data = pickle.load(f) # This gets the densimeter at high density. pickle_str_highr='Highrho_polyfit_dataUCB_1220_1567.pkl' with open(DiadFit_dir/pickle_str_highr, 'rb') as f: highrho_pickle_data = pickle.load(f) if Ne_line_combo=='1117_1400': pickle_str_lowr='Lowrho_polyfit_dataUCB_1117_1400.pkl' with open(DiadFit_dir/pickle_str_lowr, 'rb') as f: lowrho_pickle_data = pickle.load(f) # This gets the densimeter at medium density pickle_str_medr='Mediumrho_polyfit_dataUCB_1117_1400.pkl' with open(DiadFit_dir/pickle_str_medr, 'rb') as f: medrho_pickle_data = pickle.load(f) # This gets the densimeter at high density. pickle_str_highr='Highrho_polyfit_dataUCB_1117_1400.pkl' with open(DiadFit_dir/pickle_str_highr, 'rb') as f: highrho_pickle_data = pickle.load(f) if Ne_line_combo=='1117_1447': # This gets the densimeter at low density pickle_str_lowr='Lowrho_polyfit_data.pkl' with open(DiadFit_dir/pickle_str_lowr, 'rb') as f: lowrho_pickle_data = pickle.load(f) # This gets the densimeter at medium density pickle_str_medr='Mediumrho_polyfit_data.pkl' with open(DiadFit_dir/pickle_str_medr, 'rb') as f: medrho_pickle_data = pickle.load(f) # This gets the densimeter at high density. pickle_str_highr='Highrho_polyfit_data.pkl' with open(DiadFit_dir/pickle_str_highr, 'rb') as f: highrho_pickle_data = pickle.load(f) # this allocates the model lowrho_model = lowrho_pickle_data['model'] medrho_model = medrho_pickle_data['model'] highrho_model = highrho_pickle_data['model'] # Each of these lines get the density, and then the error on that density. LowD_SC = pd.Series(lowrho_model(Split), index=Split.index) lowD_error=calculate_Densimeter_std_err_values(corrected_split=Split, corrected_split_err=Split_err, pickle_str=pickle_str_lowr, CI_dens=CI_neon, CI_split=CI_split, str_d='LowD') MedD_SC = pd.Series(medrho_model(Split), index=Split.index) medD_error=calculate_Densimeter_std_err_values(corrected_split=Split, corrected_split_err=Split_err, pickle_str=pickle_str_medr, CI_dens=CI_neon, CI_split=CI_split, str_d='MedD') HighD_SC = pd.Series(highrho_model(Split), index=Split.index) highD_error=calculate_Densimeter_std_err_values(corrected_split=Split, corrected_split_err=Split_err, pickle_str=pickle_str_highr, CI_dens=CI_neon, CI_split=CI_split, str_d='HighD') df=pd.DataFrame(data={'Preferred D': 0, 'Corrected_Splitting': Split, 'Preferred D_σ': 0, 'Preferred D_σ_split': 0, 'Preferred D_σ_Ne': 0, 'Preferred D_σ_pkfit': 0, 'Preferred D_σ_dens': 0, 'in range': 'Y', 'Notes': 'not in range', 'LowD_RT':np.nan, 'HighD_RT': np.nan, 'LowD_SC': LowD_SC, 'LowD_SC_σ': lowD_error['LowD_Density_σ'], 'MedD_SC': MedD_SC, 'MedD_SC_σ': medD_error['MedD_Density_σ'], 'HighD_SC': HighD_SC, 'HighD_SC_σ': highD_error['HighD_Density_σ'], 'Temperature': temp, }) roomT=df['Temperature']=="RoomT" SupCrit=df['Temperature']=="SupCrit" # If splitting is 0 zero=df['Corrected_Splitting']==0 offset=0 if Ne_line_combo=='1220_1400': offset=105.257-105.3438707618937 min_lowD_SC_Split=df['Corrected_Splitting']>=102.73# If we dont want negative densities, set this to tihs exactly. 102.75025552873848 ##102.7623598753032+offset max_lowD_SC_Split=df['Corrected_Splitting']<=103.1741034592534+offset # Range for SC med density min_MD_SC_Split=df['Corrected_Splitting']>103.0608505403591+offset max_MD_SC_Split=df['Corrected_Splitting']<=104.3836704771313+offset # Range for SC high density min_HD_SC_Split=df['Corrected_Splitting']>=104.2538992302499+offset max_HD_SC_Split=df['Corrected_Splitting']<=105.3438707618937+offset Too_Low_SC=df['Corrected_Splitting']<102.72+offset Too_Low_RT=df['Corrected_Splitting']<102.734115670188+offset Imposs_lower_end=(df['Corrected_Splitting']>103.350311768435+offset) # & (df['Splitting']<103.88+offset) # Impossible densities, room T Imposs_upper_end=(df['Corrected_Splitting']<105.3438707618937+offset)# & (df['Splitting']>103.88+offset) df.loc[zero, 'Preferred D']=0 df.loc[zero, 'Notes']=pd.NA # Assign to the right type to avoid annoying pandas 2 warning # Ensure the columns are of type float64 df['Preferred D'] = df['Preferred D'].astype('float64', errors='ignore') df['Preferred D_σ'] = df['Preferred D_σ'].astype('float64', errors='ignore') df['Preferred D_σ_split'] = df['Preferred D_σ_split'].astype('float64', errors='ignore') df['Preferred D_σ_dens'] = df['Preferred D_σ_dens'].astype('float64', errors='ignore') # If SupCrit, high density df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D'] = HighD_SC df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D_σ'] = highD_error['HighD_Density_σ'] df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D_σ_split'] = highD_error['HighD_Density_σ_split'] df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Preferred D_σ_dens'] = highD_error['HighD_Density_σ_dens'] df.loc[ SupCrit&(min_HD_SC_Split&max_HD_SC_Split), 'Notes']='SupCrit, high density' # If SupCrit, Med density df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Preferred D'] = MedD_SC df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Preferred D_σ'] = medD_error['MedD_Density_σ'] df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Preferred D_σ_split'] = medD_error['MedD_Density_σ_split'] df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Preferred D_σ_dens'] = medD_error['MedD_Density_σ_dens'] df.loc[SupCrit&(min_MD_SC_Split&max_MD_SC_Split), 'Notes']='SupCrit, Med density' # If SupCrit, low density df.loc[ SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D'] = LowD_SC df.loc[ SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D_σ'] = lowD_error['LowD_Density_σ'] df.loc[ SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D_σ_split'] = lowD_error['LowD_Density_σ_split'] df.loc[ SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Preferred D_σ_dens'] = lowD_error['LowD_Density_σ_dens'] df.loc[SupCrit&(min_lowD_SC_Split&max_lowD_SC_Split), 'Notes']='SupCrit, low density' # If Supcritical, and too low df.loc[SupCrit&(Too_Low_SC), 'Preferred D']=LowD_SC df.loc[SupCrit&(Too_Low_SC), 'Notes']='Below lower calibration limit' df.loc[SupCrit&(Too_Low_SC), 'in range']='N' # If RoomT, and too low df.loc[roomT&(Too_Low_RT), 'Preferred D']=LowD_RT df.loc[roomT&(Too_Low_RT), 'Notes']='Below lower calibration limit' df.loc[roomT&(Too_Low_RT), 'in range']='N' #if splitting is zero SplitZero=df['Corrected_Splitting']==0 df.loc[SupCrit&(SplitZero), 'Preferred D']=np.nan df.loc[SupCrit&(SplitZero), 'Notes']='Splitting=0' df.loc[SupCrit&(SplitZero), 'in range']='N' df.loc[roomT&(SplitZero), 'Preferred D']=np.nan df.loc[roomT&(SplitZero), 'Notes']='Splitting=0' df.loc[roomT&(SplitZero), 'in range']='N' # If impossible density, lower end df.loc[roomT&Imposs_lower_end, 'Preferred D'] = LowD_RT df.loc[roomT&Imposs_lower_end, 'Notes']='Impossible Density, low density' df.loc[roomT&Imposs_lower_end, 'in range']='N' # If impossible density, lower end df.loc[roomT&Imposs_upper_end, 'Preferred D'] = HighD_RT df.loc[roomT&Imposs_upper_end, 'Notes']='Impossible Density, high density' df.loc[roomT&Imposs_upper_end, 'in range']='N' #df.loc[zero, 'in range']='Y' # If high densiy, and beyond the upper calibration limit Upper_Cal_RT=df['Corrected_Splitting']>105.1 Upper_Cal_SC=df['Corrected_Splitting']>105.3438707618937 df.loc[roomT&Upper_Cal_RT, 'Preferred D'] = HighD_RT df.loc[roomT&Upper_Cal_RT, 'Notes']='Above upper Cali Limit' df.loc[roomT&Upper_Cal_RT, 'in range']='N' df.loc[SupCrit&Upper_Cal_SC, 'Preferred D'] = HighD_SC df.loc[SupCrit&Upper_Cal_SC, 'Notes']='Above upper Cali Limit' df.loc[SupCrit&Upper_Cal_SC, 'in range']='N' if Ne_pickle_str is not None: df_merge1=pd.concat([df_combo_c, Ne_corr], axis=1).reset_index(drop=True) df_merge=pd.concat([df, df_merge1], axis=1).reset_index(drop=True) elif Ne_pickle_str is None and df_combo is not None: df_merge=pd.concat([df, df_combo_c], axis=1).reset_index(drop=True) else: df_merge=df df_merge = df_merge.rename(columns={'Preferred D': 'Density g/cm3'}) df_merge = df_merge.rename(columns={'Preferred D_σ': 'σ Density g/cm3'}) df_merge = df_merge.rename(columns={'Preferred D_σ_split': 'σ Density g/cm3 (from Ne+peakfit)'}) df_merge = df_merge.rename(columns={'Preferred D_σ_dens': 'σ Density g/cm3 (from densimeter)'}) df_merge = df_merge.rename(columns={'filename_x': 'filename'}) # NE or Ar. if Ne_pickle_str is not None: cols_to_move = [ 'filename', 'Density g/cm3', 'σ Density g/cm3', 'σ Density g/cm3 (from Ne+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting', 'Corrected_Splitting_σ', 'Corrected_Splitting_σ_Ne', 'Corrected_Splitting_σ_peak_fit', 'power (mW)', 'Spectral Center' ] elif Ar_pickle_str is not None: cols_to_move = [ 'filename', 'Density g/cm3', 'σ Density g/cm3', 'σ Density g/cm3 (from Ar+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting', 'Corrected_Splitting_σ', 'Corrected_Splitting_σ_Ar', 'Corrected_Splitting_σ_peak_fit', 'power (mW)', 'Spectral Center' ] elif pref_Ne is not None and df_combo is not None: cols_to_move = [ 'filename', 'Density g/cm3', 'σ Density g/cm3', 'σ Density g/cm3 (from Ne+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting', 'Corrected_Splitting_σ', 'Corrected_Splitting_σ_Ne', 'Corrected_Splitting_σ_peak_fit' ] elif df_combo is None: cols_to_move = ['Density g/cm3', 'σ Density g/cm3','σ Density g/cm3 (from Ne+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting'] # Move only existing columns to the front cols_existing = [col for col in cols_to_move if col in df_merge.columns] df_merge = df_merge[cols_existing + [col for col in df_merge.columns if col not in cols_existing]] return df_merge
## Method from FLuids laboratory from FRANCIS Program
[docs] def Francis_pureCO2(FDS, FDS_std, uncer_FDS, uncer_FDS_std=0): """ Returns density using a densimeter made from a single CO2 standard """ offset= 0.035089020233933815 # Calculated FDS at 0.01 g/cm3 FDS_normalized_1=(FDS - FDS_std) + (uncer_FDS**2 + uncer_FDS_std**2)**0.5 + offset FDS_normalized=(FDS - FDS_std) + offset FDS_normalized_2=(FDS - FDS_std) - (uncer_FDS**2 + uncer_FDS_std**2)**0.5 + offset p0= 0 p1= 148.73 p2= 20.946 p3= -180.85 p4= 96.503 p5= -9.8157 d0= 0 d1= +0.31273 d2= +0.11155 d3= -0.01843 d4= -0.0044 d5= 0 pressure1 = p5*FDS_normalized_1**5 + p4*FDS_normalized_1**4 + p3*FDS_normalized_1**3 + p2*FDS_normalized_1**2 +p1*FDS_normalized_1**1 + p0 pressure2 = p5*FDS_normalized_2**5 + p4*FDS_normalized_2**4 + p3*FDS_normalized_2**3 + p2*FDS_normalized_2**2 +p1*FDS_normalized_2**1 + p0 pressure_final = (pressure1 + pressure2)/2 uncer_pressure_final = 8.7 + (np.maximum(pressure1, pressure2) - np.minimum(pressure1, pressure2))/(2*np.sqrt(3)) #uncertainty=8 density1 = d5*FDS_normalized_1**5 + d4*FDS_normalized_1**4 + d3*FDS_normalized_1**3 + d2*FDS_normalized_1**2 + d1*FDS_normalized_1**1 + d0 density2 = d5*FDS_normalized_2**5 + d4*FDS_normalized_2**4 + d3*FDS_normalized_2**3 + d2*FDS_normalized_2**2 + d1*FDS_normalized_2**1 + d0 densityPW = d5*FDS_normalized**5 + d4*FDS_normalized**4 + d3*FDS_normalized**3 + d2*FDS_normalized**2 + d1*FDS_normalized**1 + d0 density_final = (density1 + density2)/2 uncer_density_final = 0.006 + (np.maximum(density1,density2)-np.minimum(density1,density2))/(2*np.sqrt(3)) #uncertainty = 0.003 if len(FDS)==1: df=pd.DataFrame(data={'Density': density_final, 'Density_PW': densityPW, 'Density_err': uncer_density_final, 'Input_Split':FDS, 'Split_err':uncer_FDS, 'Split_Std':FDS_std, }, index=[0]) else: df=pd.DataFrame(data={'Density': density_final, 'Density_PW': densityPW, 'Density_err': uncer_density_final, 'Input_Split':FDS, 'Split_err':uncer_FDS, 'Split_Std':FDS_std, }) return df
## Shifted polynomial import pickle def blend_weights(x, x_min, x_max): """Cosine smooth blend between x_min and x_max.""" t = np.clip((x - x_min) / (x_max - x_min), 0, 1) return 0.5 * (1 - np.cos(np.pi * t))
[docs] def build_piecewise_poly_by_density(x, y, y_bounds=(0.17, 0.65), degrees=(1, 3, 2), blend_width=0.05, save_path=None): """ Fits and optionally saves a smoothed piecewise polynomial model. Returns: f_base : callable model_data : dict (can be pickled) """ x = np.asarray(x) y = np.asarray(y) mask_low = y < y_bounds[0] mask_mid = (y >= y_bounds[0]) & (y <= y_bounds[1]) mask_high = y > y_bounds[1] polys = [] coeffs = [] for mask, deg in zip([mask_low, mask_mid, mask_high], degrees): c = np.polyfit(x[mask], y[mask], deg) coeffs.append(c) polys.append(np.poly1d(c)) x_low_med = x[np.abs(y - y_bounds[0]).argmin()] x_med_high = x[np.abs(y - y_bounds[1]).argmin()] def f_base(x_input): x_arr = np.asarray(x_input) result = np.full_like(x_arr, np.nan, dtype=float) low_mask = x_arr < (x_low_med - blend_width) mid_mask = (x_arr > (x_low_med + blend_width)) & (x_arr < (x_med_high - blend_width)) high_mask = x_arr > (x_med_high + blend_width) result[low_mask] = polys[0](x_arr[low_mask]) result[mid_mask] = polys[1](x_arr[mid_mask]) result[high_mask] = polys[2](x_arr[high_mask]) blend_lm = (x_arr >= (x_low_med - blend_width)) & (x_arr <= (x_low_med + blend_width)) w_lm = blend_weights(x_arr[blend_lm], x_low_med - blend_width, x_low_med + blend_width) result[blend_lm] = (1 - w_lm) * polys[0](x_arr[blend_lm]) + w_lm * polys[1](x_arr[blend_lm]) blend_mh = (x_arr >= (x_med_high - blend_width)) & (x_arr <= (x_med_high + blend_width)) w_mh = blend_weights(x_arr[blend_mh], x_med_high - blend_width, x_med_high + blend_width) result[blend_mh] = (1 - w_mh) * polys[1](x_arr[blend_mh]) + w_mh * polys[2](x_arr[blend_mh]) return result model_data = { 'coeffs': coeffs, 'y_bounds': y_bounds, 'degrees': degrees, 'blend_width': blend_width, 'x_low_med': x_low_med, 'x_med_high': x_med_high, 'x': x, 'y': y } if save_path: with open(save_path, 'wb') as f: pickle.dump(model_data, f) return f_base, model_data
def blend_weights(x, x_min, x_max): t = np.clip((x - x_min) / (x_max - x_min), 0, 1) return 0.5 * (1 - np.cos(np.pi * t)) def load_piecewise_model(model_data): coeffs = model_data['coeffs'] blend_width = model_data['blend_width'] x_low_med = model_data['x_low_med'] x_med_high = model_data['x_med_high'] polys = [np.poly1d(c) for c in coeffs] vertical_shift = model_data.get('vertical_shift', 0) def f_base(x_input): x_arr = np.asarray(x_input) result = np.full_like(x_arr, np.nan, dtype=float) low_mask = x_arr < (x_low_med - blend_width) mid_mask = (x_arr > (x_low_med + blend_width)) & (x_arr < (x_med_high - blend_width)) high_mask = x_arr > (x_med_high + blend_width) result[low_mask] = polys[0](x_arr[low_mask]) result[mid_mask] = polys[1](x_arr[mid_mask]) result[high_mask] = polys[2](x_arr[high_mask]) blend_lm = (x_arr >= (x_low_med - blend_width)) & (x_arr <= (x_low_med + blend_width)) w_lm = blend_weights(x_arr[blend_lm], x_low_med - blend_width, x_low_med + blend_width) result[blend_lm] = (1 - w_lm) * polys[0](x_arr[blend_lm]) + w_lm * polys[1](x_arr[blend_lm]) blend_mh = (x_arr >= (x_med_high - blend_width)) & (x_arr <= (x_med_high + blend_width)) w_mh = blend_weights(x_arr[blend_mh], x_med_high - blend_width, x_med_high + blend_width) result[blend_mh] = (1 - w_mh) * polys[1](x_arr[blend_mh]) + w_mh * polys[2](x_arr[blend_mh]) return result + vertical_shift return f_base ## New function that is much simpler
[docs] def calculate_density_ucb_new(*, df_combo=None, temp='SupCrit', CI_split=0.67, CI_neon=0.67, Ne_pickle_str=None, pref_Ne=None, Ne_err=None, corrected_split=None, split_err=None, shift=0): """ This function converts Diad Splitting into CO$_2$ density using the UC Berkeley calibration line developed by DeVitre and Wieser in 2023. Parameters ------------- Ne_line_combo: str, '1117_1447', '1117_1400', '1220_1447', '1220_1400', '1220_1567' Combination of Ne lines used for drift correction Either: df_combo: pandas DataFrame data frame of peak fitting information Or: corrected_split: pd.Series Corrected splitting (cm-1) split_err: float, int Error on corrected splitting temp: str 'SupCrit' if measurements done at 37C 'RoomT' if measurements done at 24C - Not supported yet but could be added if needed. CI_neon: float Default 0.67. Confidence interval to use, e.g. 0.67 returns 1 sigma uncertainties. If you use another number, note the column headings will still say sigma. CI_split: float Default 0.67. Confidence interval to use, e.g. 0.67 returns 1 sigma uncertainties. If you use another number, note the column headings will still say sigma. Either Ne_pickle_str: str Name of Ne correction model OR pref_Ne, Ne_err: float, int For quick and dirty fitting can pass a preferred value for your instrument before you have a chance to regress the Ne lines (useful when first analysing new samples. ) Returns -------------- pd.DataFrame Prefered Density (based on different equatoins being merged), and intermediate calculations """ if corrected_split is not None: Split=corrected_split if df_combo is not None: df_combo_c=df_combo.copy() time=df_combo_c['sec since midnight'] if Ne_pickle_str is not None: # Calculating the upper and lower values for Ne to get that error Ne_corr=calculate_Ne_corr_std_err_values(pickle_str=Ne_pickle_str, new_x=time, CI=CI_neon) # Extracting preferred correction values pref_Ne=Ne_corr['preferred_values'] Split_err, pk_err=propagate_error_split_neon_peakfit(Ne_corr=Ne_corr, df_fits=df_combo_c) df_combo_c['Corrected_Splitting_σ']=Split_err df_combo_c['Corrected_Splitting_σ_Ne']=(Ne_corr['upper_values']*df_combo_c['Splitting']-Ne_corr['lower_values']*df_combo_c['Splitting'])/2 df_combo_c['Corrected_Splitting_σ_peak_fit']=pk_err # If using a single value for quick dirty fitting else: Split_err, pk_err=propagate_error_split_neon_peakfit(df_fits=df_combo_c, Ne_err=Ne_err, pref_Ne=pref_Ne) df_combo_c['Corrected_Splitting_σ']=Split_err df_combo_c['Corrected_Splitting_σ_Ne']=((Ne_err+pref_Ne)*df_combo_c['Splitting']-(Ne_err-pref_Ne)*df_combo_c['Splitting'])/2 df_combo_c['Corrected_Splitting_σ_peak_fit']=pk_err Split=df_combo_c['Splitting']*pref_Ne else: Split_err=split_err # This is for if you just have splitting # This propgates the uncertainty in the splitting from peak fitting, and the Ne correction model if temp=='RoomT': raise TypeError('Sorry, no UC Berkeley calibration at 24C, please enter temp=SupCrit') if isinstance(Split, float) or isinstance(Split, int): Split=pd.Series(Split) DiadFit_dir=Path(__file__).parent # load the new smoothed model with open(DiadFit_dir / "smoothed_polyfit_June25_UCB.pkl", 'rb') as f: smoothed_model_data = pickle.load(f) smoothed_model = load_piecewise_model(smoothed_model_data) # Evaluate model Density = pd.Series(smoothed_model(Split), index=Split.index) # Lets get the error err_df = calculate_Densimeter_std_err_values_smooth( model_data=smoothed_model_data, corrected_split=Split, corrected_split_err=Split_err, CI_dens=0.67, CI_split=0.67, str_d='Smoothed' ) df=pd.DataFrame(data={'Density g/cm3': Density+shift, 'σ Density g/cm3': err_df['Smoothed_Density_σ'], 'σ Density g/cm3 (from Ne+peakfit)': err_df['Smoothed_Density_σ_split'], 'σ Density g/cm3 (from densimeter)': err_df['Smoothed_Density_σ_dens'], 'Corrected_Splitting': Split, 'Preferred D_σ_Ne': 0, 'in range': 'in progress', 'Temperature': temp}) if Ne_pickle_str is not None: df_merge1=pd.concat([df_combo_c, Ne_corr], axis=1).reset_index(drop=True) df_merge=pd.concat([df, df_merge1], axis=1).reset_index(drop=True) elif Ne_pickle_str is None and df_combo is not None: df_merge=pd.concat([df, df_combo_c], axis=1).reset_index(drop=True) else: df_merge=df if Ne_pickle_str is not None: # If its not none, have all the columns for Ne cols_to_move = ['filename', 'Density g/cm3', 'σ Density g/cm3','σ Density g/cm3 (from Ne+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting', 'Corrected_Splitting_σ', 'Corrected_Splitting_σ_Ne', 'Corrected_Splitting_σ_peak_fit', 'power (mW)', 'Spectral Center'] df_merge = df_merge[cols_to_move + [ col for col in df_merge.columns if col not in cols_to_move]] elif pref_Ne is not None and df_combo is not None: #If Pref Ne, cols_to_move = ['filename', 'Density g/cm3', 'σ Density g/cm3','σ Density g/cm3 (from Ne+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting', 'Corrected_Splitting_σ', 'Corrected_Splitting_σ_Ne', 'Corrected_Splitting_σ_peak_fit'] df_merge = df_merge[cols_to_move + [ col for col in df_merge.columns if col not in cols_to_move]] elif df_combo is None: cols_to_move = ['Density g/cm3', 'σ Density g/cm3','σ Density g/cm3 (from Ne+peakfit)', 'σ Density g/cm3 (from densimeter)', 'Corrected_Splitting'] df_merge = df_merge[cols_to_move + [ col for col in df_merge.columns if col not in cols_to_move]] return df_merge
from scipy.stats import t import numpy as np import pandas as pd ##
[docs] def calculate_Densimeter_std_err_values_smooth( *, model_data, corrected_split, corrected_split_err, CI_dens=0.67, CI_split=0.67, str_d='Smoothed', x=None, y=None ): """ Calculates propagated uncertainty for a smoothed polynomial model. Parameters ---------- model_data : dict Dictionary from build_piecewise_poly_by_density including coeffs, blend_width, etc. corrected_split : pd.Series or np.ndarray Corrected splitting values corrected_split_err : float or pd.Series Uncertainty on splitting CI_dens : float Confidence interval for uncertainty in the fit CI_split : float Confidence interval for splitting uncertainty str_d : str Prefix for column names x, y : array-like (optional) Original data used to fit the model, if not included in model_data Returns ------- pd.DataFrame DataFrame of predicted values and propagated uncertainties """ from scipy.stats import t import numpy as np import pandas as pd # === Rebuild model === def load_piecewise_model(model_data): coeffs = model_data['coeffs'] blend_width = model_data['blend_width'] x_low_med = model_data['x_low_med'] x_med_high = model_data['x_med_high'] polys = [np.poly1d(c) for c in coeffs] def f_base(x_input): x_arr = np.asarray(x_input) result = np.full_like(x_arr, np.nan, dtype=float) low_mask = x_arr < (x_low_med - blend_width) mid_mask = (x_arr > (x_low_med + blend_width)) & (x_arr < (x_med_high - blend_width)) high_mask = x_arr > (x_med_high + blend_width) result[low_mask] = polys[0](x_arr[low_mask]) result[mid_mask] = polys[1](x_arr[mid_mask]) result[high_mask] = polys[2](x_arr[high_mask]) blend_lm = (x_arr >= (x_low_med - blend_width)) & (x_arr <= (x_low_med + blend_width)) w_lm = 0.5 * (1 - np.cos(np.pi * (x_arr[blend_lm] - (x_low_med - blend_width)) / (2 * blend_width))) result[blend_lm] = (1 - w_lm) * polys[0](x_arr[blend_lm]) + w_lm * polys[1](x_arr[blend_lm]) blend_mh = (x_arr >= (x_med_high - blend_width)) & (x_arr <= (x_med_high + blend_width)) w_mh = 0.5 * (1 - np.cos(np.pi * (x_arr[blend_mh] - (x_med_high - blend_width)) / (2 * blend_width))) result[blend_mh] = (1 - w_mh) * polys[1](x_arr[blend_mh]) + w_mh * polys[2](x_arr[blend_mh]) return result return f_base Pf = load_piecewise_model(model_data) # Use x/y from model_data if available, else require them as args if 'x' in model_data and 'y' in model_data: x = model_data['x'] y = model_data['y'] elif x is None or y is None: raise ValueError("You must supply x and y arrays if not included in model_data.") residuals = y - Pf(x) residual_std = np.std(residuals) mean_x = np.nanmean(x) n = len(x) N_poly = max(len(c) - 1 for c in model_data['coeffs']) # Standard error on predictions standard_errors = residual_std * np.sqrt(1 + 1/n + (corrected_split - mean_x)**2 / np.sum((x - mean_x)**2)) dfree = n - (N_poly + 1) t_value_dens = t.ppf((1 + CI_dens) / 2, dfree) # Central prediction preferred_values = Pf(corrected_split) lower_values = preferred_values - t_value_dens * standard_errors upper_values = preferred_values + t_value_dens * standard_errors uncertainty_from_dens = (upper_values - lower_values) / 2 # Splitting propagation max_split = corrected_split + corrected_split_err min_split = corrected_split - corrected_split_err max_density = Pf(max_split) min_density = Pf(min_split) uncertainty_split = (max_density - min_density) / 2 total_uncertainty = np.sqrt(uncertainty_split ** 2 + uncertainty_from_dens ** 2) return pd.DataFrame({ f'{str_d}_Density': preferred_values, f'{str_d}_Density_σ': total_uncertainty, f'{str_d}_Density+1σ': preferred_values + total_uncertainty, f'{str_d}_Density-1σ': preferred_values - total_uncertainty, f'{str_d}_Density_σ_dens': uncertainty_from_dens, f'{str_d}_Density_σ_split': uncertainty_split })
from pathlib import Path import pickle def calculate_density_labx( *, df_combo=None, temp='SupCrit', CI_split=0.67, CI_neon=0.67, Ne_pickle_str=None, pref_Ne=None, Ne_err=None, corrected_split=None, split_err=None, model_pickle_path=None ): import pandas as pd import numpy as np from DiadFit.densimeters import calculate_Ne_corr_std_err_values, propagate_error_split_neon_peakfit from DiadFit.densimeters import calculate_Densimeter_std_err_values_smooth, load_piecewise_model if corrected_split is not None: Split = corrected_split if df_combo is not None: df_combo_c = df_combo.copy() time = df_combo_c['sec since midnight'] if Ne_pickle_str is not None: Ne_corr = calculate_Ne_corr_std_err_values(pickle_str=Ne_pickle_str, new_x=time, CI=CI_neon) pref_Ne = Ne_corr['preferred_values'] Split_err, pk_err = propagate_error_split_neon_peakfit(Ne_corr=Ne_corr, df_fits=df_combo_c) df_combo_c['Corrected_Splitting_σ'] = Split_err df_combo_c['Corrected_Splitting_σ_Ne'] = ( (Ne_corr['upper_values'] * df_combo_c['Splitting'] - Ne_corr['lower_values'] * df_combo_c['Splitting']) / 2 ) df_combo_c['Corrected_Splitting_σ_peak_fit'] = pk_err else: Split_err, pk_err = propagate_error_split_neon_peakfit( df_fits=df_combo_c, Ne_err=Ne_err, pref_Ne=pref_Ne ) df_combo_c['Corrected_Splitting_σ'] = Split_err df_combo_c['Corrected_Splitting_σ_Ne'] = ( ((Ne_err + pref_Ne) * df_combo_c['Splitting'] - (Ne_err - pref_Ne) * df_combo_c['Splitting']) / 2 ) df_combo_c['Corrected_Splitting_σ_peak_fit'] = pk_err Split = df_combo_c['Splitting'] * pref_Ne else: Split_err = split_err if temp == 'RoomT': raise TypeError('No calibration available at 24C, please use temp="SupCrit"') if isinstance(Split, (float, int)): import pandas as pd Split = pd.Series(Split) if model_pickle_path is None: raise ValueError("You must provide a path to the LabX model pickle using `model_pickle_path`.") with open(Path(model_pickle_path), 'rb') as f: model_data = pickle.load(f) model = load_piecewise_model(model_data) Density = pd.Series(model(Split), index=Split.index) err_df = calculate_Densimeter_std_err_values_smooth( model_data=model_data, corrected_split=Split, corrected_split_err=Split_err, CI_dens=CI_split, CI_split=CI_split, str_d='LabX' ) df = pd.DataFrame(data={ 'Density g/cm3': Density, 'σ Density g/cm3': err_df['LabX_Density_σ'], 'σ Density g/cm3 (from Ne+peakfit)': err_df['LabX_Density_σ_split'], 'σ Density g/cm3 (from densimeter)': err_df['LabX_Density_σ_dens'], 'Corrected_Splitting': Split, 'Preferred D_σ_Ne': 0, 'in range': 'in progress', 'Temperature': temp }) if Ne_pickle_str is not None: df_merge1 = pd.concat([df_combo_c, Ne_corr], axis=1).reset_index(drop=True) df_merge = pd.concat([df, df_merge1], axis=1).reset_index(drop=True) elif df_combo is not None: df_merge = pd.concat([df, df_combo_c], axis=1).reset_index(drop=True) else: df_merge = df return df_merge ## Way to actually shift densimeter # This general model works for any pickel you load in.
[docs] def apply_and_save_vertical_shift_to_model(*, pickle_in_path, new_x, new_y, pickle_out_path=None): """ Applies a vertical shift to a saved piecewise model based on new_x and new_y, then saves the shifted model to a new .pkl file. Parameters ---------- pickle_in_path : str Path to the original .pkl file (output from build_piecewise_poly_by_density). new_x : array-like Corrected splitting values (x). new_y : array-like Measured density values (y). pickle_out_path : str, optional Where to save the new model. If None, appends '_shifted.pkl' to the input path. Returns ------- shift : float Vertical shift applied to the model. """ import pickle import numpy as np # Load the model with open(pickle_in_path, 'rb') as f: model_data = pickle.load(f) # Rebuild the base function base_model = pf.load_piecewise_model(model_data) f_vals = base_model(new_x) # Calculate vertical shift shift = np.nanmean(new_y - f_vals) # Store the shift model_data['vertical_shift'] = shift # Save new .pkl if pickle_out_path is None: pickle_out_path = pickle_in_path.replace('.pkl', '_shifted.pkl') with open(pickle_out_path, 'wb') as f: pickle.dump(model_data, f) return shift
[docs] def apply_and_save_vertical_shift_to_ucb_densimeter(new_x, new_y): """ Applies a vertical shift to a saved piecewise model based on new_x and new_y, then saves the shifted model to a new .pkl file in the same directory. Parameters ---------- filename : str Name of the original .pkl file (e.g., "smoothed_polyfit_June25_UCB.pkl"). new_x : array-like Corrected splitting values (x). new_y : array-like Measured density values (y). pickle_out_name : str, optional Filename to save the new shifted model. If None, appends '_shifted.pkl' to the input name. Returns ------- shift : float Vertical shift applied to the model. """ DiadFit_dir = Path(__file__).parent with open(DiadFit_dir / "smoothed_polyfit_June25_UCB.pkl", 'rb') as f: model_data = pickle.load(f) base_model = load_piecewise_model(model_data) f_vals = base_model(new_x) shift = np.nanmean(new_y - f_vals) model_data['vertical_shift'] = shift return shift
## Corrected splitting calculator for Argon
[docs] def propagate_error_split_argon_peakfit(*, df_fits, Ar_corr=None, Ar_err=None, pref_Ar=None): """ This function propagates errors in your Ar correction model and peak fits by quadrature. Parameters ----------------- df_fits: pd.DataFrame Dataframe of peak fitting parameters. Must contain columns for 'Diad1_cent_err', 'Diad2_cent_err', 'Splitting' Choose either: Ar_corr: pd.DataFrame (Optional) Dataframe with columns for 'upper_values' and 'lower values', e.g. the upper and lower bounds of the error on the Ar correction model Or pref_Ar and Ar_err: float, int, pd.Series, np.array A preferred value of the Ar correction factor and the error (e.g. pref_Ar=0.998, Ar_err=0.001). Used for rapid peak fitting before developing Ar lines. Returns ----------------- two pd.Series: the error on the splitting, and the combined error from the splitting and the Ar correction model. """ # Get the error on Argon things if isinstance(Ar_corr, pd.DataFrame): Ar_err = (Ar_corr['upper_values'] - Ar_corr['lower_values']) / 2 print(np.mean(Ar_err)) pref_Ar = Ar_corr['preferred_values'] elif pref_Ar is not None and Ar_err is not None: print('using fixed values for Ar error and Ar factor') else: raise TypeError('You must either provide Ar_corr as a dataframe, or provide pref_Ar and Ar_err as values.') # Get the peak fit errors Diad1_err = df_fits['Diad1_cent_err'].fillna(0).infer_objects() Diad2_err = df_fits['Diad2_cent_err'].fillna(0).infer_objects() split_err = (Diad1_err**2 + Diad2_err**2)**0.5 # Propagate uncertainty from Ar correction and peak fits Combo_err = (((df_fits['Splitting'] * Ar_err)**2) + (pref_Ar * split_err)**2)**0.5 return Combo_err, split_err
def calculate_Ar_corr_std_err_values(*, pickle_str, new_x, CI=0.67): # Load the model and the data from the pickle file with open(pickle_str, 'rb') as f: data = pickle.load(f) model = data['model'] N_poly = model.order - 1 Pf = data['model'] x = data['x'] y = data['y'] # Convert new_x to plain numpy array new_x_array = np.asarray(new_x) # Calculate the residuals residuals = y - Pf(x) # Calculate the standard deviation of the residuals residual_std = np.std(residuals) # Calculate the standard errors for the new x values mean_x = np.mean(x) n = len(x) standard_errors = residual_std * np.sqrt(1 + 1/n + (new_x_array - mean_x)**2 / np.sum((x - mean_x)**2)) # Calculate the degrees of freedom df_dof = len(x) - (N_poly + 1) # Calculate the t value for the given confidence level t_value = t.ppf((1 + CI) / 2, df_dof) # Calculate the prediction intervals preferred_values = Pf(new_x_array) lower_values = preferred_values - t_value * standard_errors upper_values = preferred_values + t_value * standard_errors df_out = pd.DataFrame(data={ 'time': new_x_array, 'preferred_values': preferred_values, 'lower_values': lower_values, 'upper_values': upper_values }) return df_out def calculate_corrected_splitting_argon(*, df_combo_c, Ar_pickle_str, CI): time=df_combo_c['sec since midnight'] Ar_corr = calculate_Ar_corr_std_err_values(pickle_str=Ar_pickle_str, new_x=time, CI=CI) Split=df_combo_c['Splitting']*Ar_corr['preferred_values'] df_combo_c['Corrected_Splitting']=Split # Extract preferred correction values pref_Ar = Ar_corr['preferred_values'] Split_err, pk_err = propagate_error_split_argon_peakfit(Ar_corr=Ar_corr, df_fits=df_combo_c) # Add Ar-specific columns to the DataFrame df_combo_c['Corrected_Splitting_σ'] = Split_err df_combo_c['Corrected_Splitting_σ_Ar'] = ( (Ar_corr['upper_values'] * df_combo_c['Splitting'] - Ar_corr['lower_values'] * df_combo_c['Splitting']) / 2 ) df_combo_c['Corrected_Splitting_σ_peak_fit'] = pk_err return df_combo_c