import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import patches
import lmfit
from lmfit.models import GaussianModel, VoigtModel, LinearModel, ConstantModel, PseudoVoigtModel, SkewedVoigtModel
from scipy.signal import find_peaks
import os
import re
from os import listdir
from os.path import isfile, join
from DiadFit.importing_data_files import *
from typing import Tuple, Optional
from dataclasses import dataclass
import matplotlib.patches as patches
from tqdm.notebook import tqdm
import re
import scipy.stats as stats
import pickle
allowed_models = ["VoigtModel", "PseudoVoigtModel", "Pearson4Model", "SkewedVoigtModel", "GaussianModel"]
encode="ISO-8859-1"
## Plotting Ne lines, returns peak position
[docs]
def find_closest(df, line1_shift):
"""
This function finds the closest Raman shift value in the inputted dataframe to the inputted line position
Parameters
-------------
df: pd.DataFrame
Dataframe of Ne line positions based on laser wavelength from the function calculate_Ne_line_positions, or calculate_Ar_line_positions
line1_shift: int, float
input line position
error_pk2
Returns
-------------
Closest theoretical line position
"""
dist = (df['Raman_shift (cm-1)'] - line1_shift).abs()
return df.loc[dist.idxmin()]
[docs]
def find_max_row(df, target_shift, tol=2):
""" This function is used to find the highest amplitude within a predefined ampl range for finding the right Ne line
"""
df_filtered = df[(df['Raman_shift (cm-1)'] >= target_shift - tol) & (df['Raman_shift (cm-1)'] <= target_shift + tol)]
# Find the row with the maximum intensity within this filtered DataFrame
max_intensity_row = df_filtered.loc[df_filtered['Intensity'].idxmax()]
return max_intensity_row
[docs]
def calculate_Ne_splitting(wavelength=532.05, line1_shift=1117, line2_shift=1447, cut_off_intensity=2000):
"""
Calculates ideal splitting in air between lines closest to user-specified line shift. E.g. if user enters
1117 and 1447 line, it looks for the nearest theoretical Ne line position based on your wavelength,
and calculates the ideal splitting between these theoretical line positions. This is used to calculate the
ideal Ne line splitting for doing the Ne correction routine of Lamadrid et al. (2017)
Parameters
-------------
Wavelength: int
Wavelength of your laser
line1_shift: int, float
Estimate of position of line 1
line2_shift: int, float
Estimate of position of line 2
cut_off_intensity: int, float
only searches through lines with a theoretical intensity from NIST grater than this value
Returns
----------
df of theoretical splitting, line positions used for this, and entered Ne line positions
"""
df_Ne=calculate_Ne_line_positions(wavelength=wavelength, cut_off_intensity=cut_off_intensity)
#
# closest1=find_closest(df_Ne, line1_shift).loc['Raman_shift (cm-1)']
# closest2=find_closest(df_Ne, line2_shift).loc['Raman_shift (cm-1)']
# closest_1_int=find_closest(df_Ne, line1_shift).loc['Intensity']
# closest_2_int=find_closest(df_Ne, line2_shift).loc['Intensity']
#
# diff=abs(closest1-closest2)
#
# df=pd.DataFrame(data={'Ne_Split': diff,
# 'Line_1': closest1,
# 'Line_2': closest2,
# 'Line_1_int': closest_1_int,
# 'Line_2_int': closest_2_int,
# 'Entered Pos Line 1': line1_shift,
# 'Entered Pos Line 2': line2_shift}, index=[0])
#
#
# return df
# Use the new function to find the lines of interest
closest1_row = find_max_row(df_Ne, line1_shift)
closest2_row = find_max_row(df_Ne, line2_shift)
# Extract the required values from the rows
closest1 = closest1_row['Raman_shift (cm-1)']
closest2 = closest2_row['Raman_shift (cm-1)']
closest_1_int = closest1_row['Intensity']
closest_2_int = closest2_row['Intensity']
# Calculate the difference
diff = abs(closest1 - closest2)
# Create the DataFrame
df = pd.DataFrame(data={
'Ne_Split': diff,
'Line_1': closest1,
'Line_2': closest2,
'Line_1_int': closest_1_int,
'Line_2_int': closest_2_int,
'Entered Pos Line 1': line1_shift,
'Entered Pos Line 2': line2_shift
}, index=[0])
return df
[docs]
def calculate_Ne_line_positions(wavelength=532.05, cut_off_intensity=2000):
"""
Calculates Raman shift for a given laser wavelength of Ne lines, using the datatable from NIST of the observed Ne line
emissoin in air and the intensity of each line.
Data from https://physics.nist.gov/PhysRefData/ASD/lines_form.html
Ticked
Parameters
---------------
Wavelength: float
Wavelength of laser
cut_off_intensity: float
Only chooses lines with intensities greater than this
Returns
------------
pd.DataFrame
df wih Raman shift, intensity, and emission line position in air.
"""
Ne_emission_line_air=np.array([
541.85584,
542.009,
542.0155,
543.36513,
544.7120,
544.85091,
549.44158,
550.73442,
551.1176,
551.1485,
552.063,
553.36788,
553.86510,
555.90978,
556.244160,
556.276620,
556.305310,
557.603940,
558.590500,
558.934720,
559.115000,
565.256640,
565.602580,
565.665880,
566.220000,
566.254890,
568.464700,
568.981630,
571.534090,
571.887980,
571.922480,
571.953000,
574.243700,
574.829850,
574.864460,
576.058850,
576.405250,
576.441880,
577.030670,
580.409000,
580.444960,
581.140660,
581.662190,
582.015580,
582.890630,
585.24878,
586.84165,
587.2145,
587.28275,
588.1895,
589.83287,
590.20944,
590.24623,
590.27835,
590.64294,
591.3633,
591.89068,
591.9029,
593.44522,
593.93154,
594.4834,
596.16228,
596.5471,
596.6179,
597.46273,
597.55343,
598.23753,
598.79074,
599.16477,
600.09275,
602.99968,
604.2013,
604.61348,
606.45359,
607.43376,
609.6163,
611.088,
611.272,
611.80187,
612.84498,
613.277,
614.2508,
614.30627,
615.02985,
615.6138,
616.35937,
616.609,
617.28156,
617.48829,
617.52842,
618.2146,
618.31575,
618.90649,
619.30663,
620.2974,
620.57775,
620.908,
621.018,
621.38758,
621.72812,
622.448,
622.5735,
623.9032,
624.67294,
624.9593,
625.0925,
625.2732,
625.2905,
625.869,
625.87884,
626.64952,
627.0131,
627.242,
627.30141,
627.60327,
627.75,
628.358,
628.959,
629.37447,
629.784,
630.47893,
630.6113,
630.766,
631.36855,
632.1116,
632.81646,
633.08894,
633.44276,
634.5653,
635.18532,
636.49963,
638.29914,
640.1076,
640.2248,
640.658,
640.97469,
642.17044,
644.47118,
650.65277,
651.0877,
651.1789,
651.6789,
651.7041,
652.5584,
652.603,
653.28824,
653.8685,
653.9143,
653.9492,
653.9692,
655.0653,
655.0653,
655.9024,
655.9285,
657.13,
657.17,
659.89528,
660.29007,
664.00095,
664.08,
665.20925,
666.6892,
667.82766,
671.7043,
672.11342,
673.8032,
675.95821,
688.694,
689.541,
692.94672,
702.405,
703.24128,
705.12922,
705.91079,
706.4762,
706.7724,
711.23075,
713.336,
713.854,
717.3938,
721.321,
723.51978,
724.51665
])
Intensity=np.array([
1500,
12,
500,
2500,
80,
1500,
500,
250,
30,
150,
30,
750,
500,
350,
1500.00,
5000.00,
750.00,
350.00,
50.00,
500.00,
80.00,
750.00,
750.00,
5000.00,
40.00,
750.00,
250.00,
1500.00,
350.00,
1500.00,
5000.00,
750.00,
80.00,
5000.00,
700.00,
700.00,
30.00,
7000.00,
500.00,
750.00,
5000.00,
3000.00,
500.00,
5000.00,
750.00,
20000,
750,
750,
5000,
10000,
200,
30,
500,
50,
500,
2500,
2500,
80,
750,
500,
5000,
700,
5000,
350,
5000,
6000,
80,
1500,
750,
1000,
10000,
150,
500,
500,
10000,
3000,
100,
160,
150,
1000,
100,
1000,
10000,
1000,
500,
10000,
80,
150,
700,
500,
1500,
50,
700,
500,
150,
1000,
70,
70,
1500,
10000,
160,
500,
120,
1000,
50,
120,
20,
80,
180,
1000,
10000,
80,
140,
700,
500,
30,
30,
30,
1000,
80,
1000,
100,
180,
1500,
60,
3000,
1500,
10000,
60,
1000,
1000,
10000,
1000,
20000,
90,
1500,
1000,
1500,
15000,
60,
140,
120,
100,
22,
120,
1000,
80,
120,
140,
60,
140,
140,
100,
100,
22,
60,
10000,
1000,
100,
50,
1500,
1000,
5000,
700,
20,
700,
150,
70,
100,
100000,
34000,
85000,
2200,
10000,
80,
80,
110,
40,
55,
77000,
300,
300,
77000
])
Raman_shift=(10**7/wavelength)-(10**7/Ne_emission_line_air)
df_Ne=pd.DataFrame(data={'Raman_shift (cm-1)': Raman_shift,
'Intensity': Intensity,
'Ne emission line in air': Ne_emission_line_air})
df_Ne_r=df_Ne.loc[df_Ne['Intensity']>cut_off_intensity]
# Lets also only return positive numbers
df_Ne_p=df_Ne_r.loc[df_Ne_r['Raman_shift (cm-1)']>0]
return df_Ne_p
[docs]
@dataclass
class Neon_id_config:
# Exclude a range, e.g. cosmic rays
exclude_range_1: Optional [Tuple[float, float]] = None
exclude_range_2: Optional [Tuple[float, float]] = None
# Things for Scipy find peaks
height: tuple = 10
distance: float= 1
prominence: float = 10
width: float = 1
threshold: float = 0.6
peak1_cent: float= 1117
peak2_cent: float=1447
# Number of peaks to look for
n_peaks: float = 6
[docs]
def identify_Ne_lines(*, config: Neon_id_config=Neon_id_config(), path=None, filename=None, filetype=None, plot_figure=True, print_df=False,
Ne_array=None):
"""
Loads Ne line, uses scipy find peaks to identify peaks, overlays these,
and returns approximate peak positions, prominences etc to feed into fitting algorithms
Parameters
-----------
config: from Neon_id_config
This is used to identify peaks using Scipy find peaks. Parameters that can be tweaked
exclude_range_1: None, or Tuple[float, float]
Range to exclude (e.g, cosmic ray, instrument noise)
exclude_range_2: None, or Tuple[float, float]
Range to exclude (e.g, cosmic ray, instrument noise)
height, distance, prominence, width, threshold: float
Scipy find peak parameters you can tweak
peak1_cent: float
Estimate of location of Ne line 1
peak2_cent: float
Estimate of location of Ne line 2
n_peaks: float
Looks through the largest N peaks of scipy in the entire spectra and identifies them on the plot
path: str
Folder user wishes to read data from
filename: str
Specific file being read
filetype: str
choose from 'Witec_ASCII', 'headless_txt', 'headless_csv', 'head_csv', 'Witec_ASCII',
'HORIBA_txt', 'Renishaw_txt'
plot_figure: bool
If True, plots a figure highlighting the identified peaks
print_df: bool
if True, prints the positions of the N biggest peaks it found
Ne_array: np.array
Can also enter data as a numpy array, rather than as a filename, filepath and filetype
Returns
--------------
Ne, df_fit_params
Ne: np.array of spectral data (with ranges excluded)
df_fit_params: DataFrame of approximate peak positions, prominences etc.
"""
# This bit extracts the data, unless you already fed it in as an array
if filename is not None and path is not None and filetype is not None:
Ne_in=get_data(path=path, filename=filename, filetype=filetype)
if Ne_array is not None:
Ne_in=Ne_array
# This gets parameters from config file
exclude_range_1=config.exclude_range_1
exclude_range_2=config.exclude_range_2
height=config.height
distance=config.distance
prominence=config.prominence
width=config.width
threshold=config.threshold
peak1_cent=config.peak1_cent
peak2_cent=config.peak2_cent
n_peaks=config.n_peaks
# This bit filters the spectra if you want to exclude a range or 2
if exclude_range_1 is None and exclude_range_2 is None:
Ne=Ne_in
if exclude_range_1 is not None:
Ne=Ne_in[ (Ne_in[:, 0]<exclude_range_1[0]) | (Ne_in[:, 0]>exclude_range_1[1]) ]
if exclude_range_2 is not None:
Ne=Ne_in[ (Ne_in[:, 0]<exclude_range_2[0]) | (Ne_in[:, 0]>exclude_range_2[1]) ]
# Get X and Y coords.
y=Ne[:, 1]
x=Ne[:, 0]
spec_res=np.abs(x[1]-x[0])
# Apply Scipy Find peaks using the parameters in the config file.
peaks = find_peaks(y,height = height, threshold = threshold, distance = distance, prominence=prominence, width=width)
# This gets a list of peak heights
height = peaks[1]['peak_heights']
# This gets a list of peak positions
peak_pos = x[peaks[0]]
# Lets combine them in a dataframe
df=pd.DataFrame(data={'pos': peak_pos,
'height': height})
# Find bigest peaks, and take up to n peaks
df_sort_Ne=df.sort_values('height', axis=0, ascending=False)
df_sort_Ne_trim=df_sort_Ne[0:n_peaks]
if print_df is True:
print('Biggest N peaks:')
print(df_sort_Ne_trim)
# Get peak within +-5
df_pk1=df_sort_Ne.loc[df['pos'].between(peak1_cent-10*spec_res, peak1_cent+10*spec_res)]
df_pk2=df_sort_Ne.loc[df['pos'].between(peak2_cent-10*spec_res, peak2_cent+10*spec_res)]
df_pk1_trim=df_pk1[0:1]
df_pk2_trim=df_pk2[0:1]
# Lets extract spectra 10 spectral units either side of the asked for peak positions
Neon1_region=(x<(peak1_cent+10*spec_res)) & (x>(peak1_cent-10*spec_res))
Neon1_trim_y=y[Neon1_region]
Neon1_trim_x=x[Neon1_region]
Neon2_region=(x<(peak1_cent+10*spec_res)) & (x>(peak1_cent-10*spec_res))
Neon2_trim_y=y[Neon1_region]
Neon2_trim_x=x[Neon1_region]
# Take 25th quantile as representative of the background position
Baseline_Neon1=np.quantile(Neon1_trim_y, 0.1)
Baseline_Neon2=np.quantile(Neon2_trim_y, 0.1)
if len(df_pk1_trim)==1:
df_fit_params=pd.DataFrame(data={'Peak1_cent': df_pk1_trim['pos'],
'Peak1_height': df_pk1_trim['height']})
elif len(df_pk1_trim)>1:
df_pk1_max=df_pk1_trim.loc[df_pk1_trim['height']==np.max(df_pk1_trim['height'])]
df_fit_params=pd.DataFrame(data={'Peak1_cent': df_pk1_max['pos'],
'Peak1_height': df_pk1_max['height']})
elif len(df_pk1_trim)==0:
Max_y_Neon1=np.max(Neon1_trim_y)
x_Neon_1=x[y==Max_y_Neon1][0]
df_fit_params=pd.DataFrame(data={'Peak1_cent': x_Neon_1,
'Peak1_height': Max_y_Neon1}, index=[0])
if len(df_pk2_trim)==1:
df_fit_params['Peak2_cent']=df_pk2_trim['pos'].iloc[0]
df_fit_params['Peak2_height']=df_pk2_trim['height'].iloc[0]
elif len(df_pk2_trim)>1:
df_pk2_max=df_pk2_trim.loc[df_pk2_trim['height']==np.max(df_pk2['height'])]
df_fit_params['Peak2_cent']=df_pk2_max['pos'].iloc[0]
df_fit_params['Peak2_height']=df_pk2_max['height'].iloc[0]
elif len(df_pk2_trim)==0:
Max_y_Neon2=np.max(Neon2_trim_y)
x_Neon_2=x[y==Max_y_Neon2][0]
df_fit_params['Peak2_cent']= x_Neon_2
df_fit_params['Peak2_height']=Max_y_Neon2
if plot_figure is True:
fig, (ax0, ax1, ax2) = plt.subplots(1, 3, figsize=(11, 3.5))
ax0.plot(x, y, '-r')
miny=np.min(y)
maxy=np.max(y)
ax0.set_ylabel('Amplitude (counts)')
ax0.set_xlabel('Wavenumber (cm$^{-1}$)')
ax1.plot(Ne_in[:, 0], Ne_in[:, 1], '-c')
ax1.plot(Ne_in[:, 0], Ne_in[:, 1], '.c')
ax1.plot(Ne[:, 0], Ne[:, 1], '.r')
ax1.plot(x, y, '-r', label='filtered')
ax1.plot(df_sort_Ne_trim['pos'], df_sort_Ne_trim['height'], '*c', label='all pks IDed')
ax1.plot([peak1_cent-20, peak1_cent+20], [Baseline_Neon1, Baseline_Neon1], '-g', label='approx bck' )
#ax1.plot(peak2_cent, df_pk2_trim['height'], '*k')
pos_pk1=str(np.round(df_fit_params['Peak1_cent'].iloc[0], 2))
nearest_pk1=peak1_cent
#ax1.annotate('peak=' + pos_pk1, xy=(df_fit_params['Peak1_cent'].iloc[0]+3,
#Baseline_Neon1*1.5+700), xycoords="data", fontsize=10, rotation=90)
ax1.annotate('peak=' + pos_pk1, xy=(0.8, 0.3),xycoords="axes fraction", fontsize=10, rotation=90)
ax1.plot(df_fit_params['Peak1_cent'], df_fit_params['Peak1_height'], '*k', mfc='yellow', ms=8, label='selected peak')
ax1.legend(loc='upper center', ncol=2, fontsize=8)
pos_pk2=str(np.round(df_fit_params['Peak2_cent'].iloc[0], 2))
ax2.plot([peak2_cent-20, peak2_cent+20], [Baseline_Neon2, Baseline_Neon2], '-g')
# ax2.annotate('peak=' + pos_pk2, xy=(df_fit_params['Peak2_cent'].iloc[0]-5,
# Baseline_Neon1*2+700), xycoords="data", fontsize=10, rotation=90)
ax2.annotate('peak=' + pos_pk2,xy=(0.8, 0.3),xycoords="axes fraction", fontsize=10, rotation=90)
ax1.set_xlim([peak1_cent-15, peak1_cent+15])
ax1.set_xlim([peak1_cent-10, peak1_cent+10])
ax2.plot(Ne[:, 0], Ne[:, 1], '.r', label='input')
ax2.plot(x, y, '-r')
ax2.plot(df_sort_Ne_trim['pos'], df_sort_Ne_trim['height'], '*k', mfc='yellow', ms=8)
#print(df_pk1)
ax2.set_xlim([peak2_cent-15, peak2_cent+15])
ax1.set_xlabel('Wavenumber (cm$^{-1}$)')
ax2.set_xlabel('Wavenumber (cm$^{-1}$)')
ax1.set_ylim([0, 1.5*df_fit_params['Peak1_height'].iloc[0]+100])
ax2.set_ylim([0, 1.5*df_fit_params['Peak2_height'].iloc[0]+100])
fig.tight_layout()
df_fit_params['Peak1_prom']=df_fit_params['Peak1_height']-Baseline_Neon1
df_fit_params['Peak2_prom']=df_fit_params['Peak2_height']-Baseline_Neon2
df_fit_params=df_fit_params.reset_index(drop=True)
return Ne, df_fit_params
## Ne baselines
[docs]
def remove_Ne_baseline_pk(Ne, N_poly_pk1_baseline=None, Ne_center_1=None,
lower_bck=None, upper_bck1=None, upper_bck2=None, sigma_baseline=None):
""" This function uses a defined range of values to fit a baseline of Nth degree polynomial to the baseline
around a specified peak
Parameters
-----------
Ne: np.array
np.array of x and y coordinates from the spectra
N_poly_pk1_baseline: int
Degree of polynomial used to fit the background
Ne_center_1: float
Center position for Ne line being fitted
lower_bck: list (length 2). default [-50, -20]
position used for lower background relative to peak, so =[-50, -20] takes a
background -50 and -20 from the peak center
upper_bck1: list (length 2). default [8, 15]
position used for 1st upper background relative to peak, so =[8, 15] takes a
background +8 and +15 from the peak center
upper_bck2: list (length 2). default [30, 50]
position used for 2nd upper background relative to peak, so =[30, 50] takes a
background +30 and +50 from the peak center
Returns
-----------
y_corr, Py_base, x, Ne_short, Py_base, Baseline_y, Baseline_x
y_corr (numpy.ndarray): The corrected y-values after subtracting the fitted polynomial baseline from the original data.
Py_base (numpy.ndarray): The y-values of the fitted polynomial baseline.
x (numpy.ndarray): The x-values of the trimmed data within the specified range.
Ne_short (numpy.ndarray): The trimmed data within the specified range.
Baseline_y (numpy.ndarray): The y-values of the baseline data points.
Baseline_x (numpy.ndarray): The x-values of the baseline data points
"""
lower_0baseline_pk1=Ne_center_1+lower_bck[0]
upper_0baseline_pk1=Ne_center_1+lower_bck[1]
lower_1baseline_pk1=Ne_center_1+upper_bck1[0]
upper_1baseline_pk1=Ne_center_1+upper_bck1[1]
lower_2baseline_pk1=Ne_center_1+upper_bck2[0]
upper_2baseline_pk1=Ne_center_1+upper_bck2[1]
# Trim for entire range
Ne_short=Ne[ (Ne[:,0]>lower_0baseline_pk1) & (Ne[:,0]<upper_2baseline_pk1) ]
# Get actual baseline
Baseline_with_outl=Ne_short[
((Ne_short[:, 0]<=upper_0baseline_pk1) &(Ne_short[:, 0]>=lower_0baseline_pk1))
|
((Ne_short[:, 0]<=upper_1baseline_pk1) &(Ne_short[:, 0]>=lower_1baseline_pk1))
|
((Ne_short[:, 0]<=upper_2baseline_pk1) &(Ne_short[:, 0]>=lower_2baseline_pk1))]
# Calculates the median for the baseline and the standard deviation
Median_Baseline=np.median(Baseline_with_outl[:, 1])
Std_Baseline=np.std(Baseline_with_outl[:, 1])
# Removes any points in the baseline outside of 2 sigma (helps remove cosmic rays etc).
Baseline=Baseline_with_outl[(Baseline_with_outl[:, 1]<Median_Baseline+sigma_baseline*Std_Baseline)
&
(Baseline_with_outl[:, 1]>Median_Baseline-sigma_baseline*Std_Baseline)
]
# Fits a polynomial to the baseline of degree
Pf_baseline = np.poly1d(np.polyfit(Baseline[:, 0], Baseline[:, 1], N_poly_pk1_baseline))
Py_base =Pf_baseline(Ne_short[:, 0])
Baseline_ysub=Pf_baseline(Baseline_with_outl[:, 0])
Baseline_y=Baseline[:, 1]
Baseline_x= Baseline[:, 0]#Baseline[:, 0]
y_corr= Ne_short[:, 1]- Py_base
x=Ne_short[:, 0]
return y_corr, Py_base, x, Ne_short, Py_base, Baseline_y, Baseline_x
[docs]
def fit_Ne_pk(x, y_corr, x_span=[-10, 8], Ne_center=1117.1, amplitude=98.1, pk1_sigma=0.28,
LH_offset_mini=[1.5, 3], peaks_pk1=2, model_name='PseudoVoigtModel', block_print=True,
const_params=True, spec_res=0.4, py_baseline=None, minimise='least_squares' ) :
""" This function fits the 1117 Ne line as 1 or two voigt peaks
Parameters
-----------
x: np.array
x coordinate (wavenumber)
y: np.array
Background corrected intensiy
x_span: list length 2. Default [-10, 8]
Span either side of peak center used for fitting,
e.g. by default, fits to 10 wavenumbers below peak, 8 above.
Ne_center: float (default=1117.1)
Center position for Ne line being fitted
amplitude: integer (default = 98)
peak amplitude
sigma: float (default =0.28)
sigma of the voigt peak
peaks_pk1: integer
number of peaks to fit, e.g. 1, single voigt, 2 to get shoulder peak
LH_offset_mini: list length 2
Forces second peak to be within -1.5 to -3 from the main peak position.
print_report: bool
if True, prints fit report.
py_baseline:
Output from the remove_ne_baseline function - needed for weighted least squares
minimise: str
'weighted_least_squares' or 'least_squares'
"""
if const_params is True:
min_off=0.8
max_off=1.2
if const_params is False:
min_off=0.3
max_off=3
# Flatten x and y if needed
xdat=x.flatten()
ydat=y_corr.flatten()
df=pd.DataFrame(data={'Xdata': xdat,
'Ydata': ydat})
#df.to_clipboard(excel=True)
# This defines the range you want to fit (e.g. how big the tails are)
lower_pk1=Ne_center+x_span[0]
upper_pk1=Ne_center+x_span[1]
# This segments into the x and y variable, and variables to plot, which are a bit bigger.
Ne_pk1_reg_x=x[(x>lower_pk1)&(x<upper_pk1)]
Ne_pk1_reg_y=y_corr[(x>lower_pk1)&(x<upper_pk1)]
Ne_pk1_reg_x_plot=x[(x>(lower_pk1-3))&(x<(upper_pk1+3))]
Ne_pk1_reg_y_plot=y_corr[(x>(lower_pk1-3))&(x<(upper_pk1+3))]
# Need to do the same for baseline
py_baseline2=py_baseline[(x>lower_pk1)&(x<upper_pk1)]
if peaks_pk1>1:
# Setting up lmfit
model0 = globals()[model_name](prefix='p0_')
pars0 = model0.make_params()
pars0['p0_center'].set(Ne_center, min=Ne_center-2*spec_res, max=Ne_center+2*spec_res)
pars0['p0_amplitude'].set(amplitude, min=amplitude*min_off, max=amplitude*max_off)
init0 = model0.eval(pars0, x=xdat)
result0 = model0.fit(ydat, pars0, x=xdat)
Center_p0=result0.best_values.get('p0_center')
if block_print is False:
print('first iteration, peak Center='+str(np.round(Center_p0, 4)))
Amp_p0=result0.params.get('p0_amplitude')
if block_print is False:
print('first iteration, peak Amplitude='+str(np.round(Amp_p0, 4)))
fwhm_p0=result0.params.get('p0_fwhm')
model1 = globals()[model_name](prefix='p1_')
pars1 = model1.make_params()
pars1['p1_'+ 'amplitude'].set(Amp_p0, min=min_off*Amp_p0, max=max_off*Amp_p0)
pars1['p1_'+ 'center'].set(Center_p0, min=Center_p0-spec_res/2, max=Center_p0+spec_res/2)
pars1['p1_'+ 'sigma'].set(pk1_sigma, min=pk1_sigma*min_off, max=pk1_sigma*max_off)
# Second wee peak
peak = globals()[model_name](prefix='p2_')
pars = peak.make_params()
minp2=Center_p0-LH_offset_mini[1]
maxp2=Center_p0-LH_offset_mini[0]
if block_print is False:
print('Trying to place second peak between '+str(np.round(minp2, 2))+'and'+ str(np.round(maxp2, 2)))
#pars['p2_fwhm'].set(fwhm_p0/2, min=0.001, max=fwhm_p0*5)
pars['p2_center'].set((maxp2+minp2)/2, min=minp2,
max=maxp2)
pars['p2_amplitude'].set(Amp_p0/10, min=Amp_p0/15, max=Amp_p0/4)
pars['p2_sigma'].set(pk1_sigma/1.2, min=pk1_sigma/3, max=pk1_sigma)
model_combo=model1+peak
# updating pars1, the fit to peak 1, with pars values
pars1.update(pars)
# Attempt at stabilizing peak fit
result = model_combo.fit(ydat, pars1, x=xdat)
# pars1['p2_center'].vary = False
# pars1['p2_amplitude'].vary = False
# pars1['p2_sigma'].vary = False
#
result.params['p2_center'].vary = False
result.params['p2_amplitude'].vary = False
result.params['p2_sigma'].vary = False
model1_only = model1
pars1.update(result.params)
#result_pk1 = model1_only.fit(ydat, pars1, x=xdat)
result = model_combo.fit(ydat, pars1, x=xdat)
if peaks_pk1==1:
model_combo = globals()[model_name](prefix='p1_')
# create parameters with initial values
pars1 = model_combo.make_params()
pars1['p1_amplitude'].set(amplitude, min=amplitude*min_off, max=amplitude*max_off)
pars1['p1_' + 'center'].set(Ne_center, min=Ne_center-2*spec_res,max=Ne_center+2*spec_res)
pars1['p1_'+ 'sigma'].set(pk1_sigma, min=pk1_sigma*min_off, max=pk1_sigma*max_off)
if minimise=='least_squares':
result = model_combo.fit(ydat, pars1, x=xdat)
init = model_combo.eval(pars1, x=xdat)
elif minimise=='weighted_least_squares':
result0 = model_combo.fit(ydat, pars1, x=xdat)
f_model = result0.best_fit
# 2. Fityk-style weighting:
# We assume the variance is proportional to the observed intensity.
# We don't multiply/divide by 5 here, so we don't 'inflate' the errors
# beyond what Fityk's standard least-squares would expect.
total_counts_obs = f_model + py_baseline
# 3. Calculate weights (1/sigma)
# This keeps the RELATIVE importance of the peaks vs baseline correct.
weights = 1.0 / np.sqrt(np.maximum(total_counts_obs, 1.0))
# 4. Final fit
# We use scale_covar=True to let lmfit scale the errors based on the
# fit quality (the residuals), recomended if we dont know the absolute size of the errors/weights and we dont because there could easily be gain on the detector.
result = model_combo.fit(ydat, result0.params, x=xdat,
weights=weights,
scale_covar=True)
init = model_combo.eval(pars1, x=xdat)
else:
raise ValueError(
f"Invalid minimise option: '{minimise}'. "
"Please specify minimise='least_squares' or "
"minimise='weighted_least_squares'."
)
# Need to check errors output
Error_bars=result.errorbars
# Get center value
Center_p1=result.best_values.get('p1_center')
error_pk1 = result.params['p1_center'].stderr
error_pk1_amp = result.params['p1_amplitude'].stderr
fwhm_pk1 = result.params['p1_fwhm'].value
# Get mix of lorenz
Peak1_Prop_Lor=result.best_values.get('p1_fraction')
Area_pk1=result.best_values.get('p1_amplitude')
sigma_pk1=result.best_values.get('p1_sigma')
gamma_pk1=result.best_values.get('p1_gamma')
# Evaluate the peak at 100 values for pretty plotting
xx_pk1=np.linspace(lower_pk1, upper_pk1, 2000)
result_pk1=result.eval(x=xx_pk1)
comps=result.eval_components(x=xx_pk1)
result_pk1_origx=result.eval(x=Ne_pk1_reg_x)
Center_pk1=Center_p1
return Center_pk1, Area_pk1, sigma_pk1, gamma_pk1, Ne_pk1_reg_x_plot, Ne_pk1_reg_y_plot, Ne_pk1_reg_x, Ne_pk1_reg_y, xx_pk1, result_pk1, error_pk1, result_pk1_origx, comps, Peak1_Prop_Lor, error_pk1_amp, fwhm_pk1
## Setting default Ne fitting parameters
[docs]
@dataclass
class Ne_peak_config:
model_name: str = 'PseudoVoigtModel'
# Things for the background positioning and fit
N_poly_pk1_baseline: float = 1 #Degree of polynomial to fit to the baseline
N_poly_pk2_baseline: float = 1 #Degree of polynomial to fit to the baseline
sigma_baseline=3 # Discard things outside of this sigma on the baseline
lower_bck_pk1: Tuple[float, float] = (-50, -25) # Background position Pk1
upper_bck1_pk1: Tuple[float, float] = (8, 15) # Background position Pk1
upper_bck2_pk1: Tuple[float, float] = (30, 50) # Background position Pk1
lower_bck_pk2: Tuple[float, float] = (-44.2, -22) # Background position Pk2
upper_bck1_pk2: Tuple[float, float] = (15, 50) # Background position Pk2
upper_bck2_pk2: Tuple[float, float] = (50, 51) # Background position Pk1
# Whether you want a secondary peak
peaks_1: float=2
peaks_2: float=1
# SPlitting
DeltaNe_ideal: float= 330.477634
# Things for plotting the baseline
x_range_baseline_pk1: float=20 # How many units outside your selected background it shows on the baseline plot
y_range_baseline_pk1: float= 200 # Where the y axis is cut off above the minimum baseline measurement
x_range_baseline_pk2: float=20 # How many units outside your selected background it shows on the baseline plot
y_range_baseline_pk2: float= 200 # Where the y axis is cut off above the minimum baseline measurement
# Sigma for peaks
pk1_sigma: float = 0.4
pk2_sigma: float = 0.4
# Things for plotting the primary peak
x_range_peak: float=15 # How many units to each side are shown when plotting the peak fits
# Things for plotting the residual
x_range_residual: float=7 # Shows how many x units to left and right is shown on residual plot
# Things for fitting a secondary peak on pk1
LH_offset_mini: Tuple[float, float] = (1.5, 3)
# Same for Pk2
LH_offset_mini2: Tuple[float, float] = None
# Optional, by default, fits to the points inside the baseline. Can also specify as values to make a smaller peak fit.
x_span_pk1: Optional [Tuple[float, float]] = None # Tuple[float, float] = (-10, 8)
x_span_pk2: Optional [Tuple[float, float]] = None # Tuple[float, float] = (-5, 5)
[docs]
def fit_Ne_lines(*, config: Ne_peak_config=Ne_peak_config(),
Ne_center_1=1117.1, Ne_center_2=1147, Ne_prom_1=100, Ne_prom_2=200,
Ne=None, filename=None, path=None, prefix=True,
plot_figure=True, loop=True,
save_clipboard=False,
close_figure=False, const_params=True, minimise='least_squares'):
""" This function reads in a user file, fits the Ne lines, and if required, saves an image
into a new sub folder
Parameters
-----------
Ne: np.array
x coordinate (wavenumber) and y coordinate (intensity)
filename and path: str
used to save filename in datatable, and to make a new folder.
prefix: bool
Whether or not the filename has a prefix
Ne_center_1, Ne_center_2 : float
Approximate peak position of the 1st and 2nd Ne line
Ne_prom_1, Ne_prom_2 : float
Approximate prominance of the 1st and 2nd Ne line
plot_figure: bool
Plots a figure, nice to inspect fits, makes it slower
loop: bool
save_clipboard: bool
Saves results to clipboard if true
close_figure: bool
Closes figure if True (useful in some editors)
const_params: bool
If true, means amplitude and peak sigma have to be closer to the guessed parameters (e.g. used after initial fit).
E.g. forced within +-20% of estimated peak parameters if True, if false, +-100%.
minimise:str
'weighted_least_squares' or 'least_squares'
config parameters from Ne_peak_config():
model_name: str
allowed_models = "VoigtModel", "PseudoVoigtModel", "Pearson4Model", "SkewedVoigtModel"
N_poly_pk1_baseline, N_poly_pk1_baseline: int (default 1)
Degree of polynomial to fit to baseline around pk
lower_bck_pk1, upper_bck1_pk1, upper_bck2_pk1: tuple
lower_bck_pk2, upper_bck1_pk2, upper_bck2_pk2: tuple
Background positions relative to estimated peak center.
peaks_1, peaks_2: int
Number of peaks to fit to peak 1. If you need 2 overlapping peaks, put this Ne line as pk1
if not 1, you also need LH_offset_mini=(1.5, 3). Means second peak put between 1.5 and 3 units left of the 1st peak.
DeltaNe_ideal: float
Ideal distance between two lines calculated for your laser wavelength.
x_range_baseline_pk1, x_range_baseline_pk2: int, float
test
y_range_baseline_pk1, y_range_baseline_pk2: int, float
test
pk1_sigma, pk2_sigma: float (Default 0.4)
Estimated sigma of each peak
x_range_peak: flt, int, or None
How much to either side of the peak to show on the final peak fitting plot
x_range_residual: flt
How much either side of the peak is used for calculating residual
x_span_pk1, x_span_pk2: float, default None
By default, function fits up to background, but can shrink that range here.
Returns
------------
if loop is False:
return df, Ne_pk1_reg_x_plot, Ne_pk1_reg_y_plot
if loop is True:
return df
Also returns figure.
"""
print(minimise)
# Check model is supported
if config.model_name not in allowed_models:
raise ValueError(f"Unsupported model: {config.model_name}. Supported models are: {', '.join(allowed_models)}")
# check they havent messed up background
if config.lower_bck_pk1[0]>config.lower_bck_pk1[1]:
raise TypeError('Your left hand number needs to be a smaller number than the right, e.g. you can have [2, 5] but you cant have [5, 2]')
if config.upper_bck1_pk1[0]>config.upper_bck1_pk1[1]:
raise TypeError('Your left hand number needs to be a smaller number than the right, e.g. you can have [2, 5] but you cant have [5, 2]')
if config.upper_bck2_pk1[0]>config.upper_bck2_pk1[1]:
raise TypeError('Your left hand number needs to be a smaller number than the right, e.g. you can have [2, 5] but you cant have [5, 2]')
# check they havent messed up background
if config.lower_bck_pk2[0]>config.lower_bck_pk2[1]:
raise TypeError('Your left hand number needs to be a smaller number than the right, e.g. you can have [2, 5] but you cant have [5, 2]')
if config.upper_bck1_pk2[0]>config.upper_bck1_pk2[1]:
raise TypeError('Your left hand number needs to be a smaller number than the right, e.g. you can have [2, 5] but you cant have [5, 2]')
if config.upper_bck2_pk2[0]>config.upper_bck2_pk2[1]:
raise TypeError('Your left hand number needs to be a smaller number than the right, e.g. you can have [2, 5] but you cant have [5, 2]')
x=Ne[:, 0]
spec_res=np.abs(x[1]-x[0])
# Getting things from config file
peaks_1=config.peaks_1
peaks_2=config.peaks_2
DeltaNe_ideal=config.DeltaNe_ideal
# Estimate amplitude from prominence and sigma you entered
Pk1_Amp=((config.pk1_sigma)*(Ne_prom_1))/0.3939
Pk2_Amp=((config.pk2_sigma)*(Ne_prom_2))/0.3939
#Remove the baselines
y_corr_pk1, Py_base_pk1, x_pk1, Ne_short_pk1, Py_base_pk1, Baseline_ysub_pk1, Baseline_x_pk1=remove_Ne_baseline_pk(Ne,
N_poly_pk1_baseline=config.N_poly_pk1_baseline, Ne_center_1=Ne_center_1, sigma_baseline=config.sigma_baseline,
lower_bck=config.lower_bck_pk1, upper_bck1=config.upper_bck1_pk1, upper_bck2=config.upper_bck2_pk1)
y_corr_pk2, Py_base_pk2, x_pk2, Ne_short_pk2, Py_base_pk2, Baseline_ysub_pk2, Baseline_x_pk2=remove_Ne_baseline_pk(Ne, Ne_center_1=Ne_center_2, N_poly_pk1_baseline=config.N_poly_pk2_baseline, sigma_baseline=config.sigma_baseline,
lower_bck=config.lower_bck_pk2, upper_bck1=config.upper_bck1_pk2, upper_bck2=config.upper_bck2_pk2)
# If the user hasnt specified a fitting window, it creates one automatically from the upper to lower end of the background window specified.
if config.x_span_pk1 is None:
x_span_pk1=[config.lower_bck_pk1[1], config.upper_bck1_pk1[0]]
x_span_pk1_dist=abs(config.lower_bck_pk1[1]-config.upper_bck1_pk1[0])
else:
x_span_pk1=config.x_span_pk1
x_span_pk1_dist=abs(config.x_span_pk1[1]-config.x_span_pk1[0])
if config.x_span_pk2 is None:
x_span_pk2=[config.lower_bck_pk2[1], config.upper_bck1_pk2[0]]
x_span_pk2_dist=abs(config.lower_bck_pk2[1]-config.upper_bck1_pk2[0])
else:
x_span_pk2=config.x_span_pk2
x_span_pk2_dist=abs(config.x_span_pk2[1]-config.x_span_pk2[0])
# Fit Pk1
cent_pk1, Area_pk1, sigma_pk1, gamma_pk1, Ne_pk1_reg_x_plot, Ne_pk1_reg_y_plot, Ne_pk1_reg_x, Ne_pk1_reg_y, xx_pk1, result_pk1, error_pk1, result_pk1_origx, comps, Peak1_Prop_Lor, error_pk1_amp, fwhm_pk1= fit_Ne_pk(x_pk1, y_corr_pk1, x_span=x_span_pk1, Ne_center=Ne_center_1, model_name=config.model_name, LH_offset_mini=config.LH_offset_mini, peaks_pk1=peaks_1, amplitude=Pk1_Amp, pk1_sigma=config.pk1_sigma,
const_params=const_params, spec_res=spec_res, py_baseline=Py_base_pk1, minimise=minimise)
# Fit pk2
cent_pk2,Area_pk2, sigma_pk2, gamma_pk2, Ne_pk2_reg_x_plot, Ne_pk2_reg_y_plot, Ne_pk2_reg_x, Ne_pk2_reg_y, xx_pk2, result_pk2, error_pk2, result_pk2_origx, comps2, Peak2_Prop_Lor, error_pk2_amp, fwhm_pk2 = fit_Ne_pk( x_pk2, y_corr_pk2, x_span=x_span_pk2, Ne_center=Ne_center_2, model_name=config.model_name, LH_offset_mini=config.LH_offset_mini2, peaks_pk1=peaks_2, amplitude=Pk2_Amp, pk1_sigma=config.pk2_sigma, const_params=const_params,spec_res=spec_res, py_baseline=Py_base_pk2, minimise=minimise)
# Calculate difference between peak centers, and Delta Ne
DeltaNe=cent_pk2-cent_pk1
Ne_Corr=DeltaNe_ideal/DeltaNe
# Calculate maximum splitting (+1 sigma)
error_pk1 = error_pk1 if error_pk1 is not None else 0
error_pk2 = error_pk2 if error_pk2 is not None else 0
DeltaNe_max=(cent_pk2+error_pk2)-(cent_pk1-error_pk1)
DeltaNe_min=(cent_pk2-error_pk2)-(cent_pk1+error_pk1)
Ne_Corr_max=DeltaNe_ideal/DeltaNe_min
Ne_Corr_min=DeltaNe_ideal/DeltaNe_max
# Calculating least square residual
residual_pk1=np.sum(((Ne_pk1_reg_y-result_pk1_origx)**2)**0.5)/(len(Ne_pk1_reg_y))
residual_pk2=np.sum(((Ne_pk2_reg_y-result_pk2_origx)**2)**0.5)/(len(Ne_pk2_reg_y))
if plot_figure is True:
# Make a summary figure of the backgrounds and fits
fig, ((ax3, ax2), (ax5, ax4), (ax1, ax0)) = plt.subplots(3,2, figsize = (12,15)) # adjust dimensions of figure here
fig.suptitle(filename, fontsize=16)
# Setting y limits of axis
ymin_ax1=min(Ne_short_pk1[:,1])-10
ymax_ax1=min(Ne_short_pk1[:,1])+config.y_range_baseline_pk1
ymin_ax0=min(Ne_short_pk2[:,1])-10
ymax_ax0=min(Ne_short_pk2[:,1])+config.y_range_baseline_pk2
ax1.set_ylim([ymin_ax1, ymax_ax1])
ax0.set_ylim([ymin_ax0, ymax_ax0])
# Setting x limits of axis
ax1_xmin=min(Ne_short_pk1[:,0])-config.x_range_baseline_pk1
ax1_xmax=max(Ne_short_pk1[:,0])+config.x_range_baseline_pk1
ax0_xmin=min(Ne_short_pk2[:,0])-config.x_range_baseline_pk2
ax0_xmax=max(Ne_short_pk2[:,0])+config.x_range_baseline_pk2
ax0.set_xlim([ax0_xmin, ax0_xmax])
ax1.set_xlim([ax1_xmin, ax1_xmax])
# Adding background positions as colored bars on pk1
ax1cop=ax1.twiny()
#ax1cop.set_zorder(ax1.get_zorder())
ax1cop_xmax=ax1_xmax-Ne_center_1
ax1cop_xmin=ax1_xmin-Ne_center_1
ax1cop.set_xlim([ax1cop_xmin, ax1cop_xmax])
rect_pk1_b1=patches.Rectangle((config.lower_bck_pk1[0],ymin_ax1),config.lower_bck_pk1[1]-config.lower_bck_pk1[0],ymax_ax1-ymin_ax1,
linewidth=1,edgecolor='none',facecolor='cyan', label='bck_pk1', alpha=0.3, zorder=0)
ax1cop.add_patch(rect_pk1_b1)
rect_pk1_b2=patches.Rectangle((config.upper_bck1_pk1[0],ymin_ax1),config.upper_bck1_pk1[1]-config.upper_bck1_pk1[0],ymax_ax1-ymin_ax1,
linewidth=1,edgecolor='none',facecolor='cyan', label='bck_pk2', alpha=0.3, zorder=0)
ax1cop.add_patch(rect_pk1_b2)
rect_pk1_b3=patches.Rectangle((config.upper_bck2_pk1[0],ymin_ax1),config.upper_bck2_pk1[1]-config.upper_bck2_pk1[0],ymax_ax1-ymin_ax1,
linewidth=1,edgecolor='none',facecolor='cyan', label='bck_pk3', alpha=0.3, zorder=0)
ax1cop.add_patch(rect_pk1_b3)
# Adding background positions as colored bars on pk2
ax0cop=ax0.twiny()
ax0cop_xmax=ax0_xmax-Ne_center_2
ax0cop_xmin=ax0_xmin-Ne_center_2
ax0cop.set_xlim([ax0cop_xmin, ax0cop_xmax])
rect_pk2_b1=patches.Rectangle((config.lower_bck_pk2[0],ymin_ax0),config.lower_bck_pk2[1]-config.lower_bck_pk2[0],ymax_ax0-ymin_ax0,
linewidth=1,edgecolor='none',facecolor='cyan', label='bck_pk2', alpha=0.3)
ax0cop.add_patch(rect_pk2_b1)
rect_pk2_b2=patches.Rectangle((config.upper_bck1_pk2[0],ymin_ax0),config.upper_bck1_pk2[1]-config.upper_bck1_pk2[0],ymax_ax0-ymin_ax0,
linewidth=1,edgecolor='none',facecolor='cyan', label='bck_pk2', alpha=0.3)
ax0cop.add_patch(rect_pk2_b2)
rect_pk2_b3=patches.Rectangle((config.upper_bck2_pk2[0],ymin_ax0),config.upper_bck2_pk2[1]-config.upper_bck2_pk2[0],ymax_ax0-ymin_ax0,
linewidth=1,edgecolor='none',facecolor='cyan', label='bck_pk3', alpha=0.3)
ax0cop.add_patch(rect_pk2_b3)
# Plotting trimmed data and background
ax0.plot(Ne_short_pk2[:,0], Py_base_pk2, '-k', label='Fit. Bck')
ax0.plot(Ne_short_pk2[:,0], Ne_short_pk2[:,1], '-r')
ax1.plot(Ne_short_pk1[:,0], Py_base_pk1, '-k', label='Fit. Bck')
ax1.plot(Ne_short_pk1[:,0], Ne_short_pk1[:,1], '-r')
# Plotting data actually used in the background, after sigma exclusion
ax1.plot(Baseline_x_pk1, Baseline_ysub_pk1, '.b', ms=6, label='Bck')
ax0.plot(Baseline_x_pk2, Baseline_ysub_pk2, '.b', ms=5, label='Bck')
mean_baseline=np.mean(Py_base_pk2)
std_baseline=np.std(Py_base_pk2)
std_baseline=np.std(Py_base_pk1)
ax1.plot([Ne_center_1, Ne_center_1], [ymin_ax1, ymax_ax1], ':k', label='Peak')
ax0.plot([Ne_center_2, Ne_center_2], [ymin_ax0, ymax_ax0], ':k', label='Peak')
ax1.set_title('%.0f' %Ne_center_1+': background fitting')
ax1.set_xlabel('Wavenumber')
ax1.set_ylabel('Intensity')
ax0.set_title('%.0f' %Ne_center_2+ ': background fitting')
ax0.set_xlabel('Wavenumber')
ax0.set_ylabel('Intensity')
ax1cop.set_xlabel('Offset from Pk estimate')
ax0cop.set_xlabel('Offset from Pk estimate')
#Showing all data, not just stuff fit
ax0.plot(Ne[:,0], Ne[:,1], '-', color='grey', zorder=0)
ax1.plot(Ne[:,0], Ne[:,1], '-', color='grey', zorder=0)
# ax1.legend()
# ax0.legend()
ax0.legend(loc='lower right', bbox_to_anchor= (-0.08, 0.8), ncol=1,
borderaxespad=0, frameon=True, facecolor='white')
# Actual peak fits.
ax2.plot(Ne_pk2_reg_x_plot, Ne_pk2_reg_y_plot, 'xb', label='all data')
ax2.plot(Ne_pk2_reg_x, Ne_pk2_reg_y, 'ok', label='data fitted')
ax2.plot(xx_pk2, result_pk2, 'r-', label='interpolated fit')
ax2.set_title('%.0f' %Ne_center_2+' peak fitting')
ax2.set_xlabel('Wavenumber')
ax2.set_ylabel('Intensity')
if config.x_range_peak is None:
ax2.set_xlim([cent_pk2-x_span_pk2_dist/2, cent_pk2+x_span_pk2_dist/2])
else:
ax2.set_xlim([cent_pk2-config.x_range_peak, cent_pk2+config.x_range_peak])
ax3.plot(Ne_pk1_reg_x_plot, Ne_pk1_reg_y_plot, 'xb', label='all data')
ax3.plot(Ne_pk1_reg_x, Ne_pk1_reg_y, 'ok', label='data fitted')
ax3.set_title('%.0f' %Ne_center_1+' peak fitting')
ax3.set_xlabel('Wavenumber')
ax3.set_ylabel('Intensity')
ax3.plot(xx_pk1, comps.get('p1_'), '-r', label='p1')
if peaks_1>1:
ax3.plot(xx_pk1, comps.get('p2_'), '-c', label='p2')
ax3.plot(xx_pk1, result_pk1, 'g-', label='best fit')
ax3.legend()
if config.x_range_peak is None:
ax3.set_xlim([cent_pk1-x_span_pk1_dist/2, cent_pk1+x_span_pk1_dist/2])
else:
ax3.set_xlim([cent_pk1-config.x_range_peak, cent_pk1+config.x_range_peak ])
# Residuals for peak fits.
ax4.plot(Ne_pk2_reg_x, Ne_pk2_reg_y-result_pk2_origx, '-r', label='residual')
ax5.plot(Ne_pk1_reg_x, Ne_pk1_reg_y-result_pk1_origx, '-r', label='residual')
ax4.plot(Ne_pk2_reg_x, Ne_pk2_reg_y-result_pk2_origx, 'ok', mfc='r', label='residual')
ax5.plot(Ne_pk1_reg_x, Ne_pk1_reg_y-result_pk1_origx, 'ok', mfc='r', label='residual')
ax4.set_ylabel('Residual (Intensity units)')
ax4.set_xlabel('Wavenumber')
ax5.set_ylabel('Residual (Intensity units)')
ax5.set_xlabel('Wavenumber')
Residual_pk2=Ne_pk2_reg_y-result_pk2_origx
Residual_pk1=Ne_pk1_reg_y-result_pk1_origx
Local_Residual_pk2=Residual_pk2[(Ne_pk2_reg_x>cent_pk2-config.x_range_residual)&(Ne_pk2_reg_x<cent_pk2+config.x_range_residual)]
Local_Residual_pk1=Residual_pk1[(Ne_pk1_reg_x>cent_pk1-config.x_range_residual)&(Ne_pk1_reg_x<cent_pk1+config.x_range_residual)]
ax5.set_xlim([cent_pk1-config.x_range_residual, cent_pk1+config.x_range_residual])
ax4.set_xlim([cent_pk2-config.x_range_residual, cent_pk2+config.x_range_residual])
ax5.plot([cent_pk1, cent_pk1 ], [np.min(Local_Residual_pk1)-10, np.max(Local_Residual_pk1)+10], ':k')
ax4.plot([cent_pk2, cent_pk2 ], [np.min(Local_Residual_pk2)-10, np.max(Local_Residual_pk2)+10], ':k')
ax5.set_ylim([np.min(Local_Residual_pk1)-10, np.max(Local_Residual_pk1)+10])
ax4.set_ylim([np.min(Local_Residual_pk2)-10, np.max(Local_Residual_pk2)+10])
fig.tight_layout()
# Save figure
path3=path+'/'+'Ne_fit_images'
if os.path.exists(path3):
out='path exists'
else:
os.makedirs(path+'/'+ 'Ne_fit_images', exist_ok=False)
figure_str=path+'/'+ 'Ne_fit_images'+ '/'+ filename+str('_Ne_Line_Fit')+str('.png')
fig.savefig(figure_str, dpi=200)
if close_figure is True:
plt.close(fig)
if prefix is True:
filename = filename.split(' ', 1)[1]
df=pd.DataFrame(data={'filename': filename,
'pk2_peak_cent':cent_pk2,
'pk2_amplitude': Area_pk2,
'pk2_sigma': sigma_pk2,
'pk2_gamma': gamma_pk2,
'error_pk2': error_pk2,
'Peak2_Prop_Lor': Peak2_Prop_Lor,
'pk1_peak_cent':cent_pk1,
'pk1_amplitude': Area_pk1,
'pk1_sigma': sigma_pk1,
'pk1_gamma': gamma_pk1,
'error_pk1': error_pk1,
'Peak1_Prop_Lor': Peak1_Prop_Lor,
'pk1_fwhm': fwhm_pk1,
'pk2_fwhm': fwhm_pk2,
'deltaNe': DeltaNe,
'Ne_Corr': Ne_Corr,
'Ne_Corr_min':Ne_Corr_min,
'Ne_Corr_max': Ne_Corr_max,
'residual_pk2':residual_pk2,
'residual_pk1': residual_pk1,
'residual_pk1+pk2':residual_pk1+residual_pk2,
'error_pk1_amplitude': error_pk1_amp,
'error_pk2_amplitude': error_pk2_amp
}, index=[0])
df_combo=df
pk1_peak_cent_values = df_combo['pk1_peak_cent'].values
pk1_peak_cent_errors = df_combo['error_pk1'].fillna(0).infer_objects().values
pk2_peak_cent_values = df_combo['pk2_peak_cent'].values
pk2_peak_cent_errors = df_combo['error_pk2'].fillna(0).infer_objects().values
constant=df_combo['deltaNe']
# Calculate the error on Ne_Corr using error propagation (quadrature)
Ne_Corr_errors = np.sqrt((pk1_peak_cent_errors / constant) ** 2 + (pk2_peak_cent_errors / constant) ** 2)
test_err=np.sqrt(pk1_peak_cent_errors ** 2 + pk2_peak_cent_errors ** 2)
total_err=test_err/constant
df.insert(1,'1σ_Ne_Corr', Ne_Corr_errors)
df.insert(1,'1σ_Ne_Corr_test', total_err)
if save_clipboard is True:
df.to_clipboard(excel=True, header=False, index=False)
if loop is False:
return df, Ne_pk1_reg_x_plot, Ne_pk1_reg_y_plot
if loop is True:
return df
## Plot to help inspect which Ne lines to discard
def plot_Ne_corrections(df=None, x_axis=None, x_label='index', marker='o', mec='k',
mfc='r'):
if x_axis is not None:
x=x_axis
else:
x=df.index
fig, ((ax5, ax6), (ax3, ax4), (ax1, ax2)) = plt.subplots(3, 2, figsize=(10, 12))
# Pk1 center vs. X
ax5.errorbar(x, df['pk1_peak_cent'], xerr=0, yerr=df['error_pk1'].fillna(0).infer_objects(),
fmt='o', ecolor='k', elinewidth=0.8, mfc='b', ms=5, mec='k', capsize=3)
ax5.set_xlabel(x_label)
ax5.set_ylabel('Peak 1 center')
# Pk2 center vs. X
ax6.plot(x, df['pk2_peak_cent'], marker, mec='k', mfc='r')
ax6.errorbar(x, df['pk2_peak_cent'], xerr=0, yerr=df['error_pk2'].fillna(0).infer_objects(),
fmt='o', ecolor='k', elinewidth=0.8, mfc='r', ms=5, mec='k', capsize=3)
ax6.set_xlabel(x_label)
ax6.set_ylabel('Peak 2 center')
#
ax3.errorbar(df['Ne_Corr'], df['pk2_peak_cent'], xerr=df['1σ_Ne_Corr'].fillna(0).infer_objects(),
yerr=df['error_pk2'].fillna(0).infer_objects(),
fmt='o', ecolor='k', elinewidth=0.8, mfc='b', ms=5, mec='k', capsize=3)
ax3.set_xlabel('Ne Correction factor')
ax3.set_ylabel('Peak 2 center')
ax4.errorbar(df['Ne_Corr'], df['pk1_peak_cent'], xerr=df['1σ_Ne_Corr'].fillna(0).infer_objects(),
yerr=df['error_pk1'].fillna(0).infer_objects(),
fmt='o', ecolor='k', elinewidth=0.8, mfc='b', ms=5, mec='k', capsize=3)
ax4.set_xlabel('Ne Correction factor')
ax4.set_ylabel('Peak 1 center')
# Ne correction factor vs. time
ax1.errorbar(x, df['Ne_Corr'], xerr=0, yerr=df['1σ_Ne_Corr'].fillna(0).infer_objects(),
fmt='o', ecolor='k', elinewidth=0.8, mfc='grey', ms=5, mec='k',capsize=3)
ax1.set_ylabel('Ne Correction factor')
ax1.set_xlabel(x_label)
# Ne correction factor vs. residual
ax2.set_xlabel('Sum of pk1 and pk2 residual')
ax2.set_ylabel('Ne Correction factor')
plt.setp(ax3.get_xticklabels(), rotation=30, horizontalalignment='right')
plt.setp(ax4.get_xticklabels(), rotation=30, horizontalalignment='right')
ax1.ticklabel_format(useOffset=False)
ax5.ticklabel_format(useOffset=False)
ax6.ticklabel_format(useOffset=False)
ax2.ticklabel_format(useOffset=False)
ax3.ticklabel_format(useOffset=False)
ax4.ticklabel_format(useOffset=False)
fig.tight_layout()
return fig
## Looping Ne lines
def loop_Ne_lines(*, files, spectra_path, filetype,
config, config_ID_peaks, df_fit_params=None, prefix=False, print_df=False,
plot_figure=True, single_acq=False, const_params=True, minimise='least_squares'):
df = pd.DataFrame([])
# This is for repeated acquisition of Ne lines
if single_acq is True:
for i in tqdm(range(0, np.shape(files)[1]-2)):
filename=str(i)
Ne=np.column_stack((files[:, 0], files[:, i+1]))
Ne, df_fit_params=identify_Ne_lines(Ne_array=Ne,
config=config_ID_peaks, print_df=False, plot_figure=False)
data=fit_Ne_lines(Ne=Ne, path=spectra_path,
config=config, prefix=prefix,
Ne_center_1=df_fit_params['Peak1_cent'].iloc[0],
Ne_center_2=df_fit_params['Peak2_cent'].iloc[0],
Ne_prom_1=df_fit_params['Peak1_prom'].iloc[0],
Ne_prom_2=df_fit_params['Peak2_prom'].iloc[0],
const_params=const_params,
plot_figure=plot_figure, minimise=minimise)
df = pd.concat([df, data], axis=0)
else:
for i in tqdm(range(0, len(files))):
filename=files[i]
Ne, df_fit_params=identify_Ne_lines(path=spectra_path,
filename=filename, filetype=filetype,
config=config_ID_peaks, print_df=False, plot_figure=False)
data=fit_Ne_lines(Ne=Ne, filename=filename,
path=spectra_path, prefix=prefix,
config=config,
Ne_center_1=df_fit_params['Peak1_cent'].iloc[0],
Ne_center_2=df_fit_params['Peak2_cent'].iloc[0],
Ne_prom_1=df_fit_params['Peak1_prom'].iloc[0],
Ne_prom_2=df_fit_params['Peak2_prom'].iloc[0],
const_params=const_params,
plot_figure=plot_figure, minimise=minimise)
df = pd.concat([df, data], axis=0)
#print('working on ' + str(files[i]))
df2=df.reset_index(drop=True)
# Now lets reorder some columns
cols_to_move = ['filename', 'Ne_Corr', '1σ_Ne_Corr', 'deltaNe', 'pk2_peak_cent', 'pk1_peak_cent', 'pk2_amplitude', 'pk1_amplitude', 'residual_pk2', 'residual_pk1']
df2 = df2[cols_to_move + [
col for col in df2.columns if col not in cols_to_move]]
return df2
## Regressing Ne lines against time
from scipy.interpolate import interp1d
[docs]
def reg_Ne_lines_time(df, fit='poly', N_poly=None, spline_fit=None):
"""
Parameters
-----------
df: pd.DataFrame
dataframe of stitched Ne fits and metadata information from WITEC,
must have columns 'sec since midnight' and 'Ne_Corr'
fit: float 'poly', or 'spline'
If 'poly':
N_poly: int, degree of polynomial to fit (1 if linear)
if 'spline':
spline_fit: The string has to be one of:
‘linear’, ‘nearest’, ‘nearest-up’, ‘zero’, ‘slinear’,
‘quadratic’, ‘cubic’, ‘previous’. Look up documentation for interpld
Returns
-----------
figure of fit and data used to make it
Pf: fit model, can be used to evaluate unknown data (only within x range of df for spline fits).
"""
Px=np.linspace(np.min(df['sec since midnight']), np.max(df['sec since midnight']),
101)
if fit=='poly':
Pf = np.poly1d(np.polyfit(df['sec since midnight'], df['Ne_Corr'],
N_poly))
if fit == 'spline':
if spline_fit is None:
raise TypeError('If you choose spline you also need to choose the type of spline fit. do help of this function to get the options')
else:
Pf = interp1d(df['sec since midnight'], df['Ne_Corr'], kind=spline_fit)
Py=Pf(Px)
fig, (ax1) = plt.subplots(1, 1, figsize=(6, 3))
ax1.plot(df['sec since midnight'], df['Ne_Corr'], 'xk')
ax1.plot(Px, Py, '-r')
ax1.set_xlabel('Seconds since midnight')
ax1.set_ylabel('Ne Correction Factor')
ax1.ticklabel_format(useOffset=False)
return Pf, fig
[docs]
def filter_Ne_Line_neighbours(*, df_combo=None, Corr_factor=None, number_av=6, offset=0.00005, file_name_filt=None):
""" This function discards Ne lines with a correction factor more than
offset away from the median value of the N points (number_av) either side of it.
"""
if df_combo is not None:
Corr_factor=df_combo['Ne_Corr']
Corr_factor_Filt=np.zeros(len(Corr_factor), dtype=float)
median_loop=np.zeros(len(Corr_factor), dtype=float)
for i in range(0, len(Corr_factor)):
if i<len(Corr_factor)/2: # For first half, do 5 after
median_loop[i]=np.nanmedian(Corr_factor[i:i+number_av])
if i>=len(Corr_factor)/2: # For first half, do 5 after
median_loop[i]=np.nanmedian(Corr_factor[i-number_av:i])
if Corr_factor[i]>(median_loop[i]+offset) or Corr_factor[i]<(median_loop[i]-offset) :
Corr_factor_Filt[i]=np.nan
else:
Corr_factor_Filt[i]=Corr_factor[i]
ds=pd.Series(Corr_factor_Filt)
# Filter based on file names
if file_name_filt is not None:
pattern = '|'.join(file_name_filt)
mask = df_combo['filename_x'].str.contains(pattern)
filtered_ds = ds.where(~mask, np.nan)
return filtered_ds
else:
return ds
## Lets make a plotting function for this notebook
[docs]
def generate_Ne_corr_model(*, time, Ne_corr, N_poly=3, CI=0.67, bootstrap=False, std_error=True, N_bootstrap=500,save_fig=False, pkl_name='polyfit_data.pkl'):
""" This function takes time stamp and Ne correctoin data to make a predictive polynomial, which it then saves as a pkl file
Parameters
---------------
time: pd.Series
Time through the run
Ne_corr: df or pd.Series
if dataframe, has to have the column 'Ne_corr'
Else, just Ne correction factor as a pd.Series itself
CI: float. Default 0.67
Confidence interval to save as uncertainty on model.
Either:
std_error: bool (True)
calculates uncertainty on model using CI
Or:
boostrap: bool (False)
Used for testing of method, keep as False
save_fig:bool
Saves figure.
pkl_name: str
Name of model that is saved.
"""
# Define the x and y values
x_all = np.array([time])
if isinstance(Ne_corr, pd.DataFrame):
y_all = np.array([Ne_corr['Ne_Corr']])
y_err=Ne_corr['1σ_Ne_Corr']
else:
y_all=Ne_corr
y_err=0*Ne_corr
non_nan_indices = ~np.isnan(x_all) & ~np.isnan(y_all)
# Filter out NaN values
x = x_all[non_nan_indices]
y = y_all[non_nan_indices]
# Perform polynomial regression
coefficients = np.polyfit(x, y, N_poly)
Pf = np.poly1d(coefficients)
# Save the model and the data to a pickle file
data = {'model': Pf, 'x': x, 'y': y}
with open(pkl_name, 'wb') as f:
pickle.dump(data, f)
if bootstrap is True:
new_x_plot=np.linspace(np.min(x), np.max(x), 100)
Ne_corr2=calculate_Ne_corr_bootstrap_values(pickle_str=pkl_name,
new_x=pd.Series(new_x_plot), N_poly=N_poly, CI=CI, N_bootstrap=N_bootstrap)
if std_error is True:
new_x_plot=np.linspace(np.min(x), np.max(x), 100)
Ne_corr2=calculate_Ne_corr_std_err_values(pickle_str=pkl_name,
new_x=pd.Series(new_x_plot), CI=CI)
# Now lets plot the prediction interval
fig, (ax1) = plt.subplots(1, 1, figsize=(10,5))
ax1.errorbar(x, y, xerr=0, yerr=y_err,
fmt='o', ecolor='k', elinewidth=0.8, mfc='grey', ms=5, mec='k',capsize=3)
ax1.plot(new_x_plot, Ne_corr2['preferred_values'], '-k', label='best fit')
ax1.plot(new_x_plot, Ne_corr2['lower_values'], ':k', label='lower vals')
ax1.plot(new_x_plot, Ne_corr2['upper_values'], ':k', label='upper vals')
ax1.set_xlabel('sec after midnight')
ax1.set_ylabel('Ne Corr factor')
ax1.legend()
ax1.plot(x, y, '+r', label='Ne lines')
ax1.ticklabel_format(useOffset=False)
# this sets the ordinal suffix for the polynomial degree in the title
if N_poly == 1:
suffix = 'st'
elif N_poly == 2:
suffix = 'nd'
elif N_poly == 3:
suffix = 'rd'
else:
suffix='th'
ax1.set_title(f"{N_poly}$^{{{suffix}}}$ degree polynomial: "+str(100*CI) + ' % prediction interval')
if save_fig is True:
fig.savefig('Ne_line_correction.png')
from scipy.stats import t
def calculate_Ne_corr_std_err_values(*, pickle_str, new_x, CI=0.67):
# Load the model and the data from the pickle file
with open(pickle_str, 'rb') as f:
data = pickle.load(f)
model = data['model']
N_poly = model.order - 1
Pf = data['model']
x = data['x']
y = data['y']
# Convert new_x to plain numpy array
new_x_array = np.asarray(new_x)
# Calculate the residuals
residuals = y - Pf(x)
# Calculate the standard deviation of the residuals
residual_std = np.std(residuals)
# Calculate the standard errors for the new x values
mean_x = np.mean(x)
n = len(x)
standard_errors = residual_std * np.sqrt(1 + 1/n + (new_x_array - mean_x)**2 / np.sum((x - mean_x)**2))
# Calculate the degrees of freedom
df_dof = len(x) - (N_poly + 1)
# Calculate the t value for the given confidence level
t_value = t.ppf((1 + CI) / 2, df_dof)
# Calculate the prediction intervals
preferred_values = Pf(new_x_array)
lower_values = preferred_values - t_value * standard_errors
upper_values = preferred_values + t_value * standard_errors
df_out = pd.DataFrame(data={
'time': new_x_array,
'preferred_values': preferred_values,
'lower_values': lower_values,
'upper_values': upper_values
})
return df_out
def calculate_Ne_corr_bootstrap_values(*, pickle_str, new_x, N_poly=3, CI=0.67, N_bootstrap=500):
# Load the model and the data from the pickle file
with open(pickle_str, 'rb') as f:
data = pickle.load(f)
Pf = data['model']
x = data['x']
y = data['y']
# Define the function
x_values=new_x
N_poly = N_poly # degree of the polynomial
n_bootstrap = N_bootstrap # number of bootstrap samples
confidence = CI # confidence level
preferred_values = []
lower_values = []
upper_values = []
for new_x in x_values:
# Perform bootstrapping
bootstrap_predictions = []
for _ in range(n_bootstrap):
# Resample with replacement
bootstrap_indices = np.random.choice(len(x), size=len(x), replace=True)
bootstrap_x = x[bootstrap_indices]
bootstrap_y = y[bootstrap_indices]
# Perform polynomial regression on the bootstrap sample
bootstrap_coefficients = np.polyfit(bootstrap_x, bootstrap_y, N_poly)
bootstrap_Pf = np.poly1d(bootstrap_coefficients)
# Calculate predicted value for the new x
bootstrap_prediction = bootstrap_Pf(new_x)
bootstrap_predictions.append(bootstrap_prediction)
# Calculate prediction interval
lower_quantile = (1 - confidence) / 2
upper_quantile = 1 - lower_quantile
lower_index = int(lower_quantile * n_bootstrap)
upper_index = int(upper_quantile * n_bootstrap)
bootstrap_predictions_sorted = np.sort(bootstrap_predictions)
lower_value = bootstrap_predictions_sorted[lower_index]
upper_value = bootstrap_predictions_sorted[upper_index]
preferred_values.append(Pf(new_x))
lower_values.append(lower_value)
upper_values.append(upper_value)
df=pd.DataFrame(data={'time': new_x,
'preferred_values': preferred_values,
'lower_values': lower_values,
'upper_values': upper_values}
)
return df