# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Utility functions for sourcespec.
:copyright:
2012-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
CeCILL Free Software License Agreement v2.1
(http://www.cecill.info/licences.en.html)
"""
import os
from glob import glob
import logging
import math
import numpy as np
from obspy.signal.invsim import cosine_taper as _cos_taper
from obspy.geodetics import gps2dist_azimuth, kilometers2degrees
from obspy.taup import TauPyModel
model = TauPyModel(model='iasp91')
v_model = model.model.s_mod.v_mod
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])
# STATISTICS ------------------------------------------------------------------
[docs]
def weighted_std(values, weights):
"""
Return the weighted standard deviation.
:param values: array of values
:param weights: array of weights
:return: weighted standard deviation
"""
nan_values = np.isnan(values)
nan_weights = np.isnan(weights)
values = values[~(nan_values | nan_weights)]
weights = weights[~(nan_values | nan_weights)]
if len(values) == 0:
return 0
average = np.average(values, weights=weights)
variance = np.average((values - average)**2, weights=weights)
return np.sqrt(variance)
# -----------------------------------------------------------------------------
# MISC ------------------------------------------------------------------------
[docs]
def spec_minmax(amp, freq, amp_minmax=None, freq_minmax=None):
"""Get minimum and maximum values of spectral amplitude and frequency."""
# Make use of numpy's nanmin and nanmax functions to ignore NaNs in the
# array. Note that amp_min and amp_max can still be NaN if all the array
# values are NaN.
if np.all(np.isnan(amp)):
amp_min = amp_max = np.nan
else:
amp_min = np.nanmin(amp)
amp_max = np.nanmax(amp)
if amp_minmax is None:
amp_minmax = [amp_min, amp_max]
else:
if not np.isnan(amp_min) and amp_min < amp_minmax[0]:
amp_minmax[0] = amp_min
if not np.isnan(amp_max) and amp_max > amp_minmax[1]:
amp_minmax[1] = amp_max
# Same for frequency
freq_min = np.nanmin(freq)
freq_max = np.nanmax(freq)
if freq_minmax is None:
freq_minmax = [freq_min, freq_max]
else:
if not np.isnan(freq_min) and freq_min < freq_minmax[0]:
freq_minmax[0] = freq_min
if not np.isnan(freq_max) and freq_max > freq_minmax[1]:
freq_minmax[1] = freq_max
return amp_minmax, freq_minmax
[docs]
def select_trace(stream, traceid, instrtype):
"""Select trace from stream using traceid and instrument type."""
return [tr for tr in stream.select(id=traceid)
if tr.stats.instrtype == instrtype][0]
# -----------------------------------------------------------------------------
# MEDIUM PROPERTIES -----------------------------------------------------------
[docs]
class MediumProperties():
"""
Class to retrieve medium properties from config.
:param lon: Longitude (degrees).
:type lon: float
:param lat: Latitude (degrees).
:type lat: float
:param depth_in_km: Depth (km).
:type depth_in_km: float
:param config: Configuration object.
:type config: :class:`~sourcespec.config.Config`
"""
_logged_messages = set() # Class variable shared across all instances
def __init__(self, lon, lat, depth_in_km, config):
self.lon = lon
self.lat = lat
self.depth_in_km = depth_in_km
self.config = config
def _log_once(self, message):
"""
Log a message only once, even if called multiple times.
:param message: message to log.
:type message: str
"""
if message not in self._logged_messages:
logger.info(message)
self._logged_messages.add(message)
def _select_value_at_depth(self, values, depths):
"""
Select value at source depth from layered model.
:param values: Array of property values.
:type values: list or array
:param depths: Array of layer depths.
:type depths: list or array
:return: Value at source depth.
:rtype: float
"""
values = np.array(values)
depths = np.array(depths)
try:
# find the last value that is smaller than the source depth
value = values[depths <= self.depth_in_km][-1]
except IndexError:
value = values[0]
return value
def _get_layered_property_value(self, values, depths):
"""
Return a property value considering layering and source depth.
:param values: List of property values (may contain a single element).
:type values: list or numpy array
:param depths: Corresponding layer top depths or None.
:type depths: list, numpy array or None
:return: Value at the current depth, or None if no values provided.
:rtype: float or None
"""
if values is None:
return None
values = np.array(values)
if values.size == 0:
return None
if depths is None:
return values[0]
return self._select_value_at_depth(values, depths)
def _get_general_property_value(self, mproperty, where):
"""
Retrieve general earth-model property and log once if used.
:param mproperty: Property name (vp, vs, rho).
:type mproperty: str
:param where: Text label for logging (source/stations).
:type where: str
:return: Property value or None.
:rtype: float or None
"""
general_values = getattr(self.config, mproperty)
general_depths = getattr(self.config, 'layer_top_depths', None)
value = self._get_layered_property_value(
general_values, general_depths)
if value is not None:
self._log_once(
f'Taking {mproperty} at the {where} '
'from general earth model parameters')
return value
[docs]
def get_from_config_param_source(self, mproperty):
"""
Get medium property at the source from config parameter.
:param mproperty: Property to be retrieved
(``'vp'``, ``'vs'`` or ``'rho'``).
:type mproperty: str
:return: Property value.
:rtype: float
"""
if mproperty not in ('vp', 'vs', 'rho'):
raise ValueError(f'Invalid property: {mproperty}')
# 1) Try source-specific parameters (with optional source-specific
# layering depths).
source_values = self.config[f'{mproperty}_source']
source_depths = getattr(self.config, 'layer_top_depths_source', None)
value = self._get_layered_property_value(source_values, source_depths)
if value is not None:
return value
# 2) Fallback to general parameters (with general layering depths).
return self._get_general_property_value(mproperty, 'source')
[docs]
def get_from_config_param_station(self, mproperty):
"""
Get medium property at the station from config parameter.
:param mproperty: Property to be retrieved
(``'vp'``, ``'vs'`` or ``'rho'``).
:type mproperty: str
:return: Property value.
:rtype: float
"""
if mproperty not in ('vp', 'vs', 'rho'):
raise ValueError(f'Invalid property: {mproperty}')
# 1) Try station-specific scalar parameter (no layering at stations).
value = self.config[f'{mproperty}_stations']
if value is not None:
return value
# 2) Fallback to general parameters (layered, depth-dependent).
return self._get_general_property_value(mproperty, 'stations')
[docs]
def get_vel_from_NLL(self, wave):
"""
Get velocity from NLL model.
:param wave: Wave type (``'P'`` or ``'S'``).
:type wave: str
:return: Velocity (km/s).
:rtype: float
"""
# Lazy-import here, since nllgrid is not an installation requirement
# pylint: disable=import-outside-toplevel
from nllgrid import NLLGrid
grdfile = f'*.{wave}.mod.hdr'
grdfile = os.path.join(self.config.NLL_model_dir, grdfile)
try:
grdfile = glob(grdfile)[0]
except IndexError as e:
raise FileNotFoundError(
f'Unable to find model file {grdfile}') from e
grd = NLLGrid(grdfile)
x, y = grd.project(self.lon, self.lat)
value = grd.get_value(x, y, self.depth_in_km)
if grd.type == 'VELOCITY':
vel = value
elif grd.type == 'VELOCITY_METERS':
vel = value / 1e3
elif grd.type == 'SLOWNESS':
vel = 1. / value
elif grd.type == 'SLOW_LEN':
vel = grd.dx / value
elif grd.type == 'VEL2':
vel = value**0.5
elif grd.type == 'SLOW2':
vel = 1. / (value**0.5)
elif grd.type == 'SLOW2_METERS':
vel = (1. / (value**0.5)) / 1e3
else:
raise ValueError(f'Unsupported grid type: {grd.type}')
self._log_once(f'Using {wave} velocity from NLL model')
return vel
[docs]
def get_from_taup(self, mproperty):
"""
Get medium property (P- or S-wave velocity, density) at a given depth
from taup model.
:param mproperty: Property to be retrieved
(``'vp'``, ``'vs'`` or ``'rho'``).
:type mproperty: str
:return: Property value.
:rtype: float
"""
# avoid negative depths
depth_in_km = max(self.depth_in_km, 1e-2)
try:
prop = {'vp': 'p', 'vs': 's', 'rho': 'r'}[mproperty]
except KeyError as e:
raise ValueError(f'Invalid property: {mproperty}') from e
value = v_model.evaluate_above(depth_in_km, prop)[0]
if mproperty == 'rho':
value *= 1e3 # convert g/cm**3 to kg/m**3
return value
[docs]
def get(self, mproperty, where):
"""
Get medium property (P- or S-wave velocity, density) at a given point
from NLL grid, config or taup model.
:param mproperty: Property to be retrieved
(``'vp'``, ``'vs'`` or ``'rho'``).
:type mproperty: str
:param where: Where to retrieve medium property
(``'source'`` or ``'stations'``).
:type where: str
:return: Property value.
:rtype: float
"""
if mproperty not in ['vp', 'vs', 'rho']:
raise ValueError(f'Invalid property: {mproperty}')
if where == 'source':
value = self.get_from_config_param_source(mproperty)
elif where == 'stations':
value = self.get_from_config_param_station(mproperty)
else:
raise ValueError(f'Invalid location: {where}')
if (
self.config.NLL_model_dir is not None and
mproperty in ['vp', 'vs']
):
wave = 'P' if mproperty == 'vp' else 'S'
try:
value = self.get_vel_from_NLL(wave)
except Exception as msg:
logger.warning(msg)
logger.warning(f'Using {wave} velocity from config')
if value is None:
value = self.get_from_taup(mproperty)
self._log_once(
f'Taking {mproperty} at the {where} '
'from global model (iasp91)')
return float(value)
[docs]
def to_string(self, mproperty, value):
"""
Return a string with the property name and value.
:param mproperty: Property name. Must contain one of the following:
``'vp'``, ``'vs'``, ``'rho'``, ``'depth'``
:type mproperty: str
:param value: Property value.
:type value: float
:return: Property string.
:rtype: str
"""
if 'vp' in mproperty or 'vs' in mproperty:
value = round(value, 2)
unit = 'km/s'
elif 'rho' in mproperty:
value = round(value, 1)
unit = 'kg/m3'
elif 'depth' in mproperty:
value = round(value, 1)
unit = 'km'
else:
raise ValueError(f'Invalid property: {mproperty}')
# we need to use float(value) to be sure that is_integer() is defined
if float(value).is_integer():
value = int(value)
return f'{mproperty}: {value} {unit}'
# -----------------------------------------------------------------------------
# SOURCE PARAMETERS -----------------------------------------------------------
[docs]
def moment_to_mag(moment):
"""Convert moment to magnitude."""
return (np.log10(moment) - 9.1) / 1.5
[docs]
def mag_to_moment(magnitude):
"""Convert magnitude to moment."""
return np.power(10, (1.5 * magnitude + 9.1))
[docs]
def source_radius(fc_in_hz, vs_in_m_per_s, k_coeff=0.3724):
"""
Compute source radius in meters.
Madariaga (2009), doi:10.1007/978-1-4419-7695-6_22, eq. 31
Kaneko and Shearer (2014), doi:10.1093/gji/ggu030, eq. 2, 15, 16
"""
return k_coeff * vs_in_m_per_s / fc_in_hz
[docs]
def static_stress_drop(Mo_in_N_m, ra_in_m):
"""
Compute static stress drop in MPa.
Madariaga (2009), doi:10.1007/978-1-4419-7695-6_22, eq. 27
"""
return 7. / 16 * Mo_in_N_m / ra_in_m**3 * 1e-6
[docs]
def quality_factor(travel_time_in_s, t_star_in_s):
"""Compute quality factor from travel time and t_star."""
return np.inf if t_star_in_s == 0 else travel_time_in_s / t_star_in_s
# -----------------------------------------------------------------------------
# SIGNAL ANALYSIS -------------------------------------------------------------
[docs]
def cosine_taper(signal, width, left_taper=False):
"""Apply a cosine taper to the signal."""
# TODO: this taper looks more like a hanning...
npts = len(signal)
p = 2 * width
tap = _cos_taper(npts, p)
if left_taper:
tap[npts // 2:] = 1.
signal *= tap
# modified from: http://stackoverflow.com/q/5515720
[docs]
def smooth(signal, window_len=11, window='hanning'):
"""Smooth the signal using a window with requested size."""
if signal.ndim != 1:
raise ValueError('smooth only accepts 1 dimension arrays.')
if signal.size < window_len:
raise ValueError('Input vector needs to be bigger than window size.')
if window_len < 3:
return signal
if window not in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError("Window is one of 'flat', 'hanning', 'hamming', "
"'bartlett', 'blackman'")
s = np.r_[
2 * signal[0] - signal[window_len - 1::-1],
signal,
2 * signal[-1] - signal[-1:-window_len:-1]
]
if window == 'flat': # moving average
w = np.ones(window_len, 'd')
else:
w = eval(f'np.{window}(window_len)') # pylint: disable=eval-used
y = np.convolve(w / w.sum(), s, mode='same')
return y[window_len:-window_len + 1]
[docs]
def remove_instr_response(trace, pre_filt=(0.5, 0.6, 40., 45.)):
"""
Remove instrument response from a trace.
Trace is converted to the sensor units (m for a displacement sensor,
m/s for a short period or broadband velocity sensor, m/s**2 for a strong
motion sensor).
:param trace: Trace to be corrected.
:type trace: :class:`~obspy.core.trace.Trace`
:param pre_filt: Pre-filter frequencies (``None`` means no pre-filtering).
:type pre_filt: tuple of four floats
"""
trace_info = trace.stats.info
inventory = trace.stats.inventory
if not inventory:
# empty inventory
raise RuntimeError(f'{trace_info}: no instrument response for trace')
# remove the mean...
trace.detrend(type='constant')
# ...and the linear trend
trace.detrend(type='linear')
# Define output units based on nominal units in inventory
# Note: ObsPy >= 1.3.0 supports the 'DEF' output unit, which will make
# this step unnecessary
if trace.stats.units.lower() == 'm':
output = 'DISP'
if trace.stats.units.lower() == 'm/s':
output = 'VEL'
if trace.stats.units.lower() == 'm/s**2':
output = 'ACC'
# Finally remove instrument response,
# trace is converted to the sensor units
trace.remove_response(
inventory=inventory, output=output, pre_filt=pre_filt)
if any(np.isnan(trace.data)):
raise RuntimeError(
f'{trace_info}: NaN values in trace after '
'instrument response removal')
# -----------------------------------------------------------------------------
# GEODETICS AND COORDINATES ---------------------------------------------------
[docs]
def toRad(degrees):
"""Convert degrees to radians."""
return math.pi * degrees / 180
[docs]
def toDeg(radians):
"""Convert radians to degrees."""
return 180 * radians / math.pi
def _validate_coord(coordinate, coordinate_name):
try:
coordinate = float(coordinate)
except Exception as e:
raise ValueError(f'Invalid {coordinate_name}: {coordinate}') from e
return coordinate
[docs]
def station_to_event_position(trace):
"""
Compute station position with respect to the event, in terms of hypocentral
distance (km), epicentral distance (km), great-circle distance (degrees),
azimuth and back-azimuth.
Values are stored in the trace stats dictionary.
"""
coords = trace.stats.coords
stla = _validate_coord(coords.latitude, 'station latitude')
stlo = _validate_coord(coords.longitude, 'station longitude')
stel = _validate_coord(coords.elevation, 'station elevation')
hypo = trace.stats.event.hypocenter
evla = _validate_coord(hypo.latitude.value_in_deg, 'event latitude')
evlo = _validate_coord(hypo.longitude.value_in_deg, 'event longitude')
evdp = _validate_coord(hypo.depth.value_in_km, 'event depth')
epi_dist, az, baz = gps2dist_azimuth(evla, evlo, stla, stlo)
epi_dist /= 1e3 # in km
gcarc = kilometers2degrees(epi_dist)
hypo_dist = math.sqrt(epi_dist**2 + (stel + evdp)**2)
trace.stats.azimuth = az
trace.stats.back_azimuth = baz
trace.stats.epi_dist = epi_dist
trace.stats.hypo_dist = hypo_dist
trace.stats.gcarc = gcarc
def _compute_azimuthal_gaps(azimuth_array):
"""
Compute azimuthal gaps from an array of azimuths.
:param azimuth_array: Array of azimuths (degrees).
:type azimuth_array: list of float or numpy array
:return: azimuthal gap (degrees).
:rtype: float
"""
if len(azimuth_array) == 0:
return np.nan
if len(azimuth_array) == 1:
return 360.
azimuth_array = np.sort(np.array(azimuth_array) % 360)
azimuth_array = np.append(azimuth_array, azimuth_array[0] + 360)
gaps = np.diff(azimuth_array)
return np.max(gaps)
[docs]
def primary_and_secondary_azimuthal_gap(azimuth_array):
"""
Compute primary and secondary azimuthal gap from an array of azimuths.
:param azimuth_array: Array of azimuths (degrees).
:type azimuth_array: list of float or numpy array
:return: Primary and secondary gap (degrees).
:rtype: tuple of two floats
"""
if len(azimuth_array) == 0:
return np.nan, np.nan
if len(azimuth_array) == 1:
return 360., 360.
azimuth_array = np.array(azimuth_array)
primary_gap = _compute_azimuthal_gaps(azimuth_array)
# Secondary gap: recompute gaps after removing each azimuth once,
# then take the maximum of those values.
secondary_gaps = []
for i in range(len(azimuth_array)):
az_copy = np.delete(azimuth_array, i)
secondary_gaps.append(_compute_azimuthal_gaps(az_copy))
secondary_gap = np.max(secondary_gaps)
return primary_gap, secondary_gap
# -----------------------------------------------------------------------------