Source code for ssp_process_traces

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Trace processing for sourcespec.

:copyright:
    2012 Claudio Satriano <satriano@ipgp.fr>

    2013-2014 Claudio Satriano <satriano@ipgp.fr>,
              Emanuela Matrullo <matrullo@geologie.ens.fr>

    2015-2024 Claudio Satriano <satriano@ipgp.fr>
:license:
    CeCILL Free Software License Agreement v2.1
    (http://www.cecill.info/licences.en.html)
"""
import logging
import re
import numpy as np
from scipy.signal import savgol_filter
from obspy.core import Stream
from obspy.core.util import AttribDict
from sourcespec.ssp_setup import ssp_exit
from sourcespec.ssp_util import (
    remove_instr_response, station_to_event_position)
from sourcespec.ssp_wave_arrival import add_arrival_to_trace
from sourcespec.clipping_detection import (
    compute_clipping_score, clipping_peaks)
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])


def _get_bandpass_frequencies(config, trace):
    """Get frequencies for bandpass filter."""
    # see if there is a station-specific filter
    station = trace.stats.station
    try:
        bp_freqmin = float(config[f'bp_freqmin_{station}'])
        bp_freqmax = float(config[f'bp_freqmax_{station}'])
    except KeyError:
        instrtype = trace.stats.instrtype
        try:
            bp_freqmin = float(config[f'bp_freqmin_{instrtype}'])
            bp_freqmax = float(config[f'bp_freqmax_{instrtype}'])
        except KeyError as e:
            raise ValueError(
                f'{trace.id}: Unknown instrument type: {instrtype}: '
                'skipping trace'
            ) from e
    return bp_freqmin, bp_freqmax


[docs] def filter_trace(config, trace): """Filter trace.""" bp_freqmin, bp_freqmax = _get_bandpass_frequencies(config, trace) nyquist = 1. / (2. * trace.stats.delta) if bp_freqmax >= nyquist: bp_freqmax = nyquist * 0.999 logger.warning( f'{trace.id}: maximum frequency for bandpass filtering ' f'is larger or equal to Nyquist. Setting it to {bp_freqmax} Hz') filter_options = { 'type': 'bandpass', 'freqmin': bp_freqmin, 'freqmax': bp_freqmax } trace.filter(**filter_options) # save filter info to trace stats trace.stats.filter = AttribDict(filter_options)
def _check_signal_level(config, trace): rms2 = np.power(trace.data, 2).sum() rms = np.sqrt(rms2) rms_min = config.rmsmin if rms <= rms_min: raise RuntimeError( f'{trace.stats.info}: Trace RMS smaller than {rms_min:g}: ' 'skipping trace') def _check_clipping(config, trace): trace.stats.clipped = False if config.clipping_detection_algorithm == 'none': return # cut the trace between the beginning of noise window # and the end of the signal window t1 = trace.stats.arrivals['N1'][1] if config.wave_type[0] == 'S': t2 = trace.stats.arrivals['S2'][1] elif config.wave_type[0] == 'P': t2 = trace.stats.arrivals['P2'][1] tr = trace.copy().trim(t1, t2) if config.clipping_detection_algorithm == 'clipping_score': score = compute_clipping_score( tr, config.remove_baseline, config.clipping_debug_plot) logger.info(f'{tr.stats.info}: clipping score: {score:.1f}%') if score > config.clipping_score_threshold: trace.stats.clipped = True trace.stats.ignore = True trace.stats.ignore_reason = f'clipping: {score:.1f}%' elif config.clipping_detection_algorithm == 'clipping_peaks': trace_clipped, properties = clipping_peaks( tr, config.clipping_peaks_sensitivity, config.clipping_peaks_percentile, config.clipping_debug_plot) logger.info( f'{tr.stats.info}: total peaks: {properties["npeaks"]}, ' f'clipped peaks: {properties["npeaks_clipped"]}') if trace_clipped: trace.stats.clipped = True trace.stats.ignore = True trace.stats.ignore_reason = 'clipped' if trace.stats.clipped: logger.warning( f'{tr.stats.info}: Trace is clipped or significantly distorted: ' 'skipping trace') def _check_sn_ratio(config, trace): trace_noise = _get_detrended_trace_copy(trace) t1 = trace_noise.stats.arrivals['N1'][1] t2 = trace_noise.stats.arrivals['N2'][1] trace_noise.trim(starttime=t1, endtime=t2, pad=True, fill_value=0) trace_signal = _get_detrended_trace_copy(trace) if config.wave_type[0] == 'S': t1 = trace_signal.stats.arrivals['S1'][1] t2 = trace_signal.stats.arrivals['S2'][1] elif config.wave_type[0] == 'P': t1 = trace_signal.stats.arrivals['P1'][1] t2 = trace_signal.stats.arrivals['P2'][1] trace_signal.trim(starttime=t1, endtime=t2, pad=True, fill_value=0) rmsnoise2 = np.power(trace_noise.data, 2).sum() rmsnoise = np.sqrt(rmsnoise2) rmsS2 = np.power(trace_signal.data, 2).sum() rmsS = np.sqrt(rmsS2) if rmsnoise == 0: if config.weighting == 'noise': raise RuntimeError( f'{trace.stats.info}: empty noise window: skipping trace') logger.warning(f'{trace.stats.info}: empty noise window!') rmsnoise = 1. sn_ratio = rmsS / rmsnoise logger.info(f'{trace.stats.info}: S/N: {sn_ratio:.1f}') trace.stats.sn_ratio = sn_ratio # stop here if trace is already ignored if trace.stats.ignore: return snratio_min = config.sn_min if sn_ratio < snratio_min: logger.warning( f'{trace.stats.info}: S/N smaller than {snratio_min:g}: ' 'skipping trace') trace.stats.ignore = True trace.stats.ignore_reason = 'low S/N' def _get_detrended_trace_copy(trace): # noise time window for s/n ratio tr_copy = trace.copy() # remove the mean... tr_copy.detrend(type='constant') # ...and the linear trend... tr_copy.detrend(type='linear') return tr_copy def _remove_baseline(config, trace): """ Get the signal baseline using a Savitzky-Golay filter and subtract it from the trace. """ if not config.remove_baseline: return npts = len(trace.data) wlen = npts // 10 if wlen % 2 == 0: wlen += 1 baseline = savgol_filter(trace.data, wlen, 3) trace.data -= baseline def _process_trace(config, trace): # copy trace for manipulation trace_process = trace.copy() comp = trace_process.stats.channel if config.ignore_vertical and comp[-1] in config.vertical_channel_codes: if config.wave_type == 'P': logger.warning( f'{trace.stats.info}: cannot ignore vertical trace, ' 'since "wave_type" is set to "P"') else: logger.info( f'{trace.stats.info}: ignoring vertical trace as requested') trace_process.stats.ignore = True trace_process.stats.ignore_reason = 'vertical' # check if the trace has (significant) signal _check_signal_level(config, trace_process) # check if trace is clipped _check_clipping(config, trace_process) # Remove instrument response bp_freqmin, bp_freqmax = _get_bandpass_frequencies(config, trace) if config.correct_instrumental_response: try: pre_filt = ( bp_freqmin, bp_freqmin * 1.1, bp_freqmax * 0.9, bp_freqmax) remove_instr_response(trace_process, pre_filt) except Exception as e: raise RuntimeError( f'{e}\n' f'{trace.stats.info}: Unable to remove instrument response: ' 'skipping trace') from e _remove_baseline(config, trace_process) filter_trace(config, trace_process) # Check if the trace has significant signal to noise ratio _check_sn_ratio(config, trace_process) return trace_process def _add_station_to_event_position(trace): """ Add to ``trace.stats`` station-to-event distance (hypocentral and epicentral), great-circle distance, azimuth and backazimuth. Raise RuntimeError if unable to compute distances. """ try: station_to_event_position(trace) except Exception as e: raise RuntimeError( f'{trace.id}: Unable to compute hypocentral distance: {e}. ' 'Skipping trace' ) from e def _check_epicentral_distance(config, trace): """ Reject traces with hypocentral distance outside the range specified in the configuration file. """ if config.epi_dist_ranges is None: return # transform integers to true integers, for better string representation edr = [ int(r) if float(r).is_integer() else r for r in config.epi_dist_ranges] # if edr has an odd number of elements, add one last element if len(edr) % 2 == 1: edr.append(999999) # reshape edr to a list of pairs edr = [[edr[i], edr[i+1]] for i in range(0, len(edr), 2)] # string representation of edr edr_str = ', '.join(f'{ed[0]}-{ed[1]} km' for ed in edr) epi_dist = trace.stats.epi_dist # check if epi_dist is in one of the ranges if not any(ed[0] <= epi_dist <= ed[1] for ed in edr): raise RuntimeError( f'{trace.id}: Epicentral distance ({epi_dist:.1f} km) ' f'not in the selected range ({edr_str}): skipping trace') def _add_arrivals(config, trace): """Add to trace P and S arrival times, travel times and angles.""" for phase in 'P', 'S': try: add_arrival_to_trace(trace, phase, config) except Exception as e: for line in str(e).splitlines(): logger.warning(line) raise RuntimeError( f'{trace.id}: Unable to get {phase} arrival time: ' 'skipping trace' ) from e def _define_signal_and_noise_windows(config, trace): """Define signal and noise windows for spectral analysis.""" p_arrival_time = trace.stats.arrivals['P'][1] if config.wave_type[0] == 'P' and p_arrival_time < trace.stats.starttime: raise RuntimeError(f'{trace.id}: P-window incomplete: skipping trace') s_arrival_time = trace.stats.arrivals['S'][1] if config.wave_type[0] == 'S' and s_arrival_time < trace.stats.starttime: raise RuntimeError(f'{trace.id}: S-window incomplete: skipping trace') # Signal window for spectral analysis (S phase) s_minus_p = s_arrival_time - p_arrival_time s_pre_time = config.signal_pre_time if s_minus_p / 2 < s_pre_time: # use (Ts-Tp)/2 if it is smaller than signal_pre_time # (for short-distance records with short S-P interval) s_pre_time = s_minus_p / 2 logger.warning( f'{trace.id}: signal_pre_time is larger than (Ts-Tp)/2. ' 'Using (Ts-Tp)/2 instead') t1 = s_arrival_time - s_pre_time t1 = max(trace.stats.starttime, t1) tt_s = trace.stats.travel_times['S'] # manage case where variable_win_length_factor is None: variable_win_length_factor = config.variable_win_length_factor or 0 win_length_s = max(config.win_length, variable_win_length_factor * tt_s) t2 = t1 + win_length_s t2 = min(trace.stats.endtime, t2) win_length_s = t2 - t1 trace.stats.arrivals['S1'] = ('S1', t1) trace.stats.arrivals['S2'] = ('S2', t2) # Signal window for spectral analysis (P phase) t1 = p_arrival_time - config.signal_pre_time t1 = max(trace.stats.starttime, t1) tt_p = trace.stats.travel_times['P'] win_length_p = max(config.win_length, variable_win_length_factor * tt_p) t2 = t1 + min(win_length_p, s_minus_p + s_pre_time) t2 = min(trace.stats.endtime, t2) win_length_p = t2 - t1 trace.stats.arrivals['P1'] = ('P1', t1) trace.stats.arrivals['P2'] = ('P2', t2) # Noise window for spectral analysis if config.wave_type[0] == 'P': win_length = win_length_p elif config.wave_type[0] == 'S': win_length = win_length_s if config.variable_win_length_factor: logger.info(f'{trace.id}: window length {win_length:.3f} seconds') if config.noise_pre_time is None: noise_pre_time = win_length + config.signal_pre_time else: noise_pre_time = config.noise_pre_time t1 = max( trace.stats.starttime, p_arrival_time - noise_pre_time ) t2 = t1 + win_length if t2 > (p_arrival_time - config.signal_pre_time): logger.warning(f'{trace.id}: noise window ends after P-window start') t2 = p_arrival_time - config.signal_pre_time t1 = min(t1, t2) trace.stats.arrivals['N1'] = ('N1', t1) trace.stats.arrivals['N2'] = ('N2', t2) def _check_signal_window(config, st): """ Check if the signal window has sufficient amount of signal (i.e., not too many gaps). This is done on the stream, before merging. """ traceid = st[0].id st_cut = st.copy() if config.wave_type[0] == 'S': t1 = st[0].stats.arrivals['S1'][1] t2 = st[0].stats.arrivals['S2'][1] elif config.wave_type[0] == 'P': t1 = st[0].stats.arrivals['P1'][1] t2 = st[0].stats.arrivals['P2'][1] st_cut.trim(starttime=t1, endtime=t2) if not st_cut: raise RuntimeError( f'{traceid}: no signal for the selected ' f'{config.wave_type[0]}-wave cut interval: skipping trace >\n' f'> Cut interval: {t1} - {t2}') gaps_olaps = st_cut.get_gaps() gaps = [g for g in gaps_olaps if g[6] >= 0] overlaps = [g for g in gaps_olaps if g[6] < 0] duration = st_cut[-1].stats.endtime - st_cut[0].stats.starttime gap_duration = sum(g[6] for g in gaps) if gap_duration > duration / 4: raise RuntimeError( f'{traceid}: too many gaps for the selected cut interval: ' 'skipping trace') overlap_duration = -1 * sum(g[6] for g in overlaps) if overlap_duration > 0: logger.info( f'{traceid}: signal window has {overlap_duration:.3f} seconds ' 'of overlaps.') def _merge_stream(config, st): """ Check for gaps and overlaps; remove mean; merge stream. """ traceid = st[0].id # First, compute gap/overlap statistics for the whole trace. gaps_olaps = st.get_gaps() gaps = [g for g in gaps_olaps if g[6] >= 0] overlaps = [g for g in gaps_olaps if g[6] < 0] gap_duration = sum(g[6] for g in gaps) if gap_duration > 0: logger.info( f'{traceid}: trace has {gap_duration:.3f} seconds of gaps.') gap_max = config.gap_max if gap_max is not None and gap_duration > gap_max: raise RuntimeError( f'{traceid}: Gap duration larger than gap_max ' f'({gap_max:.1f} s): skipping trace') overlap_duration = -1 * sum(g[6] for g in overlaps) if overlap_duration > 0: logger.info( f'{traceid}: trace has {overlap_duration:.3f} seconds of ' 'overlaps.') overlap_max = config.overlap_max if overlap_max is not None and overlap_duration > overlap_max: raise RuntimeError( f'{traceid}: overlap duration larger than overlap_max ' f'({overlap_max:.1f} s): skipping trace') # Finally, demean (only if trace has not be already preprocessed) if config.trace_units == 'auto': # Since the count value is generally huge, we need to demean twice # to take into account for the rounding error st.detrend(type='constant') st.detrend(type='constant') # Merge stream to remove gaps and overlaps try: st.merge(fill_value=0) # st.merge raises a generic Exception if traces have # different sampling rates except Exception as e: raise RuntimeError( f'{traceid}: unable to fill gaps: skipping trace' ) from e return st[0] def _skip_ignored(config, traceid): """Skip traces ignored from config.""" network, station, location, channel = traceid.split('.') # build a list of all possible ids, from station only # to full net.sta.loc.chan ss = [ station, '.'.join((network, station)), '.'.join((network, station, location)), '.'.join((network, station, location, channel)), ] if config.use_traceids is not None: combined = ( "(" + ")|(".join(config.use_traceids) + ")" ).replace('.', r'\.') if not any(re.match(combined, s) for s in ss): raise RuntimeError(f'{traceid}: ignored from config file') if config.ignore_traceids is not None: combined = ( "(" + ")|(".join(config.ignore_traceids) + ")" ).replace('.', r'\.') if any(re.match(combined, s) for s in ss): raise RuntimeError(f'{traceid}: ignored from config file')
[docs] def process_traces(config, st): """Remove mean, deconvolve and ignore unwanted components.""" logger.info('Processing traces...') out_st = Stream() for traceid in sorted({tr.id for tr in st}): try: _skip_ignored(config, traceid) # We still use a stream, since the trace can have gaps or overlaps st_sel = st.select(id=traceid) for _trace in st_sel: _add_station_to_event_position(_trace) _check_epicentral_distance(config, _trace) _add_arrivals(config, _trace) _define_signal_and_noise_windows(config, _trace) _check_signal_window(config, st_sel) trace = _merge_stream(config, st_sel) trace.stats.ignore = False trace_process = _process_trace(config, trace) out_st.append(trace_process) except (ValueError, RuntimeError) as msg: logger.warning(msg) continue if len(out_st) == 0: logger.error('No traces left! Exiting.') ssp_exit() # Rotate traces, if SH or SV is requested if config.wave_type in ['SH', 'SV']: for traceid in sorted({tr.id[:-1] for tr in out_st}): net, sta, loc, chan = traceid.split('.') st_sel = out_st.select( network=net, station=sta, location=loc, channel=f'{chan}?' ) t0 = max(tr.stats.starttime for tr in st_sel) t1 = min(tr.stats.endtime for tr in st_sel) st_sel.trim(t0, t1) st_sel.rotate('NE->RT') logger.info('Processing traces: done') logger.info('---------------------------------------------------') return out_st