Source code for ssp_read_traces

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Read traces in multiple formats of data and metadata.

:copyright:
    2012 Claudio Satriano <satriano@ipgp.fr>

    2013-2014 Claudio Satriano <satriano@ipgp.fr>,
              Emanuela Matrullo <matrullo@geologie.ens.fr>

    2015-2026 Claudio Satriano <satriano@ipgp.fr>,
              Sophie Lambotte <sophie.lambotte@unistra.fr>
:license:
    CeCILL Free Software License Agreement v2.1
    (http://www.cecill.info/licences.en.html)
"""
import sys
import os
import logging
import shutil
import tarfile
import zipfile
import tempfile
import contextlib
import warnings
from obspy import read
from obspy.core import Stream
from obspy.core.util import AttribDict
from sourcespec.ssp_setup import (
    ssp_exit, INSTR_CODES_VEL, INSTR_CODES_ACC, TRACEID_MAP)
from sourcespec.ssp_util import MediumProperties
from sourcespec.ssp_read_station_metadata import (
    read_station_metadata, PAZ)
from sourcespec.ssp_read_event_metadata import (
    parse_qml, parse_hypo_file, override_event_depth, parse_hypo71_picks)
from sourcespec.ssp_read_sac_header import (
    is_SAC_trace,
    compute_sensitivity_from_SAC,
    get_instrument_from_SAC,
    get_station_coordinates_from_SAC,
    get_event_from_SAC,
    get_picks_from_SAC)
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])
# Silence specific ObsPy SAC sample spacing warning (see obspy#3408)
warnings.filterwarnings(
    'ignore',
    message=r'.*Sample spacing read from SAC file',
    category=UserWarning,
    module=r'obspy\.io\.sac\.util'
)


# TRACE MANIPULATION ----------------------------------------------------------
def _correct_traceid(trace):
    if TRACEID_MAP is None:
        return
    with contextlib.suppress(KeyError):
        traceid = TRACEID_MAP[trace.get_id()]
        net, sta, loc, chan = traceid.split('.')
        trace.stats.network = net
        trace.stats.station = sta
        trace.stats.location = loc
        trace.stats.channel = chan


def _add_instrtype(trace):
    """Add instrtype to trace."""
    instrtype = None
    band_code = None
    instr_code = None
    trace.stats.instrtype = None
    # First, try to get the instrtype from channel name
    chan = trace.stats.channel
    if len(chan) > 2:
        band_code = chan[0]
        instr_code = chan[1]
    if instr_code in INSTR_CODES_VEL:
        # SEED standard band codes from higher to lower sampling rate
        # https://ds.iris.edu/ds/nodes/dmc/data/formats/seed-channel-naming/
        if band_code in ['G', 'D', 'E', 'S']:
            instrtype = 'shortp'
        if band_code in ['F', 'C', 'H', 'B']:
            instrtype = 'broadb'
    if instr_code in INSTR_CODES_ACC:
        instrtype = 'acc'
    if instrtype is None and is_SAC_trace(trace):
        # Let's see if there is an instrument name in SAC header (ISNet format)
        # In this case, we define band and instrument codes a posteriori
        instrtype, band_code, instr_code = get_instrument_from_SAC(trace)
        orientation = trace.stats.channel[-1]
        trace.stats.channel = ''.join((band_code, instr_code, orientation))
    trace.stats.instrtype = instrtype
    trace.stats.info = f'{trace.id} {trace.stats.instrtype}'


def _add_inventory(trace, inventory, config):
    """Add inventory to trace."""
    net, sta, loc, chan = trace.id.split('.')
    inv = (
        inventory.select(
            network=net, station=sta, location=loc, channel=chan)
        or inventory.select(
            network='XX', station='GENERIC', location='XX', channel='XXX')
    )
    if 'XX.GENERIC.XX.XXX' in inv.get_contents()['channels']:
        inv = inv.copy()
        inv.networks[0].code = net
        inv.networks[0].stations[0].code = sta
        inv.networks[0].stations[0].channels[0].code = chan
        inv.networks[0].stations[0].channels[0].location_code = loc
    # If a "sensitivity" config option is provided, override the Inventory
    # object with a new one constructed from the sensitivity value
    if config.sensitivity is not None:
        # save coordinates from the inventory, if available
        coords = None
        with contextlib.suppress(Exception):
            coords = inv.get_coordinates(trace.id, trace.stats.starttime)
        paz = PAZ()
        paz.seedID = trace.id
        try:
            # try to convert sensitivity to float
            paz.sensitivity = float(config.sensitivity)
        except ValueError as e:
            if not is_SAC_trace(trace):
                raise RuntimeError(
                    f'{trace.id}: cannot compute sensitivity from SAC header: '
                    f'sensitivity "{config.sensitivity}". '
                    'The trace is not in SAC format : skipping trace'
                ) from e
            # if it fails, try to evaluate it as a string containing
            # a combination of SAC header fields
            paz.sensitivity = compute_sensitivity_from_SAC(trace, config)
        paz.poles = []
        paz.zeros = []
        if inv:
            logger.warning(
                f'Overriding response for {trace.id} with constant '
                f'sensitivity {paz.sensitivity}')
        inv = paz.to_inventory()
        # restore coordinates, if available
        if coords is not None:
            chan = inv.networks[0].stations[0].channels[0]
            chan.latitude = coords['latitude']
            chan.longitude = coords['longitude']
            chan.elevation = coords['elevation']
            chan.depth = coords['local_depth']
    trace.stats.inventory = inv


def _check_instrtype(trace):
    """Check if instrument type is consistent with units in inventory."""
    inv = trace.stats.inventory
    if not inv:
        raise RuntimeError(
            f'{trace.id}: cannot get instrtype from inventory: '
            'inventory is empty: skipping trace')
    instrtype = trace.stats.instrtype
    new_instrtype = None
    try:
        units = inv.get_response(trace.id, trace.stats.starttime).\
            instrument_sensitivity.input_units
    except Exception as e:
        # inventory attached to trace has only one channel
        chan = inv[0][0][0]
        start_date = chan.start_date
        end_date = chan.end_date
        raise RuntimeError(
            f'{trace.id}: cannot get units from inventory.\n'
            f'> {e.__class__.__name__}: {e}\n'
            f'> Channel start/end date: {start_date} {end_date}\n'
            f'> Trace start time: {trace.stats.starttime}\n'
            '> Skipping trace'
        ) from e
    trace.stats.units = units
    if units.lower() == 'm' and trace.stats.instrtype != 'disp':
        new_instrtype = 'disp'
    if units.lower() == 'm/s' and instrtype not in ['shortp', 'broadb']:
        new_instrtype = 'broadb'
    if units.lower() == 'm/s**2' and instrtype != 'acc':
        new_instrtype = 'acc'
    if new_instrtype is not None:
        logger.warning(
            f'{trace.id}: instrument response units are "{units}" but '
            f'instrument type is "{instrtype}". Changing instrument type '
            f'to "{new_instrtype}"')
        trace.stats.instrtype = new_instrtype


def _add_coords(trace):
    """Add coordinates to trace."""
    # If we already know that traceid is skipped, raise a silent exception
    if trace.id in _add_coords.skipped:
        raise RuntimeError()
    coords = None
    with contextlib.suppress(Exception):
        # Inventory.get_coordinates() raises a generic Exception
        # if coordinates are not found
        coords = trace.stats.inventory.get_coordinates(
                    trace.id, trace.stats.starttime)
    if coords is not None:
        # Build an AttribDict and make sure that coordinates are floats
        coords = AttribDict({
            key: float(value) for key, value in coords.items()})
        coords = (
            None if (
                coords.longitude == 0 and coords.latitude == 0 and
                coords.local_depth == 123456 and coords.elevation == 123456)
            else coords)
    if coords is None and is_SAC_trace(trace):
        # If we still don't have trace coordinates,
        # we try to get them from SAC header
        coords = get_station_coordinates_from_SAC(trace)
    if coords is None:
        # Give up!
        _add_coords.skipped.append(trace.id)
        raise RuntimeError(
            f'{trace.id}: could not find coords for trace: skipping trace')
    if coords.latitude == coords.longitude == 0:
        logger.warning(
            f'{trace.id}: trace has latitude and longitude equal to zero!')
    # elevation is in meters in StationXML or SAC header
    coords.elevation /= 1e3
    trace.stats.coords = coords
# list to keep track of skipped traces
_add_coords.skipped = []  # noqa


def _add_event(trace, ssp_event=None, depth_override=None):
    """Add ssp_event object to trace."""
    if ssp_event is None:
        # Try to get hypocenter information from the SAC header
        try:
            ssp_event = get_event_from_SAC(trace)
            logger.info(f'{trace.id}: event info read from SAC header')
            override_event_depth(ssp_event, depth_override)
        except Exception:
            return
    trace.stats.event = ssp_event


def _add_picks(trace, picks=None):
    """Add picks to trace."""
    if picks is None:
        picks = []
    trace_picks = []
    with contextlib.suppress(Exception):
        trace_picks = get_picks_from_SAC(trace)
    for pick in picks:
        if pick.station == trace.stats.station:
            trace_picks.append(pick)
    trace.stats.picks = trace_picks
    # Create empty dicts for arrivals, travel_times and takeoff angles.
    # They will be used later.
    trace.stats.arrivals = {}
    trace.stats.travel_times = {}
    trace.stats.takeoff_angles = {}


def _complete_picks(st):
    """Add component-specific picks to all components."""
    for station in {tr.stats.station for tr in st}:
        st_sel = st.select(station=station)
        # 'code' is band+instrument code
        for code in {tr.stats.channel[:-1] for tr in st_sel}:
            st_sel2 = st_sel.select(channel=f'{code}?')
            all_picks = [p for tr in st_sel2 for p in tr.stats.picks]
            all_P_picks = [p for p in all_picks if p.phase == 'P']
            all_S_picks = [p for p in all_picks if p.phase == 'S']
            # Select default P and S picks as the first in list (or empty list)
            default_P_pick = all_P_picks[:1]
            default_S_pick = all_S_picks[:1]
            for tr in st_sel2:
                # Attribute default picks to components without picks
                if not [p for p in tr.stats.picks if p.phase == 'P']:
                    tr.stats.picks += default_P_pick
                if not [p for p in tr.stats.picks if p.phase == 'S']:
                    tr.stats.picks += default_S_pick
# -----------------------------------------------------------------------------


# FILE PARSING ----------------------------------------------------------------
def _hypo_vel(hypo, config):
    medium_properties = MediumProperties(
        hypo.longitude.value_in_deg, hypo.latitude.value_in_deg,
        hypo.depth.value_in_km, config
    )
    hypo.vp = medium_properties.get(mproperty='vp', where='source')
    hypo.vs = medium_properties.get(mproperty='vs', where='source')
    hypo.rho = medium_properties.get(mproperty='rho', where='source')
    depth_string = medium_properties.to_string(
        'source depth', hypo.depth.value_in_km)
    vp_string = medium_properties.to_string('vp_source', hypo.vp)
    vs_string = medium_properties.to_string('vs_source', hypo.vs)
    rho_string = medium_properties.to_string('rho_source', hypo.rho)
    logger.info(f'{depth_string}, {vp_string}, {vs_string}, {rho_string}')


def _build_filelist(path, filelist, tmpdir):
    if os.path.isdir(path):
        listing = os.listdir(path)
        for filename in listing:
            fullpath = os.path.join(path, filename)
            _build_filelist(fullpath, filelist, tmpdir)
    else:
        try:
            # pylint: disable=unspecified-encoding consider-using-with
            open(path)
        except IOError as err:
            logger.error(err)
            return
        if tarfile.is_tarfile(path) and tmpdir is not None:
            try:
                tar = tarfile.open(path)
            except tarfile.ReadError:
                # is_tarfile() gave a false positive, just append the file
                # to the file list and let ObsPy handle it
                filelist.append(path)
                return
            try:
                tar.extractall(path=tmpdir)
            except Exception as msg:
                logger.warning(
                    f'{path}: Unable to fully extract tar archive: {msg}')
            tar.close()
        elif zipfile.is_zipfile(path) and tmpdir is not None:
            try:
                zipf = zipfile.ZipFile(path)
            except zipfile.BadZipFile:
                # is_zipfile() gave a false positive, just append the file
                # to the file list and let ObsPy handle it
                filelist.append(path)
                return
            try:
                zipf.extractall(path=tmpdir)
            except Exception as msg:
                logger.warning(
                    f'{path}: Unable to fully extract zip archive: {msg}')
            zipf.close()
        else:
            filelist.append(path)


def _read_trace_files(
        config, inventory, ssp_event, picks, depth_override=None):
    """
    Read trace files from a given path. Complete trace metadata and
    return a stream object.
    """
    # phase 1: build a file list
    # ph 1.1: create a temporary dir and run '_build_filelist()'
    #         to move files to it and extract all tar archives
    tmpdir = tempfile.mkdtemp()
    filelist = []
    for trace_path in config.options.trace_path:
        _build_filelist(trace_path, filelist, tmpdir)
    # ph 1.2: rerun '_build_filelist()' in tmpdir to add to the
    #         filelist all the extraceted files
    listing = os.listdir(tmpdir)
    for filename in listing:
        fullpath = os.path.join(tmpdir, filename)
        _build_filelist(fullpath, filelist, None)
    # phase 2: build a stream object from the file list
    orientation_codes = config.vertical_channel_codes +\
        config.horizontal_channel_codes_1 +\
        config.horizontal_channel_codes_2
    st = Stream()
    for filename in sorted(filelist):
        try:
            tmpst = read(filename, fsize=False)
        except Exception:
            logger.warning(
                f'{filename}: Unable to read file as a trace: skipping')
            continue
        for trace in tmpst.traces:
            try:
                orientation = trace.stats.channel[-1]
            except IndexError:
                logger.warning(
                    f'{trace.id}: trace has no channel information: '
                    f'skipping trace'
                )
                continue
            if orientation not in orientation_codes:
                logger.warning(
                    f'{trace.id}: Unknown channel orientation: '
                    f'"{orientation}": skipping trace'
                )
                continue
            # only use the station specified by the command line option
            # "--station", if any
            if (config.options.station is not None and
                    trace.stats.station != config.options.station):
                continue
            _correct_traceid(trace)
            try:
                _add_instrtype(trace)
                _add_inventory(trace, inventory, config)
                _check_instrtype(trace)
                _add_coords(trace)
                _add_event(trace, ssp_event, depth_override)
                _add_picks(trace, picks)
            except Exception as err:
                for line in str(err).splitlines():
                    logger.warning(line)
                continue
            st.append(trace)
    shutil.rmtree(tmpdir)
    return st
# -----------------------------------------------------------------------------


def _log_event_info(ssp_event):
    for line in str(ssp_event).splitlines():
        logger.info(line)
    logger.info('---------------------------------------------------')


# Public interface:
[docs] def read_traces(config): """Read traces, store waveforms and metadata.""" # read station metadata into an ObsPy ``Inventory`` object inventory = read_station_metadata(config.station_metadata) picks = [] ssp_event = None depth_override = config.options.depth # parse hypocenter file if config.options.hypo_file is not None: ssp_event, picks, file_format = parse_hypo_file( config.options.hypo_file, config.options.evid) config.hypo_file_format = file_format # parse pick file if config.options.pick_file is not None: picks = parse_hypo71_picks(config) # parse QML file if config.options.qml_file is not None: ssp_event, picks = parse_qml(config) if ssp_event is not None: override_event_depth(ssp_event, depth_override) _log_event_info(ssp_event) # finally, read trace files logger.info('Reading traces...') st = _read_trace_files(config, inventory, ssp_event, picks, depth_override) logger.info('Reading traces: done') logger.info('---------------------------------------------------') if len(st) == 0: logger.error('No trace loaded') ssp_exit(1) _complete_picks(st) # if ssp_event is still None, get it from first trace if ssp_event is None: try: ssp_event = st[0].stats.event _log_event_info(ssp_event) except AttributeError: logger.error('No hypocenter information found.') sys.stderr.write( '\n' 'Use "-q" or "-H" options to provide hypocenter information\n' 'or add hypocenter information to the SAC file header\n' '(if you use the SAC format).\n' ) ssp_exit(1) # add velocity info to hypocenter try: _hypo_vel(ssp_event.hypocenter, config) except Exception as e: logger.error( f'Unable to compute velocity at hypocenter: {e}\n') ssp_exit(1) if config.options.evname is not None: # add evname from command line, if any, overriding the one in ssp_event ssp_event.name = config.options.evname else: # add evname from ssp_event, if any, to config file config.options.evname = ssp_event.name # add event to config file config.event = ssp_event st.sort() return st