# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Post processing of station source parameters.
:copyright:
2012-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
CeCILL Free Software License Agreement v2.1
(http://www.cecill.info/licences.en.html)
"""
import logging
import numpy as np
from scipy.stats import norm
from scipy.integrate import quad
from sourcespec.ssp_setup import ssp_exit
from sourcespec.ssp_data_types import (
SummarySpectralParameter, SummaryStatistics)
from sourcespec.ssp_util import mag_to_moment
from sourcespec.ssp_spectral_model import spectral_model
from sourcespec.spectrum import Spectrum
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])
def _avg_and_std(values, errors=None, logarithmic=False):
"""
Return the average and standard deviation.
Optionally:
- errors can be specified for weighted statistics
- logarithmic average and standard deviation
"""
average = std = np.nan
if len(values) == 0:
return average, np.array((std, std))
if errors is not None and len(errors) == 0:
return average, np.array((std, std))
if np.all(np.isnan(values)):
return average, np.array((std, std))
weights = _weights(values, errors, logarithmic)
notnan = ~np.isnan(values)
notinf = ~np.isinf(values)
keepvals = np.logical_and(notnan, notinf)
if logarithmic:
positive = values > 0
keepvals = np.logical_and(keepvals, positive)
if weights is not None:
notnan = np.logical_and(keepvals, ~np.isnan(weights))
weights = weights[keepvals]
values = values[keepvals]
if not values.size:
return np.nan, np.array((np.nan, np.nan))
if logarithmic:
values = np.log10(values)
average = np.average(values, weights=weights)
variance = np.average((values - average)**2, weights=weights)
std = np.sqrt(variance)
if not logarithmic:
return average, np.array((std, std))
log_average = 10.**average
minus = log_average - 10.**(average - std)
plus = 10.**(average + std) - log_average
return log_average, np.array((minus, plus))
def _weights(values, errors=None, logarithmic=False):
"""Compute weights for weighted statistics."""
if errors is None:
return None
# negative errors should not happen
errors[errors < 0] = 0
values_minus = values - errors[:, 0]
values_plus = values + errors[:, 1]
if logarithmic:
# compute the width of the error bar in log10 units
# replace negative left values with 1/10 of the central value
values_minus[values_minus <= 0] = values[values_minus <= 0] / 10
values_log_minus = np.log10(values_minus)
values_log_plus = np.log10(values_plus)
errors_width = values_log_plus - values_log_minus
else:
# compute the width of the error bar in linear units
errors_width = values_plus - values_minus
try:
# fix for infinite weight (zero error width)
errors_width[errors_width == 0] =\
np.nanmin(errors_width[errors_width > 0])
except ValueError:
# if all errors are zero, return ones
return np.ones_like(errors_width)
return 1. / (errors_width**2.)
def _normal_confidence_level(n_sigma):
"""
Compute the confidence level of a normal (Gaussian) distribution
between -n_sigma and +n_sigma.
"""
def gauss(x):
return norm.pdf(x, 0, 1)
confidence, _ = quad(gauss, -n_sigma, n_sigma)
return np.round(confidence * 100, 2)
def _percentiles(
values, low_percentage=25, mid_percentage=50, up_percentage=75):
"""Compute lower, mid and upper percentiles."""
if len(values) == 0:
return np.nan, np.nan, np.nan
low_percentile, mid_percentile, up_percentile =\
np.nanpercentile(
values, (low_percentage, mid_percentage, up_percentage))
return low_percentile, mid_percentile, up_percentile
def _param_summary_statistics(
config, sspec_output, param_id, name, format_spec, units=None,
logarithmic=False):
"""Compute summary statistics for one spectral parameter."""
nIQR = config.nIQR
summary = SummarySpectralParameter(
param_id=param_id, name=name, format_spec=format_spec, units=units)
sspec_output.find_outliers(param_id, n=nIQR)
values = sspec_output.value_array(param_id, filter_outliers=True)
errors = sspec_output.error_array(param_id, filter_outliers=True)
# put to NaN infinite values and values whose errors are infinite
values[np.isinf(values)] = np.nan
_cond_err = np.logical_or(np.isinf(errors[:, 0]), np.isinf(errors[:, 1]))
values[_cond_err] = np.nan
errors[_cond_err] = np.nan
# only count non-NaN values
nobs = len(values[~np.isnan(values)])
# mean
mean_value, mean_error = _avg_and_std(values, logarithmic=logarithmic)
mean_error *= config.n_sigma
conf_level = _normal_confidence_level(config.n_sigma)
summary.mean = SummaryStatistics(
stat_type='mean', value=mean_value,
lower_uncertainty=mean_error[0],
upper_uncertainty=mean_error[1],
confidence_level=conf_level, nobs=nobs)
if not np.all(np.isnan(errors)):
# weighted mean (only if errors are defined)
wmean_value, wmean_error = _avg_and_std(
values, errors, logarithmic=logarithmic)
else:
# else use an unweighted mean
logger.info(
f'{param_id} values have no uncertainty: weighted mean cannot be '
'computed. Using unweighted mean instead.')
wmean_value, wmean_error = mean_value, mean_error
summary.weighted_mean = SummaryStatistics(
stat_type='weighted_mean', value=wmean_value,
lower_uncertainty=wmean_error[0],
upper_uncertainty=wmean_error[1],
confidence_level=conf_level, nobs=nobs)
# percentiles
low_pctl, mid_pctl, up_pctl = _percentiles(
values, config.lower_percentage, config.mid_percentage,
config.upper_percentage)
conf_level = round(
(config.upper_percentage - config.lower_percentage), 2)
summary.percentiles = SummaryStatistics(
stat_type='percentiles', value=mid_pctl,
lower_uncertainty=mid_pctl - low_pctl,
upper_uncertainty=up_pctl - mid_pctl,
confidence_level=conf_level,
lower_percentage=config.lower_percentage,
mid_percentage=config.mid_percentage,
upper_percentage=config.upper_percentage,
nobs=nobs)
return summary
def _make_summary_spec(station, channel, freq_logspaced, Mw, fc, t_star):
"""Create a synthetic spectrum for summary statistics."""
sp = Spectrum()
sp.stats.station = station
sp.stats.channel = channel
sp.freq_logspaced = freq_logspaced
data_mag_logspaced = spectral_model(freq_logspaced, Mw, fc, t_star)
# data_logspaced must be provided before data_mag_logspaced
sp.data_logspaced = mag_to_moment(data_mag_logspaced)
sp.data_mag_logspaced = data_mag_logspaced
return sp
def _add_summary_spectra(sspec_output, spec_st):
"""Add to the spectra stream synthetic spectra for summary statistics."""
fmins = [np.nanmin(spec.freq) for spec in spec_st]
fmaxs = [np.nanmax(spec.freq) for spec in spec_st]
fmin = np.nanmin(fmins)
fmax = np.nanmax(fmaxs)
npts = 100
freq_logspaced = np.logspace(np.log10(fmin), np.log10(fmax), npts)
summary_values = sspec_output.reference_values()
Mw = summary_values['Mw']
fc = summary_values['fc']
t_star = summary_values['t_star']
# check if any spec in spec_st has t_star_model defined
for spec in spec_st:
t_star_model = spec.stats.get('t_star_model', None)
if t_star_model is not None:
t_star = t_star_model
break
spec_st.append(
_make_summary_spec('SUMMARY', 'SSS', freq_logspaced, Mw, fc, t_star)
)
spec_st.append(
_make_summary_spec('SUMMARY', 'SSs', freq_logspaced, Mw, fc, 0)
)
spec_st.append(
_make_summary_spec('SUMMARY', 'SSt', freq_logspaced, Mw, 1e999, t_star)
)
def _compute_dispersion_around_summary_spec(spec_st, weight_st):
"""
Calculate the normalized weighted root mean square (RMS) dispersion
of all spectra with respect to the summary synthetic spectrum.
For each spectrum, interpolate its data to the frequency grid of the
summary spectrum, compute the weighted squared differences, and sum across
all spectra.
The normalization is performed by dividing the weighted RMS by the
weighted standard deviation of all data points used in the calculation.
The computation is made in magnitude units (but the result is unitless).
Returns the normalized weighted RMS dispersion value.
"""
summary_spec = spec_st.select(station='SUMMARY', channel='SSS')[0]
freq_logspaced = summary_spec.freq_logspaced
data = np.array([])
weights = np.array([])
diffs = np.array([])
for spec in spec_st:
if spec.stats.channel[-1] != 'H':
continue
if spec.stats.ignore:
continue
# find the same spec in weight_st
try:
weight_spec = weight_st.select(id=spec.id)[0]
except IndexError:
# this should not happen, but if it does, use a weight of 1
weight_spec = spec.copy()
weight_spec.data_logspaced = np.ones_like(spec.data_mag_logspaced)
# interpolate the spectrum at the frequencies of the summary spectrum
spec_interp = spec.copy()
spec_interp.interp_data_logspaced_to_new_freq(freq_logspaced)
weight_spec_interp = weight_spec.copy()
weight_spec_interp.interp_data_logspaced_to_new_freq(freq_logspaced)
# accumulate data and weights for weighted RMS calculation
data = np.concatenate((data, spec_interp.data_mag_logspaced))
weights = np.concatenate(
(weights, weight_spec_interp.data_logspaced))
diffs = np.concatenate((
diffs,
spec_interp.data_mag_logspaced - summary_spec.data_mag_logspaced
))
# if data is still empty, return NaN
if len(data) == 0:
return np.nan
wrms2 = np.nansum(weights * diffs**2)
wsum = np.nansum(weights)
# compute the weighted RMS
wsum = 1 if wsum == 0 else wsum
wrms = np.sqrt(wrms2 / wsum)
logger.debug(
'Weighted RMS of spectral dispersion (in magnitude units): '
f'{wrms:.3f}')
# normalize by the 80% inter-percentile range of all data points
perc_10 = np.nanpercentile(data, 10)
perc_90 = np.nanpercentile(data, 90)
data_std = (perc_90 - perc_10)
logger.debug(
'80% inter-percentile range of all spectra (in magnitude units): '
f'{data_std:.3f}'
)
# return the normalized weighted RMS
return wrms / data_std if data_std > 0 else wrms
[docs]
def compute_summary_statistics(config, sspec_output, spec_st, weight_st):
"""Compute summary statistics from station spectral parameters."""
logger.info('Computing summary statistics...')
if len(sspec_output.station_parameters) == 0:
logger.info('No source parameter calculated')
ssp_exit()
sspec_output.summary_spectral_parameters.reference_statistics =\
config.reference_statistics
# Mw
sspec_output.summary_spectral_parameters.Mw =\
_param_summary_statistics(
config, sspec_output,
param_id='Mw', name='moment magnitude', format_spec='{:.2f}',
logarithmic=False
)
# Mo (N·m)
sspec_output.summary_spectral_parameters.Mo =\
_param_summary_statistics(
config, sspec_output,
param_id='Mo', name='seismic moment', units='N·m',
format_spec='{:.3e}', logarithmic=True
)
# fc (Hz)
sspec_output.summary_spectral_parameters.fc =\
_param_summary_statistics(
config, sspec_output,
param_id='fc', name='corner frequency', units='Hz',
format_spec='{:.3f}', logarithmic=True
)
# t_star (s)
sspec_output.summary_spectral_parameters.t_star =\
_param_summary_statistics(
config, sspec_output,
param_id='t_star', name='t-star', units='s', format_spec='{:.3f}',
logarithmic=False
)
# radius (meters)
sspec_output.summary_spectral_parameters.radius =\
_param_summary_statistics(
config, sspec_output,
param_id='radius', name='source radius', units='m',
format_spec='{:.3f}', logarithmic=True
)
# static stress drop (MPa)
sspec_output.summary_spectral_parameters.ssd =\
_param_summary_statistics(
config, sspec_output,
param_id='ssd', name='static stress drop',
units='MPa', format_spec='{:.3e}',
logarithmic=True
)
# Quality factor
sspec_output.summary_spectral_parameters.Qo =\
_param_summary_statistics(
config, sspec_output,
param_id='Qo', name='quality factor', format_spec='{:.1f}',
logarithmic=False
)
# Er (N·m)
sspec_output.summary_spectral_parameters.Er =\
_param_summary_statistics(
config, sspec_output,
param_id='Er', name='radiated energy', units='N·m',
format_spec='{:.3e}', logarithmic=True
)
# Apparent stress (MPa)
sspec_output.summary_spectral_parameters.sigma_a =\
_param_summary_statistics(
config, sspec_output,
param_id='sigma_a', name='apparent stress', units='MPa',
format_spec='{:.3e}', logarithmic=True
)
# Ml
if config.compute_local_magnitude:
sspec_output.summary_spectral_parameters.Ml =\
_param_summary_statistics(
config, sspec_output,
param_id='Ml', name='local magnitude', format_spec='{:.2f}',
logarithmic=False
)
# Log info on summary statistics for each parameter
# Note that numpy.float64 are cast to float to display them in the log
# in a more readable way
params_name = ('Mw', 'fc', 't_star')
means = sspec_output.mean_values()
sourcepar_mean = {par: float(means[par]) for par in params_name}
logger.info(f'params_mean: {sourcepar_mean}')
means_weight = sspec_output.weighted_mean_values()
sourcepar_mean_weight = {
par: float(means_weight[par]) for par in params_name}
logger.info(f'params_mean_weighted: {sourcepar_mean_weight}')
percentiles = sspec_output.percentiles_values()
sourcepar_percentiles = {
par: float(percentiles[par]) for par in params_name}
logger.info(f'params_percentiles: {sourcepar_percentiles}')
# Add synthetic spectra for summary statistics
_add_summary_spectra(sspec_output, spec_st)
# Compute dispersion around the summary synthetic spectrum
logger.info(
'Computing dispersion around the summary synthetic spectrum...')
rmsn = _compute_dispersion_around_summary_spec(spec_st, weight_st)
sspec_output.quality_info.spectral_dispersion_rmsn = rmsn
logger.info(
f'Normalized RMS of the dispersion around the summary synthetic '
f'spectrum: {rmsn:.3f}')
# Compute spectral dispersion score, in percentage, 100 is no dispersion
if np.isnan(rmsn):
spec_dispersion_score = np.nan
spec_dispersion_score_str = 'nan'
else:
spec_dispersion_score = np.exp(-rmsn) * 100
spec_dispersion_score_str = f'{spec_dispersion_score:.1f}%'
logger.info(
f'Spectral dispersion score: {spec_dispersion_score_str}')
sspec_output.quality_info.spectral_dispersion_score = spec_dispersion_score
logger.info('Computing summary statistics: done')
logger.info('---------------------------------------------------')