# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Build spectral objects.
:copyright:
2012 Claudio Satriano <satriano@ipgp.fr>
2013-2014 Claudio Satriano <satriano@ipgp.fr>,
Emanuela Matrullo <matrullo@geologie.ens.fr>,
Agnes Chounet <chounet@ipgp.fr>
2015-2024 Claudio Satriano <satriano@ipgp.fr>
:license:
CeCILL Free Software License Agreement v2.1
(http://www.cecill.info/licences.en.html)
"""
import logging
import math
import numpy as np
from scipy.integrate import cumtrapz
from scipy.interpolate import interp1d
from obspy.core import Stream
from sourcespec.spectrum import Spectrum, SpectrumStream
from sourcespec.ssp_setup import ssp_exit
from sourcespec.ssp_util import (
smooth, cosine_taper, moment_to_mag, MediumProperties,
geom_spread_r_power_n, geom_spread_boatwright, geom_spread_teleseismic)
from sourcespec.ssp_process_traces import filter_trace
from sourcespec.ssp_correction import station_correction
from sourcespec.ssp_radiation_pattern import get_radiation_pattern_coefficient
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])
[docs]
class SpectrumIgnored(Exception):
"""Spectrum ignored exception"""
def __init__(self, message, reason):
# Call the base class constructor with the parameters it needs
super().__init__(message)
# Now for your custom code...
self.reason = reason
def _get_nint(config, trace):
if config.trace_units == 'auto':
instrtype = trace.stats.instrtype
else:
instrtype = config.trace_units
if instrtype == 'acc':
nint = 2
elif instrtype in ['broadb', 'shortp', 'vel']:
nint = 1
elif instrtype == 'disp':
nint = 0
else:
raise ValueError
return nint
def _time_integrate(config, trace):
nint = _get_nint(config, trace)
trace.detrend(type='constant')
trace.detrend(type='linear')
for _ in range(nint):
trace.data = cumtrapz(trace.data) * trace.stats.delta
trace.stats.npts -= 1
filter_trace(config, trace)
def _frequency_integrate(config, spec):
nint = _get_nint(config, spec)
for _ in range(nint):
spec.data /= (2 * math.pi * spec.freq)
def _cut_spectrum(config, spec):
# see if there is a station-specific frequency range
station = spec.stats.station
try:
freq1 = float(config[f'freq1_{station}'])
freq2 = float(config[f'freq2_{station}'])
except KeyError:
try:
instrtype = spec.stats.instrtype
freq1 = float(config[f'freq1_{instrtype}'])
freq2 = float(config[f'freq2_{instrtype}'])
except KeyError as e:
raise RuntimeError(
f'{spec.id}: Unknown instrument type: {instrtype}: '
'skipping spectrum'
) from e
return spec.slice(freq1, freq2)
def _compute_h(spec_st, code, vertical_channel_codes=None, wave_type='S'):
"""
Compute the component 'H' from geometric mean of the stream components.
(which can also be all three components)
"""
if vertical_channel_codes is None:
vertical_channel_codes = ['Z']
spec_h = None
for spec in spec_st:
# this avoids taking a component from co-located station:
# ('code' is band+instrument code)
channel = spec.stats.channel
if channel[:-1] != code:
continue
# avoid reusing a previous 'H' channel
if channel[-1] == 'H':
continue
# only use transverse component for SH
if wave_type == 'SH' and channel[-1] != 'T':
continue
# do not use transverse component for SV
if wave_type == 'SV' and channel[-1] == 'T':
continue
if spec_h is None:
spec_h = spec.copy()
spec_h.data = np.power(spec_h.data, 2)
spec_h.data_logspaced = np.power(spec_h.data_logspaced, 2)
spec_h.stats.channel = f'{code}H'
else:
spec_h.data += np.power(spec.data, 2)
spec_h.data_logspaced += np.power(spec.data_logspaced, 2)
if spec_h is not None:
spec_h.data = np.sqrt(spec_h.data)
spec_h.data_logspaced = np.sqrt(spec_h.data_logspaced)
return spec_h
def _check_data_len(config, trace):
traceId = trace.get_id()
trace_cut = trace.copy()
if config.wave_type[0] == 'S':
t1 = trace.stats.arrivals['S1'][1]
t2 = trace.stats.arrivals['S2'][1]
elif config.wave_type[0] == 'P':
t1 = trace.stats.arrivals['P1'][1]
t2 = trace.stats.arrivals['P2'][1]
trace_cut.trim(starttime=t1, endtime=t2, pad=True, fill_value=0)
npts = len(trace_cut.data)
if npts == 0:
raise RuntimeError(
f'{traceId}: No data for the selected cut interval: '
'skipping trace')
nzeros = len(np.where(trace_cut.data == 0)[0])
if nzeros > npts / 4:
raise RuntimeError(
f'{traceId}: Signal window is zero for more than 25%: '
'skipping trace')
def _cut_signal_noise(config, trace):
trace_signal = trace.copy()
trace_noise = trace.copy()
# Integrate in time domain, if required.
# (otherwise frequency-domain integration is performed later)
if config.time_domain_int:
_time_integrate(config, trace_signal)
_time_integrate(config, trace_noise)
# trim...
if config.wave_type[0] == 'S':
t1 = trace.stats.arrivals['S1'][1]
t2 = trace.stats.arrivals['S2'][1]
elif config.wave_type[0] == 'P':
t1 = trace.stats.arrivals['P1'][1]
t2 = trace.stats.arrivals['P2'][1]
trace_signal.trim(starttime=t1, endtime=t2, pad=True, fill_value=0)
trace_signal.stats.type = 'signal'
# Noise time window for weighting function:
noise_t1 = trace.stats.arrivals['N1'][1]
noise_t2 = trace.stats.arrivals['N2'][1]
trace_noise.trim(
starttime=noise_t1, endtime=noise_t2, pad=True, fill_value=0)
trace_noise.stats.type = 'noise'
# ...taper...
cosine_taper(trace_signal.data, width=config.taper_halfwidth)
cosine_taper(trace_noise.data, width=config.taper_halfwidth)
# Be sure that both traces have same length:
npts = min(len(trace_signal), len(trace_noise))
if len(trace_signal) <= len(trace_noise):
# truncate noise to signal length
trace_noise.data = trace_noise.data[:npts]
else:
tr_noise_id = trace_noise.get_id()[:-1]
if config.weighting == 'noise':
msg = f'{tr_noise_id}: truncating signal window to noise length!'
trace_signal.data = trace_signal.data[:npts]
# recompute signal window end time, so that it's plotted correctly
_recompute_time_window(
trace, config.wave_type[0], npts, keep='start')
else:
msg = f'{tr_noise_id}: zero-padding noise window to signal length'
# Notes:
# 1. no risk of ringing, as noise has been tapered
# 2. we use np.pad instead of obspy trim method
# to avoid potential rounding errors with start/end times
# 3. we just pad at the end for simplicity (no changes
# in trace headers required) and it is identical to
# symmetric padding (tested)
pad_len = len(trace_signal) - len(trace_noise)
trace_noise.data = np.pad(trace_noise.data, (0, pad_len))
# recompute noise window start time, so that it's plotted correctly
_recompute_time_window(trace, 'N', len(trace_signal), keep='end')
logger.warning(msg)
return trace_signal, trace_noise
def _recompute_time_window(trace, wave_type, npts, keep='start'):
"""Recompute start or end time of signal or noise window,
based on new number of points"""
length = npts * trace.stats.delta
if keep == 'end':
label, _ = trace.stats.arrivals[f'{wave_type}1']
t1 = trace.stats.arrivals[f'{wave_type}2'][1] - length
trace.stats.arrivals[f'{wave_type}1'] = (label, t1)
elif keep == 'start':
label, _ = trace.stats.arrivals[f'{wave_type}2']
t2 = trace.stats.arrivals[f'{wave_type}1'][1] + length
trace.stats.arrivals[f'{wave_type}2'] = (label, t2)
else:
raise ValueError('keep must be "start" or "end"')
def _check_noise_level(trace_signal, trace_noise, config):
traceId = trace_signal.get_id()
trace_signal_rms = ((trace_signal.data**2).sum())**0.5
# Scale trace_noise_rms to length of signal window,
# based on length of non-zero noise window
try:
scale_factor = float(len(trace_signal)) / len(trace_noise.data != 0)
except ZeroDivisionError:
scale_factor = 1
trace_noise_rms = ((trace_noise.data**2 * scale_factor).sum())**0.5
if (
trace_noise_rms / trace_signal_rms < 1e-6 and
config.weighting == 'noise'
):
# Skip trace if noise level is too low and if noise weighting is used
raise RuntimeError(
f'{traceId}: Noise level is too low or zero: '
'station will be skipped')
def _geometrical_spreading_coefficient(config, spec):
"""
Return the geometrical spreading coefficient for the given spectrum.
"""
hypo_dist_in_km = spec.stats.hypo_dist
epi_dist_in_km = spec.stats.epi_dist
# set geometrical spreading distance to a very large value if it is None
geom_spread_min_dist = config.geom_spread_min_teleseismic_distance or 1e99
geom_spread_model =\
'teleseismic' if epi_dist_in_km >= geom_spread_min_dist\
else config.geom_spread_model
logger.info(f'{spec.id}: geometrical spreading model: {geom_spread_model}')
if geom_spread_model == 'teleseismic':
angular_distance = spec.stats.gcarc
source_depth_in_km = spec.stats.event.hypocenter.depth.value_in_km
station_depth_in_km = -spec.stats.coords.elevation
phase = config.wave_type[0]
return geom_spread_teleseismic(
angular_distance, source_depth_in_km, station_depth_in_km, phase)
if config.geom_spread_model == 'r_power_n':
exponent = config.geom_spread_n_exponent
return geom_spread_r_power_n(hypo_dist_in_km, exponent)
if config.geom_spread_model == 'boatwright':
cutoff_dist_in_km = config.geom_spread_cutoff_distance
return geom_spread_boatwright(
hypo_dist_in_km, cutoff_dist_in_km, spec.freq)
raise ValueError(
f'Unknown geometrical spreading model: {config.geom_spread_model}')
# store log messages to avoid duplicates
PROPERTY_LOG_MESSAGES = []
def _displacement_to_moment(stats, config):
"""
Return the coefficient for converting displacement to seismic moment.
From Aki&Richards,1980
"""
phase = config.wave_type[0]
lon = stats.coords.longitude
lat = stats.coords.latitude
depth = -stats.coords.elevation
medium_properties = MediumProperties(lon, lat, depth, config)
depth_string = medium_properties.to_string('station depth', depth)
v_name = f'v{phase.lower()}'
v_source = config.event.hypocenter[v_name]
v_source_string = medium_properties.to_string(f'{v_name}_source', v_source)
v_station = medium_properties.get(mproperty=v_name, where='stations')
stats.v_station = v_station
stats.v_station_type = phase
v_station_string = medium_properties.to_string(
f'{v_name}_station', v_station)
rho_source = config.event.hypocenter.rho
rho_source_string = medium_properties.to_string('rho_source', rho_source)
rho_station = medium_properties.get(mproperty='rho', where='stations')
stats.rho_station = rho_station
rho_station_string = medium_properties.to_string(
'rho_station', rho_station)
specid = '.'.join((
stats.network, stats.station, stats.location, stats.channel))
msg = (
f'{specid}: {depth_string}, '
f'{v_source_string}, {v_station_string}'
)
if msg not in PROPERTY_LOG_MESSAGES:
logger.info(msg)
PROPERTY_LOG_MESSAGES.append(msg)
msg = (
f'{specid}: {depth_string}, '
f'{rho_source_string}, {rho_station_string}'
)
if msg not in PROPERTY_LOG_MESSAGES:
logger.info(msg)
PROPERTY_LOG_MESSAGES.append(msg)
v_source *= 1000.
v_station *= 1000.
v3 = v_source**(5. / 2) * v_station**(1. / 2)
rho = rho_source**0.5 * rho_station**0.5
fsa = config.free_surface_amplification
return 4 * math.pi * v3 * rho / (fsa * stats.radiation_pattern)
def _smooth_spectrum(spec, smooth_width_decades=0.2):
"""Smooth spectrum in a log10-freq space."""
# 1. Generate log10-spaced frequencies
freq = spec.freq
_log_freq = np.log10(freq)
# frequencies in logarithmic spacing
log_df = _log_freq[-1] - _log_freq[-2]
freq_logspaced =\
10**(np.arange(_log_freq[0], _log_freq[-1] + log_df, log_df))
# 2. Reinterpolate data using log10 frequencies
# make sure that extrapolation does not create negative values
f = interp1d(freq, spec.data, fill_value='extrapolate')
data_logspaced = f(freq_logspaced)
data_logspaced[data_logspaced <= 0] = np.min(spec.data)
# 3. Smooth log10-spaced data points
npts = max(1, int(round(smooth_width_decades / log_df)))
data_logspaced = smooth(data_logspaced, window_len=npts)
# 4. Reinterpolate to linear frequencies
# make sure that extrapolation does not create negative values
f = interp1d(freq_logspaced, data_logspaced, fill_value='extrapolate')
data = f(freq)
data[data <= 0] = np.min(spec.data)
spec.data = data
# 5. Optimize the sampling rate of log spectrum,
# based on the width of the smoothing window
# make sure that extrapolation does not create negative values
log_df = smooth_width_decades / 5
freq_logspaced =\
10**(np.arange(_log_freq[0], _log_freq[-1] + log_df, log_df))
spec.freq_logspaced = freq_logspaced
data_logspaced = f(freq_logspaced)
data_logspaced[data_logspaced <= 0] = np.min(spec.data)
spec.data_logspaced = data_logspaced
def _build_spectrum(config, trace):
spec = Spectrum(obspy_trace=trace)
spec.stats.instrtype = trace.stats.instrtype
spec.stats.coords = trace.stats.coords
spec.stats.event = trace.stats.event
spec.stats.hypo_dist = trace.stats.hypo_dist
spec.stats.epi_dist = trace.stats.epi_dist
spec.stats.gcarc = trace.stats.gcarc
spec.stats.azimuth = trace.stats.azimuth
spec.stats.travel_times = trace.stats.travel_times
spec.stats.takeoff_angles = trace.stats.takeoff_angles
spec.stats.ignore = trace.stats.ignore
# Integrate in frequency domain, if no time-domain
# integration has been performed
if not config.time_domain_int:
_frequency_integrate(config, spec)
# cut the spectrum
spec = _cut_spectrum(config, spec)
# correct geometrical spreading
try:
geom_spread = _geometrical_spreading_coefficient(config, spec)
except Exception as e:
raise RuntimeError(
f'{spec.id}: Error computing geometrical spreading: '
f'skipping spectrum\n{str(e)}'
) from e
spec.data *= geom_spread
# store the radiation pattern coefficient in the spectrum stats
spec.stats.radiation_pattern =\
get_radiation_pattern_coefficient(spec.stats, config)
# convert to seismic moment
coeff = _displacement_to_moment(spec.stats, config)
spec.data *= coeff
# store coeff to correct back data in displacement units
# for radiated_energy()
spec.stats.coeff = coeff
# smooth
_smooth_spectrum(spec, config.spectral_smooth_width_decades)
return spec
def _build_uniform_weight(spec):
weight = spec.copy()
weight.snratio = None
weight.data = np.ones_like(weight.data)
weight.data_logspaced = np.ones_like(weight.data_logspaced)
return weight
def _build_weight_from_frequency(config, spec):
weight = spec.copy()
freq = weight.freq
weight.data = np.ones_like(weight.data)
weight.data[freq <= config.f_weight] = config.weight
weight.data /= np.max(weight.data)
freq_logspaced = weight.freq_logspaced
weight.data_logspaced = np.ones_like(weight.data_logspaced)
weight.data_logspaced[freq_logspaced <= config.f_weight] = config.weight
weight.data_logspaced /= np.max(weight.data_logspaced)
return weight
def _build_weight_from_inv_frequency(spec, power=0.25):
"""
Build spectral weights from inverse frequency (raised to a power < 1)
"""
if power >= 1:
raise ValueError('pow must be < 1')
# Note: weight.data is used for plotting,
# weight.data_logspaced for actual weighting
weight = spec.copy()
freq = weight.freq
weight.data *= 0
# Limit non-zero weights to fmin/fmax from spectral_snratio if available
snr_fmin = getattr(spec.stats, 'spectral_snratio_fmin', None)
snr_fmax = getattr(spec.stats, 'spectral_snratio_fmax', None)
i0 = np.where(freq >= snr_fmin)[0][0] if snr_fmin else 0
i1 = np.where(freq <= snr_fmax)[0][-1] if snr_fmax else len(freq) - 1
# Build weights as if frequencies always start from 0.25 Hz
# to obtain similar curves regardless of fmin
# and to avoid too much weight for very low frequencies
weight.data[i0: i1 + 1] = 1. / (freq[i0: i1 + 1] - freq[i0] + 0.25)**power
weight.data /= np.max(weight.data)
freq_logspaced = weight.freq_logspaced
weight.data_logspaced *= 0
i0 = np.where(freq_logspaced >= snr_fmin)[0][0] if snr_fmin else 0
i1 = np.where(freq_logspaced <= snr_fmax)[0][-1] if snr_fmax\
else len(freq_logspaced) - 1
weight.data_logspaced[i0: i1 + 1] =\
1. / (freq_logspaced[i0: i1 + 1] - freq_logspaced[i0] + 0.25)**power
weight.data_logspaced /= np.max(weight.data_logspaced)
return weight
def _build_weight_from_ratio(spec, specnoise, smooth_width_decades):
weight = spec.copy()
weight.data /= specnoise.data
# save signal-to-noise ratio before log10, smoothing, and normalization
weight.snratio = weight.data.copy()
# The inversion is done in magnitude units,
# so let's take log10 of weight
weight.data = np.log10(weight.data)
# Weight spectrum is smoothed once more
_smooth_spectrum(weight, smooth_width_decades)
weight.data /= np.max(weight.data)
# slightly taper weight at low frequencies, to avoid overestimating
# weight at low frequencies, in cases where noise is underestimated
cosine_taper(
weight.data,
min(0.25, weight.stats.delta / 4),
left_taper=True)
# Zero out weight below 0.2 Hz, so that it does not affect the fit
weight.data[weight.data <= 0.2] = 1e-9
return weight
def _build_weight_from_noise(config, spec, specnoise):
if specnoise is None or np.all(specnoise.data == 0):
spec_id = spec.get_id()[:-1]
logger.warning(
f'{spec_id}: No available noise window: '
'a uniform weight will be applied')
weight = _build_uniform_weight(spec)
else:
weight = _build_weight_from_ratio(
spec, specnoise, config.spectral_smooth_width_decades)
# interpolate to log-frequencies
f = interp1d(weight.freq, weight.data, fill_value='extrapolate')
weight.data_logspaced = f(weight.freq_logspaced)
weight.data_logspaced /= np.max(weight.data_logspaced)
# Make sure weight is positive
weight.data_logspaced[weight.data_logspaced <= 0] = 0.001
return weight
def _build_weight_spectral_stream(config, spec_st, specnoise_st):
"""Build a stream of weights from a stream of spectra and a stream of
noise spectra."""
weight_st = SpectrumStream()
spec_ids = {sp.id[:-1] for sp in spec_st if not sp.stats.ignore}
for specid in spec_ids:
try:
spec_h = _select_spectra(spec_st, f'{specid}H')[0]
specnoise_h = _select_spectra(specnoise_st, f'{specid}H')[0]
except Exception:
continue
if config.weighting == 'noise':
weight = _build_weight_from_noise(config, spec_h, specnoise_h)
elif config.weighting == 'frequency':
weight = _build_weight_from_frequency(config, spec_h)
elif config.weighting == 'inv_frequency':
weight = _build_weight_from_inv_frequency(spec_h)
elif config.weighting == 'no_weight':
weight = _build_uniform_weight(spec_h)
weight_st.append(weight)
return weight_st
def _select_spectra(spec_st, specid):
"""Select spectra from stream, based on specid."""
network, station, location, channel = specid.split('.')
channel = channel + '?' * (3 - len(channel))
spec_st_sel = spec_st.select(
network=network, station=station, location=location, channel=channel)
spec_st_sel = SpectrumStream(
sp for sp in spec_st_sel if not sp.stats.ignore)
return spec_st_sel
def _build_H(spec_st, specnoise_st=None, vertical_channel_codes=None,
wave_type='S'):
"""
Add to spec_st and specnoise_st the "H" component.
H component is obtained from the modulus of all the available components.
"""
if vertical_channel_codes is None:
vertical_channel_codes = ['Z']
spec_ids = {sp.id[:-1] for sp in spec_st if not sp.stats.ignore}
for specid in spec_ids:
spec_st_sel = _select_spectra(spec_st, specid)
specnoise_st_sel = _select_spectra(specnoise_st, specid)
# 'code' is band+instrument code
for code in {x.stats.channel[:-1] for x in spec_st_sel}:
spec_h = _compute_h(
spec_st_sel, code, vertical_channel_codes, wave_type)
if spec_h is not None:
spec_st.append(spec_h)
specnoise_h = _compute_h(
specnoise_st_sel, code, vertical_channel_codes, wave_type)
if specnoise_h is not None:
specnoise_st.append(specnoise_h)
def _check_spectral_sn_ratio(config, spec, specnoise):
spec_id = spec.get_id()
weight = _build_weight_from_noise(config, spec, specnoise)
freqs = weight.freq
# if no noise window is available, snratio is not computed
if weight.snratio is None:
spec.stats.spectral_snratio = None
return
if config.spectral_sn_freq_range is not None:
sn_fmin, sn_fmax = config.spectral_sn_freq_range[:2]
valid_freqs_idx = np.where((sn_fmin <= freqs) * (freqs <= sn_fmax))
valid_snratio = weight.snratio[valid_freqs_idx]
else:
valid_snratio = weight.snratio
if len(valid_snratio) == 0:
spec.stats.spectral_snratio = np.nan
msg = (
f'{spec_id}: no valid frequency to compute spectral S/N: '
'ignoring spectrum'
)
reason = 'no valid frequency'
raise SpectrumIgnored(msg, reason)
spectral_snratio = valid_snratio.mean()
spec.stats.spectral_snratio = spectral_snratio
# Save frequency range where SNR > 3 so it can be used for building weights
# Note: not sure if we could use config.spectral_sn_min here instead of 3
snr_valid_freqs = freqs[weight.snratio >= 3]
try:
spec.stats.spectral_snratio_fmin = snr_valid_freqs[0]
spec.stats.spectral_snratio_fmax = snr_valid_freqs[-1]
except IndexError:
spec.stats.spectral_snratio_fmin = None
spec.stats.spectral_snratio_fmax = None
logger.info(f'{spec_id}: average spectral S/N: {spectral_snratio:.2f}')
ssnmin = config.spectral_sn_min or -np.inf
if spectral_snratio < ssnmin:
msg = (
f'{spec_id}: spectral S/N smaller than {ssnmin:.2f}: '
'ignoring spectrum')
reason = 'low spectral S/N'
raise SpectrumIgnored(msg, reason)
def _ignore_spectrum(msg, spec, specnoise):
"""Ignore spectrum. Set ignore flag and reason."""
logger.warning(msg)
spec.stats.ignore = True
spec.stats.ignore_reason = msg.reason
specnoise.stats.ignore = True
specnoise.stats.ignore_reason = msg.reason
def _ignore_trace(msg, trace):
"""Ignore trace. Set ignore flag and reason."""
# NOTE: no logger.warning here, because it is already done in
# _ignore_spectrum()
trace.stats.ignore = True
trace.stats.ignore_reason = msg.reason
def _build_signal_and_noise_streams(config, st):
"""Build signal and noise streams."""
# remove traces with ignore flag
traces = Stream([tr for tr in st if not tr.stats.ignore])
signal_st = Stream()
noise_st = Stream()
for trace in sorted(traces, key=lambda tr: tr.id):
try:
_check_data_len(config, trace)
trace_signal, trace_noise = _cut_signal_noise(config, trace)
_check_noise_level(trace_signal, trace_noise, config)
signal_st.append(trace_signal)
noise_st.append(trace_noise)
except RuntimeError as msg:
# RuntimeError is for skipped traces
logger.warning(msg)
continue
return signal_st, noise_st
def _trim_components(config, signal_st, noise_st, st):
"""
Trim components of the same instrument to the same number of samples.
Recompute time window of the signal and noise traces for correct plotting.
"""
for traceid in sorted({tr.id[:-1] for tr in signal_st}):
st_sel = signal_st.select(id=f'{traceid}*') +\
noise_st.select(id=f'{traceid}*')
all_npts = {tr.stats.npts for tr in st_sel}
if len(all_npts) == 1:
continue
logger.warning(
f'{traceid}: components have different window lengths. '
'Trimming signal and noise windows to the shortest one')
npts = min(all_npts)
for tr in st_sel:
if tr.stats.type == 'signal':
tr.data = tr.data[:npts]
elif tr.stats.type == 'noise':
tr.data = tr.data[-npts:]
for tr in st.select(id=f'{traceid}*'):
_recompute_time_window(tr, config.wave_type[0], npts, keep='start')
_recompute_time_window(tr, 'N', npts, keep='end')
def _build_signal_and_noise_spectral_streams(
config, signal_st, noise_st, original_st):
"""
Build signal and noise spectral streams.
Note: original_st is only used to keep track of ignored traces.
"""
spec_st = SpectrumStream()
specnoise_st = SpectrumStream()
for trace_signal in sorted(signal_st, key=lambda tr: tr.id):
trace_noise = noise_st.select(id=trace_signal.id)[0]
try:
spec = _build_spectrum(config, trace_signal)
specnoise = _build_spectrum(config, trace_noise)
_check_spectral_sn_ratio(config, spec, specnoise)
except RuntimeError as msg:
# RuntimeError is for skipped spectra
logger.warning(msg)
continue
except SpectrumIgnored as msg:
_ignore_spectrum(msg, spec, specnoise)
trace_original = original_st.select(id=trace_signal.id)[0]
_ignore_trace(msg, trace_original)
spec_st.append(spec)
specnoise_st.append(specnoise)
if not spec_st:
logger.error('No spectra left! Exiting.')
ssp_exit()
# build H component
_build_H(
spec_st, specnoise_st, config.vertical_channel_codes, config.wave_type)
# convert the spectral amplitudes to moment magnitude
for spec in spec_st:
spec.data_mag = moment_to_mag(spec.data)
spec.data_mag_logspaced = moment_to_mag(spec.data_logspaced)
for specnoise in specnoise_st:
specnoise.data_mag = moment_to_mag(specnoise.data)
# apply station correction if a residual file is specified in config
spec_st = station_correction(spec_st, config)
return spec_st, specnoise_st
def _zero_pad(config, trace):
"""Zero-pad trace to spectral_win_length"""
spec_win_len = config.spectral_win_length
if spec_win_len is None:
return
wtype = config.wave_type[0]
if trace.stats.type == 'signal':
t1 = trace.stats.arrivals[f'{wtype}1'][1]
elif trace.stats.type == 'noise':
t1 = trace.stats.arrivals['N1'][1]
trace.trim(starttime=t1, endtime=t1 + spec_win_len, pad=True, fill_value=0)
[docs]
def build_spectra(config, st):
"""
Build spectra and the ``spec_st`` object.
Computes P- or S-wave (displacement) spectra from
accelerometers and velocimeters, uncorrected for anelastic attenuation,
corrected for instrumental constants, normalized by geometrical spreading.
"""
wave_type = config.wave_type
logger.info(f'Building {wave_type}-wave spectra...')
signal_st, noise_st = _build_signal_and_noise_streams(config, st)
_trim_components(config, signal_st, noise_st, st)
for trace in signal_st + noise_st:
_zero_pad(config, trace)
spec_st, specnoise_st = _build_signal_and_noise_spectral_streams(
config, signal_st, noise_st, st)
weight_st = _build_weight_spectral_stream(config, spec_st, specnoise_st)
logger.info(f'Building {wave_type}-wave spectra: done')
logger.info('---------------------------------------------------')
return spec_st, specnoise_st, weight_st