Source code for spectrum

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Define Spectrum and SpectrumStream classes,
similar to ObsPy's Trace and Stream.

Provides the high-level function read_spectra() to read
SpectrumStream objects from HDF5 or TEXT files.

:copyright:
    2012-2024 Claudio Satriano <satriano@ipgp.fr>
:license:
    CeCILL Free Software License Agreement v2.1
    (http://www.cecill.info/licences.en.html)
"""
import os
import glob
import copy
import fnmatch
import warnings
import math
import h5py
import yaml
import numpy as np


[docs] def signal_fft(signal, delta): """ Compute the complex Fourier transform of a signal. :param signal: The signal to transform. :param delta: The sampling interval. :return: The Fourier transform and the frequency axis. """ npts = len(signal) # if npts is even, we make it odd # so that we do not have a negative frequency in the last point # (see numpy.fft.rfft doc) if not npts % 2: npts -= 1 # note that fft has the dimensions of the signal multiplied by time (delta) fft = np.fft.rfft(signal, n=npts) * delta fftfreq = np.fft.fftfreq(len(signal), d=delta) fftfreq = fftfreq[:fft.size] return fft, fftfreq
[docs] class AttributeDict(dict): """A dictionary that allows attribute-style access.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def __getattr__(self, name): try: return self.__getitem__(name) except KeyError as e: raise AttributeError( f"'{self.__class__.__name__}' object has no attribute " f"'{name}'" ) from e def __setattr__(self, name, value): self[name] = value def __dir__(self): return self.keys() def __copy__(self): new_dict = AttributeDict() for key, value in self.items(): new_dict[key] = value return new_dict def __deepcopy__(self, memo): new_dict = AttributeDict() for key, value in self.items(): new_dict[key] = copy.copy(value) return new_dict
[docs] class Spectrum(): """ A class to handle amplitude spectra. :param obspy_trace: An ObsPy Trace object to compute the spectrum from. """ def __init__(self, obspy_trace=None): """ Initialize the Spectrum object. :param obspy_trace: An ObsPy Trace object to compute the spectrum from. """ self._data = np.array([], dtype=float) self._data_logspaced = np.array([], dtype=float) self._data_mag = np.array([], dtype=float) self._data_mag_logspaced = np.array([], dtype=float) self._freq = np.array([], dtype=float) self._freq_logspaced = np.array([], dtype=float) self.stats = AttributeDict() self.stats.delta = 1. self.stats.npts = 0 self.stats.delta_logspaced = 1. self.stats.npts_logspaced = 0 self.stats.station = '' self.stats.network = '' self.stats.location = '' self.stats.channel = '' if obspy_trace is not None: self.from_obspy_trace(obspy_trace) def __str__(self): delta = self.stats.delta ndigits = _n_significant_digits(delta) delta_str = f'{delta:.{ndigits}f}' fmin_str = f'{self.freq[0]:.{ndigits}f}' if self.freq.size else '...' fmax_str = f'{self.freq[-1]:.{ndigits}f}' if self.freq.size else '...' delta_logspaced = self.stats.delta_logspaced ndigits = _n_significant_digits(delta_logspaced) delta_logspaced_str = f'{delta_logspaced:.{ndigits}f}' fmin_logspaced_str =\ f'{self.freq_logspaced[0]:.{ndigits}f}'\ if self.freq_logspaced.size else '...' fmax_logspaced_str =\ f'{self.freq_logspaced[-1]:.{ndigits}f}'\ if self.freq_logspaced.size else '...' return ( f'{self.id} | ' f'{self.stats.npts} samples, {fmin_str}-{fmax_str} Hz | ' f'{delta_str} Hz sample interval | ' f'{self.stats.npts_logspaced} samples logspaced, ' f'{fmin_logspaced_str}-{fmax_logspaced_str} Hz | ' f'{delta_logspaced_str} log10([Hz]) sample interval logspaced ' ) def __repr__(self): return f'Spectrum {self}' def __gt__(self, other): return self.id > other.id def __lt__(self, other): return self.id < other.id def __ge__(self, other): return self.id >= other.id def __le__(self, other): return self.id <= other.id @property def id(self): """Return the id of the spectrum.""" return ( f'{self.stats.network}.{self.stats.station}.' f'{self.stats.location}.{self.stats.channel}' ) @id.setter def id(self, value): """Set the id of the spectrum.""" try: net, sta, loc, cha = value.split('.') except ValueError as e: raise ValueError(f'Not a valid SEED id: {value}') from e self.stats.network = net self.stats.station = sta self.stats.location = loc self.stats.channel = cha
[docs] def get_id(self): """Return the id of the spectrum.""" return self.id
@property def data(self): """Return the array containing the amplitude spectrum.""" return self._data @data.setter def data(self, value): """Set the array containing the amplitude spectrum.""" if not isinstance(value, np.ndarray): value = np.array(value) self._data = value self.stats.npts = len(value) @property def data_mag(self): """ Return the array containing the amplitude spectrum in mangitude units. """ return self._data_mag @data_mag.setter def data_mag(self, value): """ Set the array containing the amplitude spectrum in magnitude units. """ if not isinstance(value, np.ndarray): value = np.array(value) if len(value) > 0 and len(value) != len(self.data): raise ValueError('data_mag must have the same length as data') self._data_mag = value @property def data_logspaced(self): """ Return the array containing the amplitude spectrum in logspaced frequencies. """ return self._data_logspaced @data_logspaced.setter def data_logspaced(self, value): """ Set the array containing the amplitude spectrum in logspaced frequencies. """ if not isinstance(value, np.ndarray): value = np.array(value) self._data_logspaced = value self.stats.npts_logspaced = len(value) @property def data_mag_logspaced(self): """ Return the array containing the amplitude spectrum in logspaced frequencies in magnitude units. """ return self._data_mag_logspaced @data_mag_logspaced.setter def data_mag_logspaced(self, value): """ Set the array containing the amplitude spectrum in logspaced frequencies in magnitude units. """ if not isinstance(value, np.ndarray): value = np.array(value) if len(value) > 0 and len(value) != len(self.data_logspaced): raise ValueError( 'data_mag_logspaced must have the same length as ' 'data_logspaced' ) self._data_mag_logspaced = value @property def freq(self): """Return the frequency axis of the spectrum.""" return self._freq @freq.setter def freq(self, value): """Set the frequency axis of the spectrum.""" if not isinstance(value, np.ndarray): value = np.array(value) if len(value) > 0: delta = np.diff(value) if not np.isclose(delta, delta[0], rtol=0.01).all(): raise ValueError('Frequency axis must be evenly spaced') self.stats.delta = float(delta[0]) else: self.stats.delta = 1. self._freq = value @property def freq_logspaced(self): """Return the logspaced frequency axis of the spectrum.""" return self._freq_logspaced @freq_logspaced.setter def freq_logspaced(self, value): """Set the logspaced frequency axis of the spectrum.""" if not isinstance(value, np.ndarray): value = np.array(value) if len(value) > 0: delta_logspaced = np.diff(np.log10(value)) if not np.isclose( delta_logspaced, delta_logspaced[0], rtol=0.01 ).all(): raise ValueError( 'Logspaced frequency axis must be evenly spaced') self.stats.delta_logspaced = float(delta_logspaced[0]) else: self.stats.delta_logspaced = 1. self._freq_logspaced = value
[docs] def copy(self): """Return a copy of the spectrum.""" spec_copy = Spectrum() spec_copy.stats = AttributeDict(self.stats) spec_copy.data = self.data.copy() spec_copy.data_mag = self.data_mag.copy() spec_copy.data_logspaced = self.data_logspaced.copy() spec_copy.data_mag_logspaced = self.data_mag_logspaced.copy() spec_copy.freq = self.freq.copy() spec_copy.freq_logspaced = self.freq_logspaced.copy() return spec_copy
[docs] def slice(self, fmin, fmax, nearest_sample=True, pad=False, fill_value=None): """ Slice the spectrum between fmin and fmax. :param fmin: Minimum frequency. :param fmax: Maximum frequency. :param nearest_sample: If True, the slice will include the nearest frequency to fmin and fmax. :param pad: If True, the slice will be padded with the value of fill_value until fmin and fmax are included. :param fill_value: The value to use for padding. :note: Only the linear spaced frequencies, data and data_mag are sliced. If the original spectrum contains logspaced frequencies, data, and data_mag, those are not preserved in the sliced spectrum. """ freq = self.freq slice_condition = (freq >= fmin) & (freq <= fmax) if nearest_sample: # find the closest frequency to fmin: idx = (np.abs(freq - fmin)).argmin() slice_condition[idx] = True # find the closest frequency to fmax: idx = (np.abs(freq - fmax)).argmin() slice_condition[idx] = True freqs_slice = freq[slice_condition] data_slice = self.data[slice_condition] data_mag_slice =\ self.data_mag[slice_condition] if self.data_mag.size\ else np.array([]) if pad: # add values to the slice until fmin and fmax are included while freqs_slice[0] > fmin: freqs_slice = np.insert( freqs_slice, 0, freqs_slice[0] - self.stats.delta) data_slice = np.insert(data_slice, 0, fill_value) if data_mag_slice.size: data_mag_slice = np.insert(data_mag_slice, 0, fill_value) while freqs_slice[-1] < fmax: freqs_slice = np.append( freqs_slice, freqs_slice[-1] + self.stats.delta) data_slice = np.append(data_slice, fill_value) if data_mag_slice.size: data_mag_slice = np.append(data_mag_slice, fill_value) spec_slice = Spectrum() spec_slice.stats = AttributeDict(self.stats) spec_slice.data = data_slice spec_slice.data_mag = data_mag_slice spec_slice.freq = freqs_slice return spec_slice
[docs] def plot(self, **kwargs): """Plot the amplitude spectrum.""" # pylint: disable=import-outside-toplevel import matplotlib.pyplot as plt plt.loglog(self.freq, self.data, **kwargs) plt.grid(True) plt.xlabel('Frequency (Hz)') plt.ylabel('Amplitude') plt.show()
[docs] def from_obspy_trace(self, trace): """Compute the spectrum from an ObsPy Trace object.""" # pylint: disable=import-outside-toplevel from obspy.core.trace import Trace if not isinstance(trace, Trace): raise TypeError('Only ObsPy Trace objects are supported') signal = trace.data delta = trace.stats.delta amp, freq = signal_fft(signal, delta) # remove DC component (freq=0) self.data = np.abs(amp)[1:] self.freq = freq[1:] # copy the trace metadata self.stats.station = trace.stats.station self.stats.network = trace.stats.network self.stats.location = trace.stats.location self.stats.channel = trace.stats.channel
def _write_to_hdf5_group(self, group): """ Write the spectrum to an HDF5 spectrum group. :param group: The HDF5 group to write to. """ stats = _normalize_metadata_object(self.stats) for attr, value in stats.items(): # convert dictionaries to strings using YAML if hasattr(value, 'items'): value = yaml.dump( value, Dumper=_HDF5HeaderDumper, default_flow_style=True ).replace('\n', '') # if value is a list-like, # check if all elements are of the same type elif hasattr(value, '__iter__') and len(value) > 0: type0 = type(value[0]) if not all(isinstance(v, type0) for v in value): raise ValueError( f'All values of attribute "{attr}" must be of the ' 'same type' ) # ignore unsupported types elif not isinstance(value, (int, float, bool, str)): warnings.warn( f'Attribute "{attr}" is not a supported type ' f'({type(value)}) and will be ignored' ) continue group.attrs[attr] = value group.create_dataset('freq', data=self.freq) group.create_dataset('data', data=self.data) group.create_dataset('data_mag', data=self.data_mag) group.create_dataset('freq_logspaced', data=self.freq_logspaced) group.create_dataset('data_logspaced', data=self.data_logspaced) group.create_dataset( 'data_mag_logspaced', data=self.data_mag_logspaced) def _write_hdf5(self, filename, append=False): """ Write the spectrum to an HDF5 file. :param append: If True, append the spectrum to an existing file. :param filename: The name of the file to write to. """ if append: fp = h5py.File(filename, 'a') else: fp = h5py.File(filename, 'w') fp.attrs['format'] = 'SourceSpec HDF5' fp.attrs['version'] = '1.0' fp.attrs['url'] = 'https://sourcespec.seismicsource.org' main_group = fp['spectra'] if 'spectra' in fp\ else fp.create_group('spectra') all_spec_group_names = [ key for key in main_group.keys() if key.startswith('spectrum_') ] if not all_spec_group_names: last_group_number = -1 else: last_group = sorted(all_spec_group_names)[-1] last_group_number = int(last_group.split('_')[1]) nn = last_group_number + 1 spec_group_name = f'spectrum_{nn:05d}_{self.id}' self._write_to_hdf5_group(main_group.create_group(spec_group_name)) fp.close() def _write_text(self, filename, append=False): """ Write the spectrum to a TEXT file. :param filename: The name of the file to write to. """ if append: raise ValueError('Cannot append to a TEXT file') with open(filename, 'w', encoding='utf-8') as fp: fp.write('# %SOURCESPEC TEXT SPECTRUM FORMAT 1.0\n') fp.write('# %BEGIN STATS YAML\n') stats = _normalize_metadata_object(self.stats) stats_str = yaml.safe_dump( stats, sort_keys=False ).rstrip() for line in stats_str.split('\n'): fp.write(f'# {line}\n') fp.write( '# %END STATS YAML\n' '# %BEGIN LINSPACED DATA\n' '# frequency(Hz) data data_mag\n' ) if self.data_mag.size: data_mag = self.data_mag else: data_mag = np.ones_like(self.data) * np.nan for freq, data, data_mag in zip(self.freq, self.data, data_mag): fp.write(f'{freq:.6f} {data:.6f} {data_mag:.6f}\n') fp.write( '# %END LINSPACED DATA\n' '# %BEGIN LOGSPACED DATA\n' '# frequency_logspaced(Hz) data_logspaced data_mag_logspaced\n' ) if self.data_mag_logspaced.size: data_mag_logspaced = self.data_mag_logspaced else: data_mag_logspaced = np.ones_like(self.data_logspaced) * np.nan for freq_logspaced, data_logspaced, data_mag_logspaced in zip( self.freq_logspaced, self.data_logspaced, data_mag_logspaced): fp.write( f'{freq_logspaced:.6f} {data_logspaced:.6f} ' f'{data_mag_logspaced:.6f}\n' ) fp.write('# %END LOGSPACED DATA\n') # pylint: disable=redefined-builtin
[docs] def write(self, filename, format='HDF5', append=False): """ Write the spectrum to a file. :param filename: The name of the file to write to. :param format: The format to use. One of 'HDF5' or 'TEXT'. Default is 'HDF5'. :param append: If True, append the spectrum to an existing file. Only valid for HDF5 format. """ if format == 'HDF5': self._write_hdf5(filename, append) elif format == 'TEXT': self._write_text(filename, append) else: raise ValueError(f'Unsupported format: {format}')
[docs] class SpectrumStream(list): """ A class to handle a collection of amplitude spectra. """ def __str__(self): return ( f'SpectrumStream with {len(self)} Spectrum objects:\n' + '\n'.join(f'{s}' for s in self) ) def __repr__(self): return self.__str__()
[docs] def sort(self, reverse=False): """Sort the SpectrumStream in place.""" super().sort(reverse=reverse)
[docs] def append(self, spectrum): """Append a spectrum to the collection.""" if not isinstance(spectrum, Spectrum): raise TypeError('Only Spectrum objects can be appended') super().append(spectrum)
[docs] def select(self, **kwargs): """Select a subset of the SpectrumStream.""" selected = SpectrumStream() for spectrum in self: for key, value in kwargs.items(): if key == 'id': stored_value = spectrum.get_id() else: stored_value = getattr(spectrum.stats, key) if not fnmatch.fnmatch(stored_value, value): break else: selected.append(spectrum) return selected
# pylint: disable=redefined-builtin
[docs] def write(self, filename, format='HDF5'): """ Write the SpectrumStream to a file. :param filename: The name of the file to write to. :param format: The format to use. One of 'HDF5' or 'TEXT'. """ if format == 'HDF5': append = False for spectrum in self: spectrum.write(filename, format, append) append = True elif format == 'TEXT': if len(self) == 1: self[0].write(filename, format) else: for n, spectrum in enumerate(self): _root, _ext = os.path.splitext(filename) _filename = f'{_root}_{n:04d}{_ext}' spectrum.write(_filename, format) else: raise ValueError(f'Unsupported format: {format}')
# ---- Reading/writing functions and helper functions ---- def _n_significant_digits(x): """ Helper function to compute the number of significant digits of a number. - If the number is greater than 1, the number of significant digits is zero. - If the number is less than 1, the number of significant digits is the number of digits after the decimal point. - If the number is zero, the number of significant digits is zero. """ try: x = math.fabs(x) except TypeError as e: raise ValueError('x must be a number') from e return 0 if x == 0 or x > 1 else -int(math.floor(math.log10(x))) class _HDF5HeaderDumper(yaml.SafeDumper): """ A YAML dumper used for writing the HDF5 header. """ def _default_yaml_representer(dumper, data): """ Default YAML representer for unsupported types. :param dumper: The YAML dumper. :param data: The data to represent. :return: The YAML representation of the data. """ return dumper.represent_scalar('tag:yaml.org,2002:str', str(data)) def _quoted_representer(dumper, data): """ YAML representer for strings, with quotes. :param dumper: The YAML dumper. :param data: The data to represent. :return: The YAML representation of the data. """ return dumper.represent_scalar('tag:yaml.org,2002:str', data, style="'") def _dict_constructor(loader, node): """ YAML constructor for dictionaries. :param loader: The YAML loader. :param node: The node to construct. :return: The dictionary constructed from the node. """ return AttributeDict(loader.construct_mapping(node)) # register the representers yaml.representer.SafeRepresenter.add_representer( None, _default_yaml_representer) _HDF5HeaderDumper.add_representer(str, _quoted_representer) _HDF5HeaderDumper.add_representer(None, _default_yaml_representer) # register the constructor yaml.SafeLoader.add_constructor('tag:yaml.org,2002:map', _dict_constructor) def _normalize_metadata_object(obj): """ Normalize a metadata object to use: - dictionaries instead of custom objects; - standard floats instead of numpy floats; - standard booleans instead of numpy booleans. All the other types are left unchanged. :param obj: The object to normalize. :return: A dictionary, a float, or the original value. """ if hasattr(obj, 'items'): return { key: _normalize_metadata_object(val) for key, val in obj.items() } if isinstance(obj, (np.float64, np.float32)): obj = float(obj) if isinstance(obj, np.bool_): obj = bool(obj) return obj def _read_spectrum_from_hdf5_group(group): """ Read a Spectrum object from an HDF5 group. :param group: The HDF5 group to read from. :return: The Spectrum object. """ spectrum = Spectrum() for attr, value in group.attrs.items(): # convert strings back to dictionaries, using YAML if ( isinstance(value, str) and value.startswith('{') and value.endswith('}') ): try: value = yaml.safe_load(value) except yaml.YAMLError: warnings.warn( f'Attribute "{attr}" is not a supported type and will be ' 'ignored' ) spectrum.stats[attr] = value spectrum.freq = group['freq'] spectrum.data = group['data'] spectrum.data_mag = group['data_mag'] spectrum.freq_logspaced = group['freq_logspaced'] spectrum.data_logspaced = group['data_logspaced'] spectrum.data_mag_logspaced = group['data_mag_logspaced'] return spectrum def _read_stats_from_text_lines(lines): """ Read the stats block from a TEXT file. :param lines: The lines to read from. :return: The stats block. """ _stats_lines = [] _stats_block = False for line in lines: if line.startswith('# %BEGIN STATS YAML'): _stats_block = True elif line.startswith('# %END STATS YAML'): _stats_block = False break elif _stats_block: _stats_lines.append(line[2:]) return yaml.safe_load(''.join(_stats_lines)) def _read_data_block_from_text_lines(lines, start_string, end_string): """ Read a data block from a TEXT file. :param lines: The lines to read from. :param start_string: The string marking the start of the data block. :param end_string: The string marking the end of the data block. :return: The data block. """ _data_lines = [] _data_block = False for line in lines: if line.startswith(start_string): _data_block = True elif line.startswith(end_string): _data_block = False break elif _data_block: if line.startswith('#'): continue _data_lines.append(line.split()) freq, data, data_mag = np.array(_data_lines, dtype=float).T return freq, data, data_mag def _read_spectrum_from_text_file(filename): """ Read a Spectrum object from a TEXT file. :param filename: The name of the file to read from. :return: The Spectrum object. """ with open(filename, 'r', encoding='utf-8') as fp: lines = fp.readlines() stats = _read_stats_from_text_lines(lines) freq, data, data_mag = _read_data_block_from_text_lines( lines, '# %BEGIN LINSPACED DATA', '# %END LINSPACED DATA' ) freq_logspaced, data_logspaced, data_mag_logspaced =\ _read_data_block_from_text_lines( lines, '# %BEGIN LOGSPACED DATA', '# %END LOGSPACED DATA' ) spectrum = Spectrum() spectrum.stats = AttributeDict(stats) spectrum.freq = freq spectrum.data = data if not np.isnan(data_mag).all(): spectrum.data_mag = data_mag spectrum.freq_logspaced = freq_logspaced spectrum.data_logspaced = data_logspaced if not np.isnan(data_mag_logspaced).all(): spectrum.data_mag_logspaced = data_mag_logspaced return spectrum # pylint: disable=redefined-builtin
[docs] def read_spectra(pathname, format='HDF5'): """ Read a SpectrumStream from one ore more files. :param pathname: The pathname of the files to read. Can contain wildcards. :param format: The format to use. One of 'HDF5' or 'TEXT'. Default is 'HDF5'. :return: The SpectrumStream object. """ filelist = glob.glob(pathname) if not filelist: raise FileNotFoundError(f'No files found matching "{pathname}"') spectra = SpectrumStream() for fname in filelist: if format == 'HDF5': with h5py.File(fname, 'r') as fp: try: main_group = fp['spectra'] except KeyError as e: raise ValueError('No spectra found in HDF5 file') from e for group in main_group.values(): spectra.append(_read_spectrum_from_hdf5_group(group)) elif format == 'TEXT': spectra.append(_read_spectrum_from_text_file(fname)) else: raise ValueError(f'Unsupported format: {format}') return spectra