Source code for ssp_inversion

# -*- coding: utf8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Spectral inversion routines for sourcespec.

:copyright:
    2012 Claudio Satriano <satriano@ipgp.fr>

    2013-2014 Claudio Satriano <satriano@ipgp.fr>,
              Emanuela Matrullo <matrullo@geologie.ens.fr>,
              Agnes Chounet <chounet@ipgp.fr>

    2015-2024 Claudio Satriano <satriano@ipgp.fr>
:license:
    CeCILL Free Software License Agreement v2.1
    (http://www.cecill.info/licences.en.html)
"""
import logging
import numpy as np
from scipy.optimize import curve_fit, minimize, basinhopping
from scipy.signal import argrelmax
from obspy.geodetics import gps2dist_azimuth
from sourcespec.spectrum import SpectrumStream
from sourcespec.ssp_spectral_model import (
    spectral_model, objective_func, callback)
from sourcespec.ssp_util import (
    mag_to_moment, source_radius, static_stress_drop, quality_factor,
    select_trace, smooth)
from sourcespec.ssp_data_types import (
    InitialValues, Bounds, SpectralParameter, StationParameters,
    SourceSpecOutput)
from sourcespec.ssp_grid_sampling import GridSampling
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])


def _curve_fit(config, spec, weight, yerr, initial_values, bounds):
    """
    Curve fitting.

    Available algorithms:
      - Levenberg-Marquardt (LM, via `curve_fit()`). Automatically switches to
        Trust Region Reflective algorithm if bounds are provided.
      - Truncated Newton algorithm (TNC) with bounds.
      - Basin-hopping (BH)
      - Grid search (GS)
    """
    freq_logspaced = spec.freq_logspaced
    ydata = spec.data_mag_logspaced
    minimize_func = objective_func(freq_logspaced, ydata, weight)
    if config.inv_algorithm == 'TNC':
        res = minimize(
            minimize_func,
            x0=initial_values.get_params0(), method='TNC',
            callback=callback, bounds=bounds.bounds
        )
        params_opt = res.x
        # trick: use curve_fit() bounded to params_opt
        # to get the covariance
        # pylint: disable=unbalanced-tuple-unpacking
        _, params_cov = curve_fit(
            spectral_model, freq_logspaced, ydata,
            p0=params_opt, sigma=yerr,
            bounds=(params_opt - (1e-10), params_opt + (1e-10))
        )
        err = np.sqrt(params_cov.diagonal())
        # symmetric error
        params_err = ((e, e) for e in err)
    elif config.inv_algorithm == 'LM':
        bnds = bounds.get_bounds_curve_fit()
        if bnds is not None:
            logger.info(
                'Trying to use using Levenberg-Marquardt '
                'algorithm with bounds. Switching to the '
                'Trust Region Reflective algorithm.'
            )
        # pylint: disable=unbalanced-tuple-unpacking
        params_opt, params_cov = curve_fit(
            spectral_model, freq_logspaced, ydata,
            p0=initial_values.get_params0(), sigma=yerr,
            bounds=bnds
        )
        err = np.sqrt(params_cov.diagonal())
        # symmetric error
        params_err = ((e, e) for e in err)
    elif config.inv_algorithm == 'BH':
        res = basinhopping(
            minimize_func, x0=initial_values.get_params0(), niter=100,
            accept_test=bounds
        )
        params_opt = res.x
        # trick: use curve_fit() bounded to params_opt
        # to get the covariance
        # pylint: disable=unbalanced-tuple-unpacking
        _, params_cov = curve_fit(
            spectral_model, freq_logspaced, ydata,
            p0=params_opt, sigma=yerr,
            bounds=(params_opt - (1e-10), params_opt + (1e-10))
        )
        err = np.sqrt(params_cov.diagonal())
        # symmetric error
        params_err = ((e, e) for e in err)
    elif config.inv_algorithm in ['GS', 'IS']:
        nsteps = (20, 150, 150)  # we do fewer steps in magnitude
        sampling_mode = ('lin', 'log', 'lin')
        params_name = ('Mw', 'fc', 't_star')
        params_unit = ('', 'Hz', 's')
        grid_sampling = GridSampling(
            minimize_func, bounds.bounds, nsteps,
            sampling_mode, params_name, params_unit)
        if config.inv_algorithm == 'GS':
            grid_sampling.grid_search()
        elif config.inv_algorithm == 'IS':
            grid_sampling.kdtree_search()
        params_opt = grid_sampling.params_opt
        params_err = grid_sampling.params_err
        spec_label = f'{spec.id} {spec.stats.instrtype}'
        grid_sampling.plot_conditional_misfit(config, spec_label)
        # fc-t_star
        plot_par_idx = (1, 2)
        grid_sampling.plot_misfit_2d(config, plot_par_idx, spec_label)
        # fc-Mw
        plot_par_idx = (1, 0)
        grid_sampling.plot_misfit_2d(config, plot_par_idx, spec_label)
        # tstar-Mw
        plot_par_idx = (2, 0)
        grid_sampling.plot_misfit_2d(config, plot_par_idx, spec_label)
    misfit = minimize_func(params_opt)
    return params_opt, params_err, misfit


def _freq_ranges_for_Mw0_and_tstar0(config, weight, freq_logspaced, statId):
    """Find the frequency range to compute Mw_0 and, possibly, t_star_0."""
    if config.weighting == 'noise':
        # we start where signal-to-noise becomes strong
        idx0 = np.where(weight > 0.5)[0][0]
        # we stop at the first max of signal-to-noise (proxy for fc)
        idx_max = argrelmax(weight)[0]
        # just keep the indexes for maxima > 0.5
        idx_max = [idx for idx in idx_max if weight[idx] > 0.5]
        if not idx_max:
            # if idx_max is empty, then the source and/or noise spectrum
            # is most certainly "strange". In this case, we simply give up.
            raise RuntimeError(
                f'{statId}: unable to find a frequency range to compute Mw_0. '
                'This is possibly due to an uncommon spectrum '
                '(e.g., a resonance).'
            )
        idx1 = idx_max[0]
        if idx1 == idx0:
            try:
                idx1 = idx_max[1]
            except IndexError:
                # if there are no other maxima, just take 5 points
                idx1 = idx0 + 5
    elif config.weighting == 'frequency':
        idx0 = 0
        # the closest index to f_weight:
        idx1 = np.where(freq_logspaced <= config.f_weight)[0][-1]
    elif config.weighting == 'inv_frequency':
        weight_idxs = np.where(weight >= 0.7)[0]
        try:
            idx0 = weight_idxs[0]
        except IndexError:
            idx0 = 0
        try:
            idx1 = weight_idxs[-1]
        except IndexError:
            idx1 = len(weight) - 1
    else:
        idx0 = 0
        idx1 = len(weight) // 2
    return idx0, idx1


def _spec_inversion(config, spec, spec_weight):
    """Invert one spectrum, return a StationParameters() object."""
    # azimuth computation
    coords = spec.stats.coords
    stla = coords.latitude
    stlo = coords.longitude
    hypo = spec.stats.event.hypocenter
    magnitude = spec.stats.event.magnitude
    evla = hypo.latitude.value_in_deg
    evlo = hypo.longitude.value_in_deg
    geod = gps2dist_azimuth(evla, evlo, stla, stlo)
    az = geod[1]

    freq_logspaced = spec.freq_logspaced
    ydata = spec.data_mag_logspaced
    statId = f'{spec.id} {spec.stats.instrtype}'
    weight = spec_weight.data_logspaced

    # 'curve_fit' interprets 'yerr' as standard deviation array
    # and calculates weights as 1/yerr^2 .
    # Therefore we build yerr as:
    yerr = 1. / np.sqrt(weight)

    # Find frequency range (indexes) to compute Mw_0 and t_star_0
    # When using noise weighting, idx1 is the first maximum in
    # signal-to-noise ratio
    idx0, idx1 = _freq_ranges_for_Mw0_and_tstar0(
        config, weight, freq_logspaced, statId)

    # first maximum is a proxy for fc, we use it for fc_0:
    fc_0 = freq_logspaced[idx1]

    t_star_min = t_star_max = None
    if config.invert_t_star_0:
        # fit t_star_0 and Mw on the initial part of the spectrum,
        # corrected for the effect of fc
        ydata_corr =\
            ydata - spectral_model(freq_logspaced, Mw=0, fc=fc_0, t_star=0)
        ydata_corr = smooth(ydata_corr, window_len=18)
        slope, Mw_0 = np.polyfit(
            freq_logspaced[idx0: idx1], ydata_corr[idx0: idx1], deg=1)
        t_star_0 = -3. / 2 * slope / (np.pi * np.log10(np.e))
        t_star_min = t_star_0 * (1 - config.t_star_0_variability)
        t_star_max = t_star_0 * (1 + config.t_star_0_variability)
    if not config.invert_t_star_0 or t_star_0 < 0:
        # we calculate the initial value for Mw as an average
        Mw_0 = np.nanmean(ydata[idx0: idx1])
        t_star_0 = config.t_star_0
    # Mw_0_min and Mw_0_max are used to set the bounds for Mw
    # (see below)
    Mw_0_min = np.nanmin(ydata[idx0: idx1])
    Mw_0_max = np.nanmax(ydata[idx0: idx1])

    if config.Mw_0_from_event_file and magnitude.value is not None:
        Mw_0 = magnitude.value
        # If Mw_0 is provided in the event file, we will set the inversion
        # bounds around it (see below)
        Mw_0_min = Mw_0_max = Mw_0

    initial_values = InitialValues(Mw_0, fc_0, t_star_0)
    bounds = Bounds(config, spec, initial_values)
    Mw_0_variability =\
        config.Mw_0_variability if config.Mw_0_variability > 0 else 1e-6
    bounds.Mw_min = Mw_0_min * (1 - Mw_0_variability)
    bounds.Mw_max = Mw_0_max * (1 + Mw_0_variability)
    if t_star_min is not None:
        bounds.t_star_min = t_star_min
    if t_star_max is not None:
        bounds.t_star_max = t_star_max
    # Initial values need to be printed here because Bounds can modify them
    logger.info(f'{statId}: initial values: {initial_values}')
    logger.info(f'{statId}: bounds: {bounds}')
    try:
        params_opt, params_err, misfit = _curve_fit(
            config, spec, weight, yerr, initial_values, bounds)
    except (RuntimeError, ValueError) as m:
        raise RuntimeError(
            f'{m}\n{statId}: unable to fit spectral model'
        ) from m

    Mw, fc, t_star = params_opt
    Mw_err, fc_err, t_star_err = params_err
    inverted_par_str = f'Mw: {Mw:.4f}; fc: {fc:.4f}; t_star: {t_star:.4f}'
    logger.info(f'{statId}: optimal values: {inverted_par_str}')
    logger.info(f'{statId}: misfit: {misfit:.3f}')

    if np.isclose(fc, bounds.fc_min, rtol=0.1):
        raise ValueError(
            f'{statId}: optimal fc within 10% of fc_min: '
            f'{fc:.3f} ~= {bounds.fc_min:.3f}: ignoring inversion results'
        )

    if np.isclose(fc, bounds.fc_max, rtol=1e-4):
        raise ValueError(
            f'{statId}: optimal fc within 0.1% of fc_max: '
            f'{fc:.3f} ~= {bounds.fc_max:.3f}: ignoring inversion results'
        )

    misfit_max = config.pi_misfit_max or np.inf
    if misfit > misfit_max:
        raise ValueError(
            f'{statId}: misfit larger than pi_misfit_max: '
            f'{misfit:.3f} > {misfit_max:.3f}: ignoring inversion results'
        )

    # Check post-inversion bounds for t_star and fc
    pi_t_star_min, pi_t_star_max =\
        config.pi_t_star_min_max or (-np.inf, np.inf)
    # pylint: disable=superfluous-parens
    if not (pi_t_star_min <= t_star <= pi_t_star_max):
        raise ValueError(
            f'{statId}: t_star: {t_star:.3f} not in allowed range '
            f'[{pi_t_star_min:.3f}, {pi_t_star_max:.3f}]: '
            'ignoring inversion results'
        )
    pi_fc_min, pi_fc_max = config.pi_fc_min_max or (-np.inf, np.inf)
    if not (pi_fc_min <= fc <= pi_fc_max):
        raise ValueError(
            f'{statId}: fc: {fc:.3f} not in allowed range '
            f'[{pi_fc_min:.3f}, {pi_fc_max:.3f}]: ignoring inversion results'
        )

    station_pars = StationParameters(
        station_id=spec.id, instrument_type=spec.stats.instrtype,
        latitude=stla, longitude=stlo,
        hypo_dist_in_km=spec.stats.hypo_dist,
        epi_dist_in_km=spec.stats.epi_dist,
        azimuth=az)
    station_pars.Mw = SpectralParameter(
        param_id='Mw', value=Mw,
        lower_uncertainty=Mw_err[0], upper_uncertainty=Mw_err[1],
        confidence_level=68.2, format_spec='{:.2f}')
    station_pars.fc = SpectralParameter(
        param_id='fc', value=fc,
        lower_uncertainty=fc_err[0], upper_uncertainty=fc_err[1],
        confidence_level=68.2, format_spec='{:.3f}')
    station_pars.t_star = SpectralParameter(
        param_id='t_star', value=t_star,
        lower_uncertainty=t_star_err[0], upper_uncertainty=t_star_err[1],
        confidence_level=68.2, format_spec='{:.3f}')

    # additional parameters, computed from fc, Mw and t_star
    k_coeff = config.kp if config.wave_type == 'P' else config.ks
    beta = config.event.hypocenter.vs * 1e3  # shear wave velocity in m/s
    travel_time = spec.stats.travel_times[config.wave_type[0]]
    # seismic moment
    station_pars.Mo = SpectralParameter(
        param_id='Mo', value=mag_to_moment(Mw), format_spec='{:.3e}')
    # source radius in meters
    station_pars.radius = SpectralParameter(
        param_id='radius', value=source_radius(fc, beta, k_coeff),
        format_spec='{:.3f}')
    # Static stress drop in MPa
    station_pars.ssd = SpectralParameter(
        param_id='ssd',
        value=static_stress_drop(
            station_pars.Mo.value, station_pars.radius.value
        ),
        format_spec='{:.3e}'
    )
    # quality factor
    station_pars.Qo = SpectralParameter(
        param_id='Qo', value=quality_factor(travel_time, t_star),
        format_spec='{:.1f}')

    # Check post-inversion bounds for ssd
    pi_ssd_min, pi_ssd_max = config.pi_ssd_min_max or (-np.inf, np.inf)
    if not (pi_ssd_min <= station_pars.ssd.value <= pi_ssd_max):
        raise ValueError(
            f'{statId}: ssd: {station_pars.ssd.value:.3e} '
            f'not in allowed range [{pi_ssd_min:.3e}, {pi_ssd_max:.3e}]: '
            'ignoring inversion results'
        )

    # additional parameter errors, computed from fc, Mw and t_star
    # seismic moment
    Mw_min = Mw - Mw_err[0]
    Mw_max = Mw + Mw_err[1]
    Mo_min = mag_to_moment(Mw_min)
    Mo_max = mag_to_moment(Mw_max)
    station_pars.Mo.lower_uncertainty = station_pars.Mo.value - Mo_min
    station_pars.Mo.upper_uncertainty = Mo_max - station_pars.Mo.value
    station_pars.Mo.confidence_level = 68.2
    # source radius in meters
    fc_min = fc - fc_err[0]
    if fc_min <= 0:
        fc_min = freq_logspaced[0]
    fc_max = fc + fc_err[1]
    radius_min = source_radius(fc_max, beta, k_coeff)
    radius_max = source_radius(fc_min, beta, k_coeff)
    station_pars.radius.lower_uncertainty =\
        station_pars.radius.value - radius_min
    station_pars.radius.upper_uncertainty =\
        radius_max - station_pars.radius.value
    station_pars.radius.confidence_level = 68.2
    # static stress drop in MPa
    ssd_min = static_stress_drop(Mo_min, radius_max)
    ssd_max = static_stress_drop(Mo_max, radius_min)
    station_pars.ssd.lower_uncertainty = station_pars.ssd.value - ssd_min
    station_pars.ssd.upper_uncertainty = ssd_max - station_pars.ssd.value
    station_pars.ssd.confidence_level = 68.2
    # quality factor
    t_star_min = t_star - t_star_err[0]
    if t_star_min <= 0:
        t_star_min = 0.001
    t_star_max = t_star + t_star_err[1]
    Qo_min = quality_factor(travel_time, t_star_max)
    Qo_max = quality_factor(travel_time, t_star_min)
    station_pars.Qo.lower_uncertainty = station_pars.Qo.value - Qo_min
    station_pars.Qo.upper_uncertainty = Qo_max - station_pars.Qo.value
    station_pars.Qo.confidence_level = 68.2

    return station_pars


def _synth_spec(config, spec, station_pars):
    """Return a stream with one or more synthetic spectra."""
    par = {
        x.param_id: x.value
        for x in station_pars.get_spectral_parameters().values()
    }
    par_err = {
        x.param_id: x.compact_uncertainty()
        for x in station_pars.get_spectral_parameters().values()
    }
    params_opt = [par[key] for key in ('Mw', 'fc', 't_star')]
    chan_no_orientation = spec.stats.channel[:-1]
    spec_st = SpectrumStream()

    freq = spec.freq
    freq_logspaced = spec.freq_logspaced
    spec_synth = spec.copy()
    spec_synth.stats.channel = f'{chan_no_orientation}S'
    spec_synth.stats.par = par
    spec_synth.stats.par_err = par_err
    spec_synth.data_mag = spectral_model(freq, *params_opt)
    spec_synth.data = mag_to_moment(spec_synth.data_mag)
    spec_synth.data_mag_logspaced = spectral_model(freq_logspaced, *params_opt)
    spec_synth.data_logspaced = mag_to_moment(spec_synth.data_mag_logspaced)
    spec_st.append(spec_synth)

    # Add an extra spectrum with no attenuation
    if config.plot_spectra_no_attenuation:
        spec_synth = spec.copy()
        spec_synth.stats.channel = f'{chan_no_orientation}s'
        _params = list(params_opt)
        _params[-1] = 0
        spec_synth.data_mag = spectral_model(freq, *_params)
        spec_synth.data = mag_to_moment(spec_synth.data_mag)
        spec_synth.data_mag_logspaced =\
            spectral_model(freq_logspaced, *_params)
        spec_synth.data_logspaced =\
            mag_to_moment(spec_synth.data_mag_logspaced)
        spec_st.append(spec_synth)

    # Add an extra spectrum with no corner frequency
    if config.plot_spectra_no_fc:
        spec_synth = spec.copy()
        spec_synth.stats.channel = f'{chan_no_orientation}t'
        _params = list(params_opt)
        _params[1] = 1e999
        spec_synth.data_mag = spectral_model(freq, *_params)
        spec_synth.data = mag_to_moment(spec_synth.data_mag)
        spec_synth.data_mag_logspaced =\
            spectral_model(freq_logspaced, *_params)
        spec_synth.data_logspaced =\
            mag_to_moment(spec_synth.data_mag_logspaced)
        spec_st.append(spec_synth)
    return spec_st


[docs] def spectral_inversion(config, spec_st, weight_st): """Inversion of displacement spectra.""" logger.info('Inverting spectra...') weighting_messages = { 'noise': 'Using noise weighting for inversion.', 'frequency': 'Using frequency weighting for inversion.', 'inv_frequency': 'Using inverse frequency weighting for inversion.', 'no_weight': 'Using no weighting for inversion.' } logger.info(weighting_messages[config.weighting]) algorithm_messages = { 'TNC': 'Using truncated Newton algorithm for inversion.', 'LM': 'Using Levenberg-Marquardt algorithm for inversion.', 'BH': 'Using basin-hopping algorithm for inversion.', 'GS': 'Using grid search for inversion.', 'IS': 'Using k-d tree importance sampling for inversion.' } logger.info(algorithm_messages[config.inv_algorithm]) stations = {x.stats.station for x in spec_st} spectra = [sp for sta in stations for sp in spec_st.select(station=sta)] sspec_output = SourceSpecOutput() sspec_output.inversion_info.wave_type = config.wave_type sspec_output.inversion_info.algorithm = config.inv_algorithm sspec_output.inversion_info.weighting = config.weighting sspec_output.inversion_info.t_star_0 = config.t_star_0 sspec_output.inversion_info.invert_t_star_0 = config.invert_t_star_0 sspec_output.inversion_info.t_star_0_variability =\ config.t_star_0_variability sspec_output.inversion_info.t_star_min_max =\ config.t_star_min_max or 'null' sspec_output.inversion_info.fc_min_max = config.fc_min_max or 'null' sspec_output.inversion_info.Qo_min_max = config.Qo_min_max or 'null' event = config.event sspec_output.event_info.event_id = event.event_id if event.name is not None: sspec_output.event_info.event_name = event.name sspec_output.event_info.longitude = event.hypocenter.longitude.value_in_deg sspec_output.event_info.latitude = event.hypocenter.latitude.value_in_deg sspec_output.event_info.depth_in_km = event.hypocenter.depth.value_in_km sspec_output.event_info.origin_time = event.hypocenter.origin_time sspec_output.event_info.vp_in_km_s = event.hypocenter.vp sspec_output.event_info.vs_in_km_s = event.hypocenter.vs sspec_output.event_info.rho_in_kg_m3 = event.hypocenter.rho sspec_output.event_info.kp = config.kp sspec_output.event_info.ks = config.ks if config.Mw_0_from_event_file and event.magnitude.value is not None: msg = ( f'Setting Mw_0 to the value provided in the event file: ' f'{event.magnitude.mag_type} ' f'{event.magnitude.value:.4f}') if event.magnitude.computed: msg += ' (computed from scalar moment)' logger.info(msg) for spec in sorted(spectra, key=lambda sp: sp.id): if spec.stats.channel[-1] != 'H': continue if spec.stats.ignore: continue spec_weight = select_trace(weight_st, spec.id, spec.stats.instrtype) try: station_pars = _spec_inversion(config, spec, spec_weight) except (RuntimeError, ValueError) as msg: logger.warning(msg) continue spec_st += _synth_spec(config, spec, station_pars) sspec_output.station_parameters[station_pars.station_id] = station_pars logger.info('Inverting spectra: done') logger.info('---------------------------------------------------') return sspec_output