# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Read traces in multiple formats of data and metadata.
:copyright:
2012 Claudio Satriano <satriano@ipgp.fr>
2013-2014 Claudio Satriano <satriano@ipgp.fr>,
Emanuela Matrullo <matrullo@geologie.ens.fr>
2015-2026 Claudio Satriano <satriano@ipgp.fr>,
Sophie Lambotte <sophie.lambotte@unistra.fr>
:license:
CeCILL Free Software License Agreement v2.1
(http://www.cecill.info/licences.en.html)
"""
import sys
import os
import logging
import shutil
import tarfile
import zipfile
import tempfile
import contextlib
import warnings
from obspy import read
from obspy.core import Stream
from obspy.core.util import AttribDict
from sourcespec.ssp_setup import (
ssp_exit, INSTR_CODES_VEL, INSTR_CODES_ACC, TRACEID_MAP)
from sourcespec.ssp_util import MediumProperties
from sourcespec.ssp_read_station_metadata import (
read_station_metadata, PAZ)
from sourcespec.ssp_read_event_metadata import (
parse_qml, parse_hypo_file, override_event_depth, parse_hypo71_picks)
from sourcespec.ssp_read_sac_header import (
is_SAC_trace,
compute_sensitivity_from_SAC,
get_instrument_from_SAC,
get_station_coordinates_from_SAC,
get_event_from_SAC,
get_picks_from_SAC)
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])
# Silence specific ObsPy SAC sample spacing warning (see obspy#3408)
warnings.filterwarnings(
'ignore',
message=r'.*Sample spacing read from SAC file',
category=UserWarning,
module=r'obspy\.io\.sac\.util'
)
# TRACE MANIPULATION ----------------------------------------------------------
def _correct_traceid(trace):
if TRACEID_MAP is None:
return
with contextlib.suppress(KeyError):
traceid = TRACEID_MAP[trace.get_id()]
net, sta, loc, chan = traceid.split('.')
trace.stats.network = net
trace.stats.station = sta
trace.stats.location = loc
trace.stats.channel = chan
def _add_instrtype(trace):
"""Add instrtype to trace."""
instrtype = None
band_code = None
instr_code = None
trace.stats.instrtype = None
# First, try to get the instrtype from channel name
chan = trace.stats.channel
if len(chan) > 2:
band_code = chan[0]
instr_code = chan[1]
if instr_code in INSTR_CODES_VEL:
# SEED standard band codes from higher to lower sampling rate
# https://ds.iris.edu/ds/nodes/dmc/data/formats/seed-channel-naming/
if band_code in ['G', 'D', 'E', 'S']:
instrtype = 'shortp'
if band_code in ['F', 'C', 'H', 'B']:
instrtype = 'broadb'
if instr_code in INSTR_CODES_ACC:
instrtype = 'acc'
if instrtype is None and is_SAC_trace(trace):
# Let's see if there is an instrument name in SAC header (ISNet format)
# In this case, we define band and instrument codes a posteriori
instrtype, band_code, instr_code = get_instrument_from_SAC(trace)
orientation = trace.stats.channel[-1]
trace.stats.channel = ''.join((band_code, instr_code, orientation))
trace.stats.instrtype = instrtype
trace.stats.info = f'{trace.id} {trace.stats.instrtype}'
def _add_inventory(trace, inventory, config):
"""Add inventory to trace."""
net, sta, loc, chan = trace.id.split('.')
inv = (
inventory.select(
network=net, station=sta, location=loc, channel=chan)
or inventory.select(
network='XX', station='GENERIC', location='XX', channel='XXX')
)
if 'XX.GENERIC.XX.XXX' in inv.get_contents()['channels']:
inv = inv.copy()
inv.networks[0].code = net
inv.networks[0].stations[0].code = sta
inv.networks[0].stations[0].channels[0].code = chan
inv.networks[0].stations[0].channels[0].location_code = loc
# If a "sensitivity" config option is provided, override the Inventory
# object with a new one constructed from the sensitivity value
if config.sensitivity is not None:
# save coordinates from the inventory, if available
coords = None
with contextlib.suppress(Exception):
coords = inv.get_coordinates(trace.id, trace.stats.starttime)
paz = PAZ()
paz.seedID = trace.id
try:
# try to convert sensitivity to float
paz.sensitivity = float(config.sensitivity)
except ValueError as e:
if not is_SAC_trace(trace):
raise RuntimeError(
f'{trace.id}: cannot compute sensitivity from SAC header: '
f'sensitivity "{config.sensitivity}". '
'The trace is not in SAC format : skipping trace'
) from e
# if it fails, try to evaluate it as a string containing
# a combination of SAC header fields
paz.sensitivity = compute_sensitivity_from_SAC(trace, config)
paz.poles = []
paz.zeros = []
if inv:
logger.warning(
f'Overriding response for {trace.id} with constant '
f'sensitivity {paz.sensitivity}')
inv = paz.to_inventory()
# restore coordinates, if available
if coords is not None:
chan = inv.networks[0].stations[0].channels[0]
chan.latitude = coords['latitude']
chan.longitude = coords['longitude']
chan.elevation = coords['elevation']
chan.depth = coords['local_depth']
trace.stats.inventory = inv
def _check_instrtype(trace):
"""Check if instrument type is consistent with units in inventory."""
inv = trace.stats.inventory
if not inv:
raise RuntimeError(
f'{trace.id}: cannot get instrtype from inventory: '
'inventory is empty: skipping trace')
instrtype = trace.stats.instrtype
new_instrtype = None
try:
units = inv.get_response(trace.id, trace.stats.starttime).\
instrument_sensitivity.input_units
except Exception as e:
# inventory attached to trace has only one channel
chan = inv[0][0][0]
start_date = chan.start_date
end_date = chan.end_date
raise RuntimeError(
f'{trace.id}: cannot get units from inventory.\n'
f'> {e.__class__.__name__}: {e}\n'
f'> Channel start/end date: {start_date} {end_date}\n'
f'> Trace start time: {trace.stats.starttime}\n'
'> Skipping trace'
) from e
trace.stats.units = units
if units.lower() == 'm' and trace.stats.instrtype != 'disp':
new_instrtype = 'disp'
if units.lower() == 'm/s' and instrtype not in ['shortp', 'broadb']:
new_instrtype = 'broadb'
if units.lower() == 'm/s**2' and instrtype != 'acc':
new_instrtype = 'acc'
if new_instrtype is not None:
logger.warning(
f'{trace.id}: instrument response units are "{units}" but '
f'instrument type is "{instrtype}". Changing instrument type '
f'to "{new_instrtype}"')
trace.stats.instrtype = new_instrtype
def _add_coords(trace):
"""Add coordinates to trace."""
# If we already know that traceid is skipped, raise a silent exception
if trace.id in _add_coords.skipped:
raise RuntimeError()
coords = None
with contextlib.suppress(Exception):
# Inventory.get_coordinates() raises a generic Exception
# if coordinates are not found
coords = trace.stats.inventory.get_coordinates(
trace.id, trace.stats.starttime)
if coords is not None:
# Build an AttribDict and make sure that coordinates are floats
coords = AttribDict({
key: float(value) for key, value in coords.items()})
coords = (
None if (
coords.longitude == 0 and coords.latitude == 0 and
coords.local_depth == 123456 and coords.elevation == 123456)
else coords)
if coords is None and is_SAC_trace(trace):
# If we still don't have trace coordinates,
# we try to get them from SAC header
coords = get_station_coordinates_from_SAC(trace)
if coords is None:
# Give up!
_add_coords.skipped.append(trace.id)
raise RuntimeError(
f'{trace.id}: could not find coords for trace: skipping trace')
if coords.latitude == coords.longitude == 0:
logger.warning(
f'{trace.id}: trace has latitude and longitude equal to zero!')
# elevation is in meters in StationXML or SAC header
coords.elevation /= 1e3
trace.stats.coords = coords
# list to keep track of skipped traces
_add_coords.skipped = [] # noqa
def _add_event(trace, ssp_event=None, depth_override=None):
"""Add ssp_event object to trace."""
if ssp_event is None:
# Try to get hypocenter information from the SAC header
try:
ssp_event = get_event_from_SAC(trace)
logger.info(f'{trace.id}: event info read from SAC header')
override_event_depth(ssp_event, depth_override)
except Exception:
return
trace.stats.event = ssp_event
def _add_picks(trace, picks=None):
"""Add picks to trace."""
if picks is None:
picks = []
trace_picks = []
with contextlib.suppress(Exception):
trace_picks = get_picks_from_SAC(trace)
for pick in picks:
if pick.station == trace.stats.station:
trace_picks.append(pick)
trace.stats.picks = trace_picks
# Create empty dicts for arrivals, travel_times and takeoff angles.
# They will be used later.
trace.stats.arrivals = {}
trace.stats.travel_times = {}
trace.stats.takeoff_angles = {}
def _complete_picks(st):
"""Add component-specific picks to all components."""
for station in {tr.stats.station for tr in st}:
st_sel = st.select(station=station)
# 'code' is band+instrument code
for code in {tr.stats.channel[:-1] for tr in st_sel}:
st_sel2 = st_sel.select(channel=f'{code}?')
all_picks = [p for tr in st_sel2 for p in tr.stats.picks]
all_P_picks = [p for p in all_picks if p.phase == 'P']
all_S_picks = [p for p in all_picks if p.phase == 'S']
# Select default P and S picks as the first in list (or empty list)
default_P_pick = all_P_picks[:1]
default_S_pick = all_S_picks[:1]
for tr in st_sel2:
# Attribute default picks to components without picks
if not [p for p in tr.stats.picks if p.phase == 'P']:
tr.stats.picks += default_P_pick
if not [p for p in tr.stats.picks if p.phase == 'S']:
tr.stats.picks += default_S_pick
# -----------------------------------------------------------------------------
# FILE PARSING ----------------------------------------------------------------
def _hypo_vel(hypo, config):
medium_properties = MediumProperties(
hypo.longitude.value_in_deg, hypo.latitude.value_in_deg,
hypo.depth.value_in_km, config
)
hypo.vp = medium_properties.get(mproperty='vp', where='source')
hypo.vs = medium_properties.get(mproperty='vs', where='source')
hypo.rho = medium_properties.get(mproperty='rho', where='source')
depth_string = medium_properties.to_string(
'source depth', hypo.depth.value_in_km)
vp_string = medium_properties.to_string('vp_source', hypo.vp)
vs_string = medium_properties.to_string('vs_source', hypo.vs)
rho_string = medium_properties.to_string('rho_source', hypo.rho)
logger.info(f'{depth_string}, {vp_string}, {vs_string}, {rho_string}')
def _build_filelist(path, filelist, tmpdir):
if os.path.isdir(path):
listing = os.listdir(path)
for filename in listing:
fullpath = os.path.join(path, filename)
_build_filelist(fullpath, filelist, tmpdir)
else:
try:
# pylint: disable=unspecified-encoding consider-using-with
open(path)
except IOError as err:
logger.error(err)
return
if tarfile.is_tarfile(path) and tmpdir is not None:
try:
tar = tarfile.open(path)
except tarfile.ReadError:
# is_tarfile() gave a false positive, just append the file
# to the file list and let ObsPy handle it
filelist.append(path)
return
try:
tar.extractall(path=tmpdir)
except Exception as msg:
logger.warning(
f'{path}: Unable to fully extract tar archive: {msg}')
tar.close()
elif zipfile.is_zipfile(path) and tmpdir is not None:
try:
zipf = zipfile.ZipFile(path)
except zipfile.BadZipFile:
# is_zipfile() gave a false positive, just append the file
# to the file list and let ObsPy handle it
filelist.append(path)
return
try:
zipf.extractall(path=tmpdir)
except Exception as msg:
logger.warning(
f'{path}: Unable to fully extract zip archive: {msg}')
zipf.close()
else:
filelist.append(path)
def _read_trace_files(
config, inventory, ssp_event, picks, depth_override=None):
"""
Read trace files from a given path. Complete trace metadata and
return a stream object.
"""
# phase 1: build a file list
# ph 1.1: create a temporary dir and run '_build_filelist()'
# to move files to it and extract all tar archives
tmpdir = tempfile.mkdtemp()
filelist = []
for trace_path in config.options.trace_path:
_build_filelist(trace_path, filelist, tmpdir)
# ph 1.2: rerun '_build_filelist()' in tmpdir to add to the
# filelist all the extraceted files
listing = os.listdir(tmpdir)
for filename in listing:
fullpath = os.path.join(tmpdir, filename)
_build_filelist(fullpath, filelist, None)
# phase 2: build a stream object from the file list
orientation_codes = config.vertical_channel_codes +\
config.horizontal_channel_codes_1 +\
config.horizontal_channel_codes_2
st = Stream()
for filename in sorted(filelist):
try:
tmpst = read(filename, fsize=False)
except Exception:
logger.warning(
f'{filename}: Unable to read file as a trace: skipping')
continue
for trace in tmpst.traces:
try:
orientation = trace.stats.channel[-1]
except IndexError:
logger.warning(
f'{trace.id}: trace has no channel information: '
f'skipping trace'
)
continue
if orientation not in orientation_codes:
logger.warning(
f'{trace.id}: Unknown channel orientation: '
f'"{orientation}": skipping trace'
)
continue
# only use the station specified by the command line option
# "--station", if any
if (config.options.station is not None and
trace.stats.station != config.options.station):
continue
_correct_traceid(trace)
try:
_add_instrtype(trace)
_add_inventory(trace, inventory, config)
_check_instrtype(trace)
_add_coords(trace)
_add_event(trace, ssp_event, depth_override)
_add_picks(trace, picks)
except Exception as err:
for line in str(err).splitlines():
logger.warning(line)
continue
st.append(trace)
shutil.rmtree(tmpdir)
return st
# -----------------------------------------------------------------------------
def _log_event_info(ssp_event):
for line in str(ssp_event).splitlines():
logger.info(line)
logger.info('---------------------------------------------------')
# Public interface:
[docs]
def read_traces(config):
"""Read traces, store waveforms and metadata."""
# read station metadata into an ObsPy ``Inventory`` object
inventory = read_station_metadata(config.station_metadata)
picks = []
ssp_event = None
depth_override = config.options.depth
# parse hypocenter file
if config.options.hypo_file is not None:
ssp_event, picks, file_format = parse_hypo_file(
config.options.hypo_file, config.options.evid)
config.hypo_file_format = file_format
# parse pick file
if config.options.pick_file is not None:
picks = parse_hypo71_picks(config)
# parse QML file
if config.options.qml_file is not None:
ssp_event, picks = parse_qml(config)
if ssp_event is not None:
override_event_depth(ssp_event, depth_override)
_log_event_info(ssp_event)
# finally, read trace files
logger.info('Reading traces...')
st = _read_trace_files(config, inventory, ssp_event, picks, depth_override)
logger.info('Reading traces: done')
logger.info('---------------------------------------------------')
if len(st) == 0:
logger.error('No trace loaded')
ssp_exit(1)
_complete_picks(st)
# if ssp_event is still None, get it from first trace
if ssp_event is None:
try:
ssp_event = st[0].stats.event
_log_event_info(ssp_event)
except AttributeError:
logger.error('No hypocenter information found.')
sys.stderr.write(
'\n'
'Use "-q" or "-H" options to provide hypocenter information\n'
'or add hypocenter information to the SAC file header\n'
'(if you use the SAC format).\n'
)
ssp_exit(1)
# add velocity info to hypocenter
try:
_hypo_vel(ssp_event.hypocenter, config)
except Exception as e:
logger.error(
f'Unable to compute velocity at hypocenter: {e}\n')
ssp_exit(1)
if config.options.evname is not None:
# add evname from command line, if any, overriding the one in ssp_event
ssp_event.name = config.options.evname
else:
# add evname from ssp_event, if any, to config file
config.options.evname = ssp_event.name
# add event to config file
config.event = ssp_event
st.sort()
return st