# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Read traces in multiple formats of data and metadata.
:copyright:
2012 Claudio Satriano <satriano@ipgp.fr>
2013-2014 Claudio Satriano <satriano@ipgp.fr>,
Emanuela Matrullo <matrullo@geologie.ens.fr>
2015-2024 Claudio Satriano <satriano@ipgp.fr>,
Sophie Lambotte <sophie.lambotte@unistra.fr>
:license:
CeCILL Free Software License Agreement v2.1
(http://www.cecill.info/licences.en.html)
"""
import sys
import os
import logging
import shutil
import tarfile
import zipfile
import tempfile
import contextlib
from obspy import read
from obspy.core import Stream
from obspy.core.util import AttribDict
from sourcespec.ssp_setup import (
ssp_exit, INSTR_CODES_VEL, INSTR_CODES_ACC, TRACEID_MAP)
from sourcespec.ssp_util import MediumProperties
from sourcespec.ssp_read_station_metadata import (
read_station_metadata, PAZ)
from sourcespec.ssp_read_event_metadata import (
parse_qml, parse_hypo_file, parse_hypo71_picks)
from sourcespec.ssp_read_sac_header import (
compute_sensitivity_from_SAC,
get_instrument_from_SAC, get_station_coordinates_from_SAC,
get_event_from_SAC, get_picks_from_SAC)
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])
# TRACE MANIPULATION ----------------------------------------------------------
def _correct_traceid(trace):
if TRACEID_MAP is None:
return
with contextlib.suppress(KeyError):
traceid = TRACEID_MAP[trace.get_id()]
net, sta, loc, chan = traceid.split('.')
trace.stats.network = net
trace.stats.station = sta
trace.stats.location = loc
trace.stats.channel = chan
def _add_instrtype(trace):
"""Add instrtype to trace."""
instrtype = None
band_code = None
instr_code = None
trace.stats.instrtype = None
# First, try to get the instrtype from channel name
chan = trace.stats.channel
if len(chan) > 2:
band_code = chan[0]
instr_code = chan[1]
if instr_code in INSTR_CODES_VEL:
# SEED standard band codes from higher to lower sampling rate
# https://ds.iris.edu/ds/nodes/dmc/data/formats/seed-channel-naming/
if band_code in ['G', 'D', 'E', 'S']:
instrtype = 'shortp'
if band_code in ['F', 'C', 'H', 'B']:
instrtype = 'broadb'
if instr_code in INSTR_CODES_ACC:
instrtype = 'acc'
if instrtype is None:
# Let's see if there is an instrument name in SAC header (ISNet format)
# In this case, we define band and instrument codes a posteriori
instrtype, band_code, instr_code = get_instrument_from_SAC(trace)
orientation = trace.stats.channel[-1]
trace.stats.channel = ''.join((band_code, instr_code, orientation))
trace.stats.instrtype = instrtype
trace.stats.info = f'{trace.id} {trace.stats.instrtype}'
def _add_inventory(trace, inventory, config):
"""Add inventory to trace."""
net, sta, loc, chan = trace.id.split('.')
inv = (
inventory.select(
network=net, station=sta, location=loc, channel=chan)
or inventory.select(
network='XX', station='GENERIC', location='XX', channel='XXX')
)
if 'XX.GENERIC.XX.XXX' in inv.get_contents()['channels']:
inv = inv.copy()
inv.networks[0].code = net
inv.networks[0].stations[0].code = sta
inv.networks[0].stations[0].channels[0].code = chan
inv.networks[0].stations[0].channels[0].location_code = loc
# If a "sensitivity" config option is provided, override the Inventory
# object with a new one constructed from the sensitivity value
if config.sensitivity is not None:
# save coordinates from the inventory, if available
coords = None
with contextlib.suppress(Exception):
coords = inv.get_coordinates(trace.id, trace.stats.starttime)
paz = PAZ()
paz.seedID = trace.id
paz.sensitivity = compute_sensitivity_from_SAC(trace, config)
paz.poles = []
paz.zeros = []
if inv:
logger.warning(
f'Overriding response for {trace.id} with constant '
f'sensitivity {paz.sensitivity}')
inv = paz.to_inventory()
# restore coordinates, if available
if coords is not None:
chan = inv.networks[0].stations[0].channels[0]
chan.latitude = coords['latitude']
chan.longitude = coords['longitude']
chan.elevation = coords['elevation']
chan.depth = coords['local_depth']
trace.stats.inventory = inv
def _check_instrtype(trace):
"""Check if instrument type is consistent with units in inventory."""
inv = trace.stats.inventory
if not inv:
raise RuntimeError(
f'{trace.id}: cannot get instrtype from inventory: '
'inventory is empty: skipping trace')
instrtype = trace.stats.instrtype
new_instrtype = None
try:
units = inv.get_response(trace.id, trace.stats.starttime).\
instrument_sensitivity.input_units
except Exception as e:
# inventory attached to trace has only one channel
chan = inv[0][0][0]
start_date = chan.start_date
end_date = chan.end_date
raise RuntimeError(
f'{trace.id}: cannot get units from inventory.\n'
f'> {e.__class__.__name__}: {e}\n'
f'> Channel start/end date: {start_date} {end_date}\n'
f'> Trace start time: {trace.stats.starttime}\n'
'> Skipping trace'
) from e
trace.stats.units = units
if units.lower() == 'm' and trace.stats.instrtype != 'disp':
new_instrtype = 'disp'
if units.lower() == 'm/s' and instrtype not in ['shortp', 'broadb']:
new_instrtype = 'broadb'
if units.lower() == 'm/s**2' and instrtype != 'acc':
new_instrtype = 'acc'
if new_instrtype is not None:
logger.warning(
f'{trace.id}: instrument response units are "{units}" but '
f'instrument type is "{instrtype}". Changing instrument type '
f'to "{new_instrtype}"')
trace.stats.instrtype = new_instrtype
def _add_coords(trace):
"""Add coordinates to trace."""
# If we already know that traceid is skipped, raise a silent exception
if trace.id in _add_coords.skipped:
raise RuntimeError()
coords = None
with contextlib.suppress(Exception):
# Inventory.get_coordinates() raises a generic Exception
# if coordinates are not found
coords = trace.stats.inventory.get_coordinates(
trace.id, trace.stats.starttime)
if coords is not None:
# Build an AttribDict and make sure that coordinates are floats
coords = AttribDict({
key: float(value) for key, value in coords.items()})
coords = (
None if (
coords.longitude == 0 and coords.latitude == 0 and
coords.local_depth == 123456 and coords.elevation == 123456)
else coords)
if coords is None:
# If we still don't have trace coordinates,
# we try to get them from SAC header
coords = get_station_coordinates_from_SAC(trace)
if coords is None:
# Give up!
_add_coords.skipped.append(trace.id)
raise RuntimeError(
f'{trace.id}: could not find coords for trace: skipping trace')
if coords.latitude == coords.longitude == 0:
logger.warning(
f'{trace.id}: trace has latitude and longitude equal to zero!')
# elevation is in meters in StationXML or SAC header
coords.elevation /= 1e3
trace.stats.coords = coords
# list to keep track of skipped traces
_add_coords.skipped = [] # noqa
def _add_event(trace, ssp_event=None):
"""Add ssp_event object to trace."""
if ssp_event is None:
# Try to get hypocenter information from the SAC header
try:
ssp_event = get_event_from_SAC(trace)
except Exception:
return
trace.stats.event = ssp_event
def _add_picks(trace, picks=None):
"""Add picks to trace."""
if picks is None:
picks = []
trace_picks = []
with contextlib.suppress(Exception):
trace_picks = get_picks_from_SAC(trace)
for pick in picks:
if pick.station == trace.stats.station:
trace_picks.append(pick)
trace.stats.picks = trace_picks
# Create empty dicts for arrivals, travel_times and takeoff angles.
# They will be used later.
trace.stats.arrivals = {}
trace.stats.travel_times = {}
trace.stats.takeoff_angles = {}
def _complete_picks(st):
"""Add component-specific picks to all components."""
for station in {tr.stats.station for tr in st}:
st_sel = st.select(station=station)
# 'code' is band+instrument code
for code in {tr.stats.channel[:-1] for tr in st_sel}:
st_sel2 = st_sel.select(channel=f'{code}?')
all_picks = [p for tr in st_sel2 for p in tr.stats.picks]
all_P_picks = [p for p in all_picks if p.phase == 'P']
all_S_picks = [p for p in all_picks if p.phase == 'S']
# Select default P and S picks as the first in list (or empty list)
default_P_pick = all_P_picks[:1]
default_S_pick = all_S_picks[:1]
for tr in st_sel2:
# Attribute default picks to components without picks
if not [p for p in tr.stats.picks if p.phase == 'P']:
tr.stats.picks += default_P_pick
if not [p for p in tr.stats.picks if p.phase == 'S']:
tr.stats.picks += default_S_pick
# -----------------------------------------------------------------------------
# FILE PARSING ----------------------------------------------------------------
def _hypo_vel(hypo, config):
medium_properties = MediumProperties(
hypo.longitude, hypo.latitude, hypo.depth.value_in_km, config)
hypo.vp = medium_properties.get(mproperty='vp', where='source')
hypo.vs = medium_properties.get(mproperty='vs', where='source')
hypo.rho = medium_properties.get(mproperty='rho', where='source')
depth_string = medium_properties.to_string(
'source depth', hypo.depth.value_in_km)
vp_string = medium_properties.to_string('vp_source', hypo.vp)
vs_string = medium_properties.to_string('vs_source', hypo.vs)
rho_string = medium_properties.to_string('rho_source', hypo.rho)
logger.info(f'{depth_string}, {vp_string}, {vs_string}, {rho_string}')
def _build_filelist(path, filelist, tmpdir):
if os.path.isdir(path):
listing = os.listdir(path)
for filename in listing:
fullpath = os.path.join(path, filename)
_build_filelist(fullpath, filelist, tmpdir)
else:
try:
# pylint: disable=unspecified-encoding consider-using-with
open(path)
except IOError as err:
logger.error(err)
return
if tarfile.is_tarfile(path) and tmpdir is not None:
with tarfile.open(path) as tar:
try:
tar.extractall(path=tmpdir)
except Exception as msg:
logger.warning(
f'{path}: Unable to fully extract tar archive: {msg}')
elif zipfile.is_zipfile(path) and tmpdir is not None:
with zipfile.ZipFile(path) as zipf:
try:
zipf.extractall(path=tmpdir)
except Exception as msg:
logger.warning(
f'{path}: Unable to fully extract zip archive: {msg}')
else:
filelist.append(path)
def _read_trace_files(config, inventory, ssp_event, picks):
"""
Read trace files from a given path. Complete trace metadata and
return a stream object.
"""
# phase 1: build a file list
# ph 1.1: create a temporary dir and run '_build_filelist()'
# to move files to it and extract all tar archives
tmpdir = tempfile.mkdtemp()
filelist = []
for trace_path in config.options.trace_path:
_build_filelist(trace_path, filelist, tmpdir)
# ph 1.2: rerun '_build_filelist()' in tmpdir to add to the
# filelist all the extraceted files
listing = os.listdir(tmpdir)
for filename in listing:
fullpath = os.path.join(tmpdir, filename)
_build_filelist(fullpath, filelist, None)
# phase 2: build a stream object from the file list
orientation_codes = config.vertical_channel_codes +\
config.horizontal_channel_codes_1 +\
config.horizontal_channel_codes_2
st = Stream()
for filename in sorted(filelist):
try:
tmpst = read(filename, fsize=False)
except Exception:
logger.warning(
f'{filename}: Unable to read file as a trace: skipping')
continue
for trace in tmpst.traces:
orientation = trace.stats.channel[-1]
if orientation not in orientation_codes:
logger.warning(
f'{trace.id}: Unknown channel orientation: '
f'"{orientation}": skipping trace'
)
continue
# only use the station specified by the command line option
# "--station", if any
if (config.options.station is not None and
trace.stats.station != config.options.station):
continue
_correct_traceid(trace)
try:
_add_instrtype(trace)
_add_inventory(trace, inventory, config)
_check_instrtype(trace)
_add_coords(trace)
_add_event(trace, ssp_event)
_add_picks(trace, picks)
except Exception as err:
for line in str(err).splitlines():
logger.warning(line)
continue
st.append(trace)
shutil.rmtree(tmpdir)
return st
# -----------------------------------------------------------------------------
def _log_event_info(ssp_event):
for line in str(ssp_event).splitlines():
logger.info(line)
logger.info('---------------------------------------------------')
# Public interface:
[docs]
def read_traces(config):
"""Read traces, store waveforms and metadata."""
# read station metadata into an ObsPy ``Inventory`` object
inventory = read_station_metadata(config.station_metadata)
picks = []
ssp_event = None
# parse hypocenter file
if config.options.hypo_file is not None:
ssp_event, picks, file_format = parse_hypo_file(
config.options.hypo_file, config.options.evid)
config.hypo_file_format = file_format
# parse pick file
if config.options.pick_file is not None:
picks = parse_hypo71_picks(config)
# parse QML file
if config.options.qml_file is not None:
ssp_event, picks = parse_qml(config)
if ssp_event is not None:
_log_event_info(ssp_event)
# finally, read trace files
logger.info('Reading traces...')
st = _read_trace_files(config, inventory, ssp_event, picks)
logger.info('Reading traces: done')
logger.info('---------------------------------------------------')
if len(st) == 0:
logger.error('No trace loaded')
ssp_exit(1)
_complete_picks(st)
# if ssp_event is still None, get it from first trace
if ssp_event is None:
try:
ssp_event = st[0].stats.event
_log_event_info(ssp_event)
except AttributeError:
logger.error('No hypocenter information found.')
sys.stderr.write(
'\n'
'Use "-q" or "-H" options to provide hypocenter information\n'
'or add hypocenter information to the SAC file header\n'
'(if you use the SAC format).\n'
)
ssp_exit(1)
# add velocity info to hypocenter
try:
_hypo_vel(ssp_event.hypocenter, config)
except Exception as e:
logger.error(
f'Unable to compute velocity at hypocenter: {e}\n')
ssp_exit(1)
if config.options.evname is not None:
# add evname from command line, if any, overriding the one in ssp_event
ssp_event.name = config.options.evname
else:
# add evname from ssp_event, if any, to config file
config.options.evname = ssp_event.name
# add event to config file
config.event = ssp_event
st.sort()
return st