Source code for ssp_util

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Utility functions for sourcespec.

:copyright:
    2012-2024 Claudio Satriano <satriano@ipgp.fr>
:license:
    CeCILL Free Software License Agreement v2.1
    (http://www.cecill.info/licences.en.html)
"""
import os
from glob import glob
import logging
import math
import numpy as np
from obspy.signal.invsim import cosine_taper as _cos_taper
from obspy.geodetics import gps2dist_azimuth, kilometers2degrees
from obspy.taup import TauPyModel
model = TauPyModel(model='iasp91')
v_model = model.model.s_mod.v_mod
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])


# MISC ------------------------------------------------------------------------
[docs] def spec_minmax(amp, freq, amp_minmax=None, freq_minmax=None): """Get minimum and maximum values of spectral amplitude and frequency.""" amp_min = amp.min() amp_max = amp.max() if amp_minmax is None: amp_minmax = [amp_min, amp_max] else: if amp_min < amp_minmax[0]: amp_minmax[0] = amp_min if amp_max > amp_minmax[1]: amp_minmax[1] = amp_max freq_min = freq.min() freq_max = freq.max() if freq_minmax is None: freq_minmax = [freq_min, freq_max] else: if freq_min < freq_minmax[0]: freq_minmax[0] = freq_min if freq_max > freq_minmax[1]: freq_minmax[1] = freq_max return amp_minmax, freq_minmax
[docs] def select_trace(stream, traceid, instrtype): """Select trace from stream using traceid and instrument type.""" return [tr for tr in stream.select(id=traceid) if tr.stats.instrtype == instrtype][0]
# ----------------------------------------------------------------------------- # MEDIUM PROPERTIES -----------------------------------------------------------
[docs] class MediumProperties(): """ Class to retrieve medium properties from config. :param lon: Longitude (degrees). :type lon: float :param lat: Latitude (degrees). :type lat: float :param depth_in_km: Depth (km). :type depth_in_km: float :param config: Configuration object. :type config: :class:`~sourcespec.config.Config` """ def __init__(self, lon, lat, depth_in_km, config): self.lon = lon self.lat = lat self.depth_in_km = depth_in_km self.config = config
[docs] def get_from_config_param_source(self, mproperty): """ Get medium property at the source from config parameter. :param mproperty: Property to be retrieved (``'vp'``, ``'vs'`` or ``'rho'``). :type mproperty: str :return: Property value. :rtype: float """ if mproperty not in ['vp', 'vs', 'rho']: raise ValueError(f'Invalid property: {mproperty}') values = self.config[f'{mproperty}_source'] if values is None: return None if self.config.layer_top_depths is None: return values[0] values = np.array(values) depths = np.array(self.config.layer_top_depths) try: # find the last value that is smaller than the source depth value = values[depths <= self.depth_in_km][-1] except IndexError: value = values[0] return value
[docs] def get_from_config_param_station(self, mproperty): """ Get medium property at the station from config parameter. :param mproperty: Property to be retrieved (``'vp'``, ``'vs'`` or ``'rho'``). :type mproperty: str :return: Property value. :rtype: float """ if mproperty not in ['vp', 'vs', 'rho']: raise ValueError(f'Invalid property: {mproperty}') value = self.config[f'{mproperty}_stations'] if value is None: value = self.get_from_config_param_source( mproperty) return value
[docs] def get_vel_from_NLL(self, wave): """ Get velocity from NLL model. :param wave: Wave type (``'P'`` or ``'S'``). :type wave: str :return: Velocity (km/s). :rtype: float """ # Lazy-import here, since nllgrid is not an installation requirement # pylint: disable=import-outside-toplevel from nllgrid import NLLGrid grdfile = f'*.{wave}.mod.hdr' grdfile = os.path.join(self.config.NLL_model_dir, grdfile) try: grdfile = glob(grdfile)[0] except IndexError as e: raise FileNotFoundError( f'Unable to find model file {grdfile}') from e grd = NLLGrid(grdfile) x, y = grd.project(self.lon, self.lat) value = grd.get_value(x, y, self.depth_in_km) if grd.type == 'VELOCITY': vel = value elif grd.type == 'VELOCITY_METERS': vel = value / 1e3 elif grd.type == 'SLOWNESS': vel = 1. / value elif grd.type == 'SLOW_LEN': vel = grd.dx / value elif grd.type == 'VEL2': vel = value**0.5 elif grd.type == 'SLOW2': vel = 1. / (value**0.5) elif grd.type == 'SLOW2_METERS': vel = (1. / (value**0.5)) / 1e3 else: raise ValueError(f'Unsupported grid type: {grd.type}') logger.info(f'Using {wave} velocity from NLL model') return vel
[docs] def get_from_taup(self, mproperty): """ Get medium property (P- or S-wave velocity, density) at a given depth from taup model. :param mproperty: Property to be retrieved (``'vp'``, ``'vs'`` or ``'rho'``). :type mproperty: str :return: Property value. :rtype: float """ # avoid negative depths depth_in_km = max(self.depth_in_km, 1e-2) try: prop = {'vp': 'p', 'vs': 's', 'rho': 'r'}[mproperty] except KeyError as e: raise ValueError(f'Invalid property: {mproperty}') from e value = v_model.evaluate_above(depth_in_km, prop)[0] if mproperty == 'rho': value *= 1e3 # convert g/cm**3 to kg/m**3 return value
[docs] def get(self, mproperty, where): """ Get medium property (P- or S-wave velocity, density) at a given point from NLL grid, config or taup model. :param mproperty: Property to be retrieved (``'vp'``, ``'vs'`` or ``'rho'``). :type mproperty: str :param where: Where to retrieve medium property (``'source'`` or ``'stations'``). :type where: str :return: Property value. :rtype: float """ if mproperty not in ['vp', 'vs', 'rho']: raise ValueError(f'Invalid property: {mproperty}') if where == 'source': value = self.get_from_config_param_source( mproperty) elif where == 'stations': value = self.get_from_config_param_station( mproperty) else: raise ValueError(f'Invalid location: {where}') if ( self.config.NLL_model_dir is not None and mproperty in ['vp', 'vs'] ): wave = 'P' if mproperty == 'vp' else 'S' try: value = self.get_vel_from_NLL(wave) except Exception as msg: logger.warning(msg) logger.warning(f'Using {wave} velocity from config') if value is None: value = self.get_from_taup(mproperty) logger.info( f'Using {mproperty} from global model (iasp91)') return float(value)
[docs] def to_string(self, mproperty, value): """ Return a string with the property name and value. :param mproperty: Property name. Must contain one of the following: ``'vp'``, ``'vs'``, ``'rho'``, ``'depth'`` :type mproperty: str :param value: Property value. :type value: float :return: Property string. :rtype: str """ if 'vp' in mproperty or 'vs' in mproperty: value = round(value, 2) unit = 'km/s' elif 'rho' in mproperty: value = round(value, 1) unit = 'kg/m3' elif 'depth' in mproperty: value = round(value, 1) unit = 'km' else: raise ValueError(f'Invalid property: {mproperty}') # we need to use float(value) to be sure that is_integer() is defined if float(value).is_integer(): value = int(value) return f'{mproperty}: {value} {unit}'
# ----------------------------------------------------------------------------- # GEOMETRICAL SPREADING -------------------------------------------------------
[docs] def geom_spread_r_power_n(hypo_dist_in_km, exponent): """ rⁿ geometrical spreading coefficient. :param hypo_dist_in_km: Hypocentral distance (km). :type hypo_dist_in_km: float :param exponent: Exponent. :type exponent: float :return: Geometrical spreading correction (in m) :rtype: float """ dist = hypo_dist_in_km * 1e3 return dist**exponent
def _boatwright_above_cutoff_dist(freqs, cutoff_dist, dist): """ Geometrical spreading coefficient from Boatwright et al. (2002), eq. 8. Except that we take the square root of eq. 8, since we correct amplitude and not energy. This is the part of the equation that is valid for distances above the cutoff distance. :param freqs: Frequencies (Hz). :type freqs: numpy.ndarray :param cutoff_dist: Cutoff distance (m). :type cutoff_dist: float :param dist: Distance (m). :type dist: float :return: Geometrical spreading correction (in m) :rtype: numpy.ndarray """ exponent = np.ones_like(freqs) low_freq = freqs <= 0.2 mid_freq = np.logical_and(freqs > 0.2, freqs <= 0.25) high_freq = freqs >= 0.25 exponent[low_freq] = 0.5 exponent[mid_freq] = 0.5 + 2 * np.log10(5 * freqs[mid_freq]) exponent[high_freq] = 0.7 return cutoff_dist * (dist / cutoff_dist)**exponent
[docs] def geom_spread_boatwright(hypo_dist_in_km, cutoff_dist_in_km, freqs): """" Geometrical spreading coefficient from Boatwright et al. (2002), eq. 8. Except that we take the square root of eq. 8, since we correct amplitude and not energy. :param hypo_dist_in_km: Hypocentral distance (km). :type hypo_dist_in_km: float :param cutoff_dist_in_km: Cutoff distance (km). :type cutoff_dist_in_km: float :param freqs: Frequencies (Hz). :type freqs: numpy.ndarray :return: Geometrical spreading correction (in m) :rtype: numpy.ndarray """ dist = hypo_dist_in_km * 1e3 cutoff_dist = cutoff_dist_in_km * 1e3 if dist <= cutoff_dist: return dist return _boatwright_above_cutoff_dist(freqs, cutoff_dist, dist)
def _compute_dtdd(angular_distance, aperture, source_depth_in_km, phase_list): """ Compute the local derivative of takeoff angle with respect to angular distance. This is used to compute the geometrical spreading coefficient for teleseismic body waves, following Okal (1992), eq. 4. :param angular_distance: Angular distance (degrees). :type angular_distance: float :param aperture: Aperture of the ray tube (degrees). :type aperture: float :param source_depth_in_km: Source depth (km). :type source_depth_in_km: float :param phase_list: List of phases. :type phase_list: list of str :return: Local derivative of takeoff angle with respect to angular distance. :rtype: float """ distances = np.linspace( angular_distance - aperture, angular_distance + aperture, 3) # pylint: disable=no-member takeoff_angles = np.array([ model.get_travel_times( source_depth_in_km, d, phase_list)[0].takeoff_angle for d in distances]) return np.gradient(takeoff_angles, distances)[1]
[docs] def geom_spread_teleseismic( angular_distance, source_depth_in_km, station_depth_in_km, phase): """ Calculate geometrical spreading coefficient for teleseismic body waves. Implements eq (4) in Okal (1992) for a spherically symmetric Earth. This equations is derived from the conservation of the kinetic energy flux along a ray tube between the source and the receiver. :param angular_distance: Angular distance (degrees). :type angular_distance: float :param source_depth_in_km: Source depth (km). :type source_depth_in_km: float :param station_depth_in_km: Station depth (km). :type station_depth_in_km: float :param phase: Phase type (``'P'`` or ``'S'``). :type phase: str :return: Geometrical spreading correction (in m) :rtype: float """ # Don't need to specify coordinates, since we use a spherically symmetric # Earth; don't need to specify a config object, since we use the global # model (iasp91) medium_properties_source = MediumProperties( 0, 0, source_depth_in_km, None) medium_properties_station = MediumProperties( 0, 0, station_depth_in_km, None) if phase == 'P': v_source = medium_properties_source.get_from_taup('vp') v_station = medium_properties_station.get_from_taup('vp') phase_list = ['p', 'P', 'pP', 'sP'] elif phase == 'S': v_source = medium_properties_source.get_from_taup('vs') v_station = medium_properties_station.get_from_taup('vs') phase_list = ['s', 'S', 'sS', 'pS'] else: raise ValueError(f'Invalid phase: {phase}') rho_source = medium_properties_source.get_from_taup('rho') rho_station = medium_properties_station.get_from_taup('rho') delta = np.deg2rad(angular_distance) arrival = model.get_travel_times( source_depth_in_km, angular_distance, phase_list)[0] # pylint: disable=no-member takeoff_angle = np.deg2rad(arrival.takeoff_angle) incident_angle = np.deg2rad(arrival.incident_angle) # Cmpute the local derivative of the takeoff angle (dt) with respect to the # angular distance (dd, also called the aperture of the ray tube). # We use a finite difference approximation, and we increase the aperture # until we get a non-zero value. aperture = 0.1 # degrees for _ in range(10): dtdd = _compute_dtdd( angular_distance, aperture, source_depth_in_km, phase_list) if dtdd != 0: break aperture *= 2 if dtdd == 0: raise ValueError( f'Unable to compute geometrical spreading coefficient for ' f'{phase} wave at {angular_distance:.2f} degrees: dt/dd = 0') # we have now all the ingredients to calculate the spreading coefficient, # eq (4) in Okal, 1992 spreading_coeff = ( (rho_source * v_source) / (rho_station * v_station) * np.sin(takeoff_angle) / np.sin(delta) * 1 / np.cos(incident_angle) * np.abs(dtdd) )**0.5 earth_radius = 6371e3 # m # We return the inverse of Okal's coefficient, since we use it to correct # the amplitude return earth_radius / spreading_coeff
# ----------------------------------------------------------------------------- # SOURCE PARAMETERS -----------------------------------------------------------
[docs] def moment_to_mag(moment): """Convert moment to magnitude.""" return (np.log10(moment) - 9.1) / 1.5
[docs] def mag_to_moment(magnitude): """Convert magnitude to moment.""" return np.power(10, (1.5 * magnitude + 9.1))
[docs] def source_radius(fc_in_hz, vs_in_m_per_s, k_coeff=0.3724): """ Compute source radius in meters. Madariaga (2009), doi:10.1007/978-1-4419-7695-6_22, eq. 31 Kaneko and Shearer (2014), doi:10.1093/gji/ggu030, eq. 2, 15, 16 """ return k_coeff * vs_in_m_per_s / fc_in_hz
[docs] def static_stress_drop(Mo_in_N_m, ra_in_m): """ Compute static stress drop in MPa. Madariaga (2009), doi:10.1007/978-1-4419-7695-6_22, eq. 27 """ return 7. / 16 * Mo_in_N_m / ra_in_m**3 * 1e-6
[docs] def quality_factor(travel_time_in_s, t_star_in_s): """Compute quality factor from travel time and t_star.""" return np.inf if t_star_in_s == 0 else travel_time_in_s / t_star_in_s
# ----------------------------------------------------------------------------- # SIGNAL ANALYSIS -------------------------------------------------------------
[docs] def cosine_taper(signal, width, left_taper=False): """Apply a cosine taper to the signal.""" # TODO: this taper looks more like a hanning... npts = len(signal) p = 2 * width tap = _cos_taper(npts, p) if left_taper: tap[npts // 2:] = 1. signal *= tap
# modified from: http://stackoverflow.com/q/5515720
[docs] def smooth(signal, window_len=11, window='hanning'): """Smooth the signal using a window with requested size.""" if signal.ndim != 1: raise ValueError('smooth only accepts 1 dimension arrays.') if signal.size < window_len: raise ValueError('Input vector needs to be bigger than window size.') if window_len < 3: return signal if window not in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']: raise ValueError("Window is one of 'flat', 'hanning', 'hamming', " "'bartlett', 'blackman'") s = np.r_[ 2 * signal[0] - signal[window_len - 1::-1], signal, 2 * signal[-1] - signal[-1:-window_len:-1] ] if window == 'flat': # moving average w = np.ones(window_len, 'd') else: w = eval(f'np.{window}(window_len)') # pylint: disable=eval-used y = np.convolve(w / w.sum(), s, mode='same') yy = y[window_len:-window_len + 1] # check if there are NaN values nanindexes = np.where(np.isnan(yy)) yy[nanindexes] = signal[nanindexes] return yy
[docs] def remove_instr_response(trace, pre_filt=(0.5, 0.6, 40., 45.)): """ Remove instrument response from a trace. Trace is converted to the sensor units (m for a displacement sensor, m/s for a short period or broadband velocity sensor, m/s**2 for a strong motion sensor). :param trace: Trace to be corrected. :type trace: :class:`~obspy.core.trace.Trace` :param pre_filt: Pre-filter frequencies (``None`` means no pre-filtering). :type pre_filt: tuple of four floats """ trace_info = trace.stats.info inventory = trace.stats.inventory if not inventory: # empty inventory raise RuntimeError(f'{trace_info}: no instrument response for trace') # remove the mean... trace.detrend(type='constant') # ...and the linear trend trace.detrend(type='linear') # Define output units based on nominal units in inventory # Note: ObsPy >= 1.3.0 supports the 'DEF' output unit, which will make # this step unnecessary if trace.stats.units.lower() == 'm': output = 'DISP' if trace.stats.units.lower() == 'm/s': output = 'VEL' if trace.stats.units.lower() == 'm/s**2': output = 'ACC' # Finally remove instrument response, # trace is converted to the sensor units trace.remove_response( inventory=inventory, output=output, pre_filt=pre_filt) if any(np.isnan(trace.data)): raise RuntimeError( f'{trace_info}: NaN values in trace after ' 'instrument response removal')
# ----------------------------------------------------------------------------- # GEODETICS AND COORDINATES ---------------------------------------------------
[docs] def toRad(degrees): """Convert degrees to radians.""" return math.pi * degrees / 180
[docs] def toDeg(radians): """Convert radians to degrees.""" return 180 * radians / math.pi
def _validate_coord(coordinate, coordinate_name): try: coordinate = float(coordinate) except Exception as e: raise ValueError(f'Invalid {coordinate_name}: {coordinate}') from e return coordinate
[docs] def station_to_event_position(trace): """ Compute station position with respect to the event, in terms of hypocentral distance (km), epicentral distance (km), great-circle distance (degrees), azimuth and back-azimuth. Values are stored in the trace stats dictionary. """ coords = trace.stats.coords stla = _validate_coord(coords.latitude, 'station latitude') stlo = _validate_coord(coords.longitude, 'station longitude') stel = _validate_coord(coords.elevation, 'station elevation') hypo = trace.stats.event.hypocenter evla = _validate_coord(hypo.latitude.value_in_deg, 'event latitude') evlo = _validate_coord(hypo.longitude.value_in_deg, 'event longitude') evdp = _validate_coord(hypo.depth.value_in_km, 'event depth') epi_dist, az, baz = gps2dist_azimuth(evla, evlo, stla, stlo) epi_dist /= 1e3 # in km gcarc = kilometers2degrees(epi_dist) hypo_dist = math.sqrt(epi_dist**2 + (stel + evdp)**2) trace.stats.azimuth = az trace.stats.back_azimuth = baz trace.stats.epi_dist = epi_dist trace.stats.hypo_dist = hypo_dist trace.stats.gcarc = gcarc
# -----------------------------------------------------------------------------