Source code for clipping_detection

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Check trace for clipping using kernel density estimation of the trace=
amplitude values.

Two methods are available:
    1.  :func:`clipping_score()`: compute a trace clipping score based on the
        shape of the kernel density estimation.
    2.  :func:`clipping_peaks()`: check if trace is clipped, based on the
        number of peaks in the kernel density estimation;

:copyright:
    2023-2026 Claudio Satriano <satriano@ipgp.fr>,
              Kris Vanneste <kris.vanneste@oma.be>
:license:
    CeCILL Free Software License Agreement v2.1
    (http://www.cecill.info/licences.en.html)
"""
import numpy as np
from scipy.stats import gaussian_kde
from scipy.signal import find_peaks


def _get_baseline(signal):
    """Get the signal baseline using a Savitzky-Golay filter."""
    # pylint: disable=import-outside-toplevel
    from scipy.signal import savgol_filter
    # search for baselines with characteristic length of at least 1/3
    # of the signal
    wlen = len(signal) // 3
    if wlen % 2 == 0:
        wlen += 1
    return savgol_filter(signal, wlen, 3)


def _preprocess(trace, remove_linear_trend=False, remove_baseline=False):
    """Preprocess a trace for clipping detection."""
    trace = trace.copy()
    if remove_linear_trend:
        trace.detrend('linear')
    trace.detrend('demean')
    max_abs_data = np.max(np.abs(trace.data)) * 1.10  # 10% margin
    # Remove trace baseline if requested
    trace_baseline = None
    if remove_baseline:
        trace_baseline = _get_baseline(trace.data)
        trace.data -= trace_baseline
    return trace, trace_baseline, max_abs_data


def _get_kernel_density(trace, min_data, max_data, num_kde_bins=101,
                        bw_method='scott'):
    """Compute the kernel density of a trace."""
    kde = gaussian_kde(trace.data, bw_method=bw_method)
    density_points = np.linspace(min_data, max_data, num_kde_bins)
    density = kde.pdf(density_points)
    density /= np.max(density)
    return density, density_points


def _get_histogram(trace):
    """Compute the histogram of a trace."""
    # Compute data histogram with a number of bins equal to 0.5% of data points
    # or 31, whichever is larger
    nbins = max(31, int(len(trace.data) * 0.005))
    if nbins % 2 == 0:
        nbins += 1
    counts, bins = np.histogram(trace.data, bins=nbins)
    counts = counts / np.max(counts)
    bin_width = bins[1] - bins[0]
    return counts, bins, bin_width


def _get_distance_weight(density_points, order=2, min_weight=1, max_weight=5):
    """Compute the distance weight for the kernel density."""
    # make sure to use float64 to avoid overflow for high order
    dist_weight = np.abs(density_points.astype(np.float64))**order
    dist_weight *= (max_weight - min_weight) / dist_weight.max()
    dist_weight += min_weight
    return dist_weight


[docs] def clipping_peaks(trace, sensitivity=3, clipping_percentile=10, debug=False): """ Check if a trace is clipped, based on the number of peaks in the kernel density estimation of the trace amplitude values. The algorithm is based on the following steps: 1. The trace is demeaned. 2. A kernel density estimation is computed on the trace amplitude values. 3. The kernel density estimation is weighted by the distance from the zero mean amplitude value, using a parabolic function, between 1 and 5. 4. Peaks are detected in the weighted kernel density estimation. The sensitivity of the peak detection algorithm is controlled by the ``sensitivity`` parameter, based on which a minimum prominence threshold is set. 5. The trace is considered clipped if there is at least one peak in the amplitude range corresponding to the ``clipping_percentile`` parameter. Parameters ---------- trace : obspy.core.trace.Trace Trace to check. sensitivity : int Sensitivity level, from 1 (least sensitive) to 5 (most sensitive). (default: 3). The sensitivity level controls the minimum prominence threshold used for peak detection in the kernel density estimation. See the :func:`scipy.signal.find_peaks()` documentation for more details. clipping_percentile : float, between 0 and 100 Percentile of trace amplitude range (expressed as percentage) to check for clipping. Default is 10, which means that the 10% highest and lowest values of the trace amplitude will be checked for clipping. A value of 0 means that no clipping check will be performed. debug : bool If True, plot trace, samples histogram and kernel density. Returns ------- trace_clipped: bool True if trace is clipped, False otherwise. properties : dict Dictionary with the properties used for the clipping detection. The dictionary contains the following keys: * npeaks (:class:`int`): number of peaks found in the kernel density * npeaks_clipped (:class:`int`): number of peaks found in the kernel density that are considered clipped * peaks (:class:`numpy.ndarray`): list of peaks found in the kernel density * prominences (:class:`numpy.ndarray`): list of prominences of the peaks found in the kernel density """ sensitivity = int(sensitivity) if sensitivity < 1 or sensitivity > 5: raise ValueError('sensitivity must be between 1 and 5') if clipping_percentile < 0 or clipping_percentile > 100: raise ValueError('clipping_percentile must be between 0 and 100') if clipping_percentile == 0: return False lower_clip_bound = 100 - clipping_percentile num_kde_bins = 101 num_edge_bins = int(np.ceil( (num_kde_bins / 2.) * (100 - lower_clip_bound) / 100.)) trace, _, _ = _preprocess( trace, remove_linear_trend=False, remove_baseline=False) min_data = np.min(trace.data) max_data = np.max(trace.data) density, density_points = _get_kernel_density( trace, min_data, max_data, num_kde_bins, bw_method=0.1) # Distance weight, parabolic, between 1 and 5 dist_weight = _get_distance_weight( density_points, order=2, min_weight=1, max_weight=5) density_weight = density * dist_weight # Add 1 bin at start/end with value equal to min. of 5 first/last # bins to ensure local maxima at start/end are recognized as peaks density_weight = np.hstack([[density_weight[:num_edge_bins].min()], density_weight, [density_weight[-num_edge_bins:].min()]]) # find peaks with minimum prominence based on clipping sensitivity min_prominence = [0.5, 0.3, 0.2, 0.15, 0.125] peaks, props = find_peaks( density_weight, prominence=min_prominence[sensitivity - 1] ) # Remove start/end bins again peaks = peaks[(peaks > 0) & (peaks < (num_kde_bins + 1))] peaks -= 1 density_weight = density_weight[1:-1] npeaks = len(peaks) # Clipped peaks are peaks in the edge bins npeaks_clipped = np.sum( peak < num_edge_bins or peak > (num_kde_bins - 1 - num_edge_bins) for peak in peaks ) properties = { 'npeaks': npeaks, 'npeaks_clipped': npeaks_clipped, 'peaks': peaks, 'prominences': props['prominences'], } # If there is at least a peak in the edge bins, # then the signal is probably clipped or distorted trace_clipped = npeaks_clipped > 0 if debug: abs_max_data = max(abs(min_data), abs(max_data)) _plot_clipping_analysis( trace, abs_max_data, density_points, density, density_weight, peaks=peaks, num_edge_bins=num_edge_bins, num_kde_bins=num_kde_bins, trace_clipped=trace_clipped) return trace_clipped, properties
[docs] def compute_clipping_score(trace, remove_baseline=False, debug=False): """ Compute a trace clipping score based on the shape of the kernel density estimation of the trace amplitude values. The algorithm is based on the following steps: 1. The trace is detrended and demeaned. Optionally, the trace baseline can be removed. 2. A kernel density estimation is computed on the trace amplitude values. 3. Two weighted kernel density functions are computed: - a full weighted kernel density, where the kernel density is weighted by the distance from the zero mean amplitude value, using a 8th order power function between 1 and 100. - a weighted kernel density without the central peak, where the kernel density is weighted by the distance from the zero mean amplitude value, using a 8th order power function between 0 and 100. In both cases, the weight gives more importance to samples far from the zero mean value. In the second case, the central peak is ignored. 4. The score, ranging from 0 to 100, is the sum of the squared weighted kernel density without the central peak, normalized by the sum of the squared full weighted kernel density. The score is 0 if there is no additional peak beyond the central peak. Parameters ---------- trace : :class:`~obspy.core.trace.Trace` Trace to check for clipping. remove_baseline : bool, optional If set, remove the trace baseline before computing the score. debug : bool, optional If set, plot the trace, the samples histogram, the kernel density (unweighted and weighted), the kernel baseline model, and the misfit. Default is False. Returns ------- clipping_score : float Clipping score, in percentage. Note ---- Distorted traces (e.g., signals with strong baselines) can also get a high clipping score. To avoid this, the trace baseline can be removed. A debug mode is available to plot the trace, the samples histogram, the kernel density (unweighted and weighted), the kernel baseline model, and the misfit. """ trace, trace_baseline, max_data = _preprocess( trace, remove_linear_trend=True, remove_baseline=remove_baseline) try: density, density_points = _get_kernel_density( trace, -max_data, max_data, 101) except Exception: # if the kernel density estimation fails (e.g., all samples are equal) # return the maximum clipping score return 100 # Distance weight for full weighted kernel density, # 8th order, between 1 and 100 dist_weight_full = _get_distance_weight( density_points, order=8, min_weight=1, max_weight=100) # Compute full weighted kernel density, including the central part density_weight_full = density * dist_weight_full # Distance weight for weighted kernel density without central peak, # 8th order, between 0 and 100 dist_weight_no_central = _get_distance_weight( density_points, order=8, min_weight=0, max_weight=100) # Compute weighted kernel density, excluding the central part density_weight_no_central = density * dist_weight_no_central # Compute the clipping score clipping_score = 100 *\ np.sum(density_weight_no_central**2) / np.sum(density_weight_full**2) if debug: _plot_clipping_analysis( trace, max_data, density_points, density, density_weight_full, trace_baseline=trace_baseline, density_weight_no_central=density_weight_no_central, clipping_score=clipping_score) return clipping_score
[docs] def check_min_amplitude(trace, min_amplitude_ratio): """ Check if trace amplitude is above a minimum threshold computed from the instrument sensitivity. Parameters ---------- trace : :class:`~obspy.core.trace.Trace` Trace to check. Must have an attached inventory. min_amplitude_fraction : float Minimum trace amplitude, as a fraction of the overall sensitivity of the instrument, to check for clipping. Valid values are between 0 and 1. Returns ------- bool True if trace amplitude is above the minimum threshold, False otherwise. trace_max : float Maximum amplitude of the trace, in counts min_threshold : float Minimum amplitude threshold, in counts """ trace_max = np.max(np.abs(trace.data)) if min_amplitude_ratio is None: return True, trace_max, None if min_amplitude_ratio <= 0: raise ValueError( 'min_amplitude_ratio must be a strictly positive value') if not getattr(trace.stats, 'inventory', None): raise RuntimeError( f'{trace.id}: cannot get instrtype from inventory: ' 'inventory is empty: skipping trace') try: resp = trace.stats.inventory.get_response( trace.id, trace.stats.starttime) except Exception as e: raise RuntimeError( f'{trace.id}: cannot get response from inventory: {e}' ) from e overall_sensitivity = resp.instrument_sensitivity.value min_threshold = overall_sensitivity/min_amplitude_ratio amplitude_test = trace_max > min_threshold return amplitude_test, trace_max, min_threshold
def _get_plotting_axes(): """Get matplotlib axes for plotting""" # pylint: disable=import-outside-toplevel unused-import # Force loading of a matplotlib GUI backend import matplotlib mpl_backends = 'macosx', 'qt5agg', 'qt4agg', 'gtk3agg', 'tkagg', 'wxagg' for backend in mpl_backends: try: matplotlib.use(backend, force=True) from matplotlib import pyplot # noqa break except Exception: continue import matplotlib.pyplot as plt from matplotlib.ticker import ScalarFormatter class ScalarFormatterForceFormat(ScalarFormatter): """ScalarFormatter with forced format""" def _set_format(self, *_args): self.format = '%1.1f' _fig, (ax_trace, ax_density) = plt.subplots( 1, 2, figsize=(15, 5), sharey=True) yfmt = ScalarFormatterForceFormat() yfmt.set_powerlimits((0, 0)) ax_trace.yaxis.set_major_formatter(yfmt) ax_trace.grid(True) ax_trace.set_xlabel('Time (s)') ax_trace.set_ylabel('Amplitude') ax_density.grid(True, axis='y') ax_density.set_xlabel('Density') return plt, ax_trace, ax_density def _plot_clipping_analysis( trace, max_data, density_points, density, density_weight, trace_baseline=None, density_weight_no_central=None, clipping_score=None, peaks=None, num_edge_bins=None, num_kde_bins=None, trace_clipped=False): """ Plot trace, samples histogram and kernel densities (unweighted and weighted) """ plt, ax_trace, ax_density = _get_plotting_axes() # trace plot ax_trace.plot( trace.times(), trace.data, zorder=10, label='baseline removed') if trace_baseline is not None: ax_trace.plot( trace.times(), trace.data + trace_baseline, zorder=5, label='original trace') ax_trace.plot( trace.times(), trace_baseline, zorder=20, label='baseline') ax_trace.legend() ax_trace.set_ylim(-max_data, max_data) ax_trace.set_title(trace.id) # density plots counts, bins, bin_width = _get_histogram(trace.data) ax_density.hist( bins[:-1] + bin_width / 2., bins=len(counts), weights=counts, orientation='horizontal', zorder=10) ax_density.plot(density, density_points, label='kernel density', zorder=20) ax_density.plot( density_weight, density_points, label='weighted kernel density', zorder=30) if density_weight_no_central is not None: ax_density.fill_betweenx( density_points, density_weight_no_central, alpha=0.5, color='gray', label='weighted kernel density\nno central peak', zorder=10) if peaks is not None: ax_density.scatter( density_weight[peaks], density_points[peaks], s=100, marker='x', color='red') ax_density.legend() if num_edge_bins is not None and num_kde_bins is not None: ax_density.axhspan( -max_data, density_points[num_edge_bins], alpha=0.5, color='yellow') ax_density.axhspan( density_points[num_kde_bins - 1 - num_edge_bins], max_data, alpha=0.5, color='yellow') if clipping_score is not None: ax_density.set_title(f'Clipping score: {clipping_score:.2f}%') if trace_clipped: ax_density.set_title('Clipped!') plt.tight_layout() plt.show() def _parse_arguments(): """Parse command line arguments""" # pylint: disable=import-outside-toplevel import sys import argparse description = """\ Check for clipping in traces, based on the kernel density estimation of the trace amplitude values. Two methods are implemented: 1. Check if trace is clipped, based on the number of peaks in the kernel density; 2. Compute a trace clipping score based on the shape of the kernel density. """ parser = argparse.ArgumentParser( description=description, epilog='Use "%(prog)s <command> -h" for help on a specific command', formatter_class=argparse.RawDescriptionHelpFormatter ) subparser = parser.add_subparsers(dest='command') common_parser = argparse.ArgumentParser(add_help=False) common_parser.add_argument( 'infile', nargs='+', help='Input file(s) in any format supported by ' 'ObsPy (e.g., miniSEED, SAC, GSE2, SU, etc.)') common_parser.add_argument( '--traceid', '-t', help='Only process this trace ID. Trace ID can ' 'contain wildcards (e.g., "IU.ANMO.00.BHZ", "IU.ANMO.00.*", ' '"IU.*.00.BHZ", etc.)') common_parser.add_argument( '--remove_baseline', '-r', action='store_true', help='Remove trace baseline before processing', default=False) common_parser.add_argument( '--min-amplitude-ratio', '-m', type=float, default=None, help='Minimum ratio between the overall sensitivity of the ' 'instrument and the maximum amplitude of the trace (both in counts), ' 'to actually perform the clipping check. ' 'Traces with max_amp < (sensitivity / MIN_AMPLITUDE_RATIO) ' 'will not be checked for clipping. ' 'It must be a strictly positive value. Default is None, meaning no ' 'amplitude check is performed. ' 'This option requires providing a station metadata file or dir.') common_parser.add_argument( '--station-metadata', '-w', dest='station_metadata', action='store', default=None, help='get station metadata from FILE (directory or single file ' 'name). Supported format: StationXML, dataless SEED, SEED ' 'RESP, PAZ (SAC polezero format). Station metadata is used to ' 'check clipping against the instrument response.', metavar='FILE') common_parser.add_argument( '--debug', '-d', action='store_true', help='Plot trace, samples histogram, kernel density, and clipping ' 'parameters', default=False) subparser.add_parser( 'clipping_score', help='Get trace clipping score from kernel density ' '(method 2)', parents=[common_parser] ) sp_clipping_peaks = subparser.add_parser( 'clipping_peaks', help='Check if trace is clipped, based on counting ' 'peaks in kernel density (method 1)', parents=[common_parser] ) sp_clipping_peaks.add_argument( '-s', '--sensitivity', type=int, default=3, help='Sensitivity level, from 1 (least sensitive) ' 'to 5 (most sensitive). Default is %(default)s.') sp_clipping_peaks.add_argument( '-p', '--clipping_percentile', type=float, default=10, help='Percentile of trace amplitude range (expressed as percentage) ' 'to check for clipping. Default is %(default)s%%.') args = parser.parse_args() if args.command is None: parser.print_usage(sys.stderr) sys.stderr.write( 'Error: at least one positional argument is required.\n') sys.exit(2) if ( args.min_amplitude_ratio is not None and args.station_metadata is None ): parser.error( '--min-amplitude-ratio requires --station-metadata option') if ( args.min_amplitude_ratio is not None and args.min_amplitude_ratio <= 0 ): parser.error( '--min-amplitude-ratio must be a strictly positive value') return args def _add_inventory(trace, inventory): """Add inventory to trace.""" if not inventory: return net, sta, loc, chan = trace.id.split('.') inv = inventory.select( network=net, station=sta, location=loc, channel=chan) if inv is None: raise RuntimeError( f'{trace.id}: cannot get instrtype from inventory: ' 'inventory is empty: skipping trace') trace.stats.inventory = inv def _run_amplitude_check(trace, args): """ Run amplitude check method. Raise exception if amplitude is below minimum threshold. """ _add_inventory(trace, args.inv) min_ampl_test, tr_max, min_thresh = check_min_amplitude( trace, args.min_amplitude_ratio) if not min_ampl_test: raise RuntimeError( f'{trace.id}: max amplitude ({tr_max:.1f}) below minimum ' f'threshold ({min_thresh:.1f}): skipping clipping check' ) # ainsi codes for fancy output RESET = "\u001b[0m" RED = "\u001b[31m" GREEN = "\u001b[32m" YELLOW = "\u001b[33m" def _run_clipping_peaks(trace, args): """Run clipping peaks method and print results""" trace_clipped, properties = clipping_peaks( trace, args.sensitivity, args.clipping_percentile, args.debug) msg = ( f'{trace.id}: ' f'total peaks: {properties["npeaks"]}, ' f'clipped peaks: {properties["npeaks_clipped"]}' ) if trace_clipped: msg += f' - {RED}clipped!{RESET}' print(msg) def _run_clipping_score(trace, args): """Run clipping score method and print results""" score = compute_clipping_score( trace, args.remove_baseline, args.debug) if score < 10: color = GREEN elif score < 20: color = YELLOW else: color = RED print(f'{trace.id}: clipping score: {color}{score:.2f}%{RESET}') def _command_line_interface(): """Command line interface""" # pylint: disable=import-outside-toplevel from obspy import read, Stream from sourcespec.ssp_read_station_metadata import read_station_metadata args = _parse_arguments() st = Stream() for file in args.infile: try: st += read(file) except Exception as msg: print(f'Error reading file {file}: {msg}') if args.traceid is not None: st = st.select(id=args.traceid) if not st: print('No traces found') return args.inv = read_station_metadata(args.station_metadata) for tr in st: try: _run_amplitude_check(tr, args) except RuntimeError as msg: print(msg) continue if args.command == 'clipping_peaks': _run_clipping_peaks(tr, args) elif args.command == 'clipping_score': _run_clipping_score(tr, args)
[docs] def main(): """Main function""" # pylint: disable=import-outside-toplevel import sys try: _command_line_interface() except Exception as msg: sys.exit(msg) except KeyboardInterrupt: sys.exit()
if __name__ == '__main__': main()