# -*- coding: utf-8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
Output functions for source_spec.
:copyright:
2012 Claudio Satriano <satriano@ipgp.fr>
2013-2014 Claudio Satriano <satriano@ipgp.fr>,
Emanuela Matrullo <matrullo@geologie.ens.fr>
2015-2024 Claudio Satriano <satriano@ipgp.fr>
:license:
CeCILL Free Software License Agreement v2.1
(http://www.cecill.info/licences.en.html)
"""
import os
import shutil
import contextlib
import logging
from collections.abc import Mapping
from datetime import datetime
from tzlocal import get_localzone
import numpy as np
from sourcespec.ssp_qml_output import write_qml
from sourcespec.ssp_sqlite_output import write_sqlite
from sourcespec._version import get_versions
logger = logging.getLogger(__name__.rsplit('.', maxsplit=1)[-1])
def _write_author_and_agency_to_parfile(config, parfile):
author_str = empty_author_str = '\n*** Author:'
if config.author_name is not None:
author_str += f' {config.author_name}'
if config.author_email is not None:
if author_str != empty_author_str:
author_str += f' <{config.author_email}>'
else:
author_str += f' {config.author_email}'
if author_str != empty_author_str:
parfile.write(author_str)
agency_str = empty_agency_str = '\n*** Agency:'
if config.agency_full_name is not None:
agency_str += f' {config.agency_full_name}'
if config.agency_short_name is not None:
if agency_str != empty_agency_str:
agency_str += f' ({config.agency_short_name})'
else:
agency_str += f' {config.agency_short_name}'
if config.agency_url is not None:
if agency_str != empty_agency_str:
agency_str += ' -'
agency_str += f' {config.agency_url}'
if agency_str != empty_agency_str:
parfile.write(agency_str)
def _value_error_str(value, error, fmt):
if error[0] == error[1]:
s = f'{fmt} +/- {fmt}'
s = s.format(value, error[0])
else:
s = f'{fmt} /- {fmt} /+ {fmt}'
s = s.format(value, error[0], error[1])
return s
def _write_parfile(config, sspec_output):
"""
Write station source parameters to file.
Note: this format is deprecated and will not evolve anymore
(e.g., including new parameters or new way of computing statistics).
"""
if not os.path.exists(config.options.outdir):
os.makedirs(config.options.outdir)
evid = config.event.event_id
parfilename = os.path.join(
config.options.outdir, f'{evid}.ssp.out')
with open(parfilename, 'w', encoding='utf-8') as parfile:
parfile.write(
'*** Note: this file is deprecated and might not contain all the '
'output information. ***\n')
parfile.write(
'*** It will be removed in future versions of SourceSpec. ***\n')
parfile.write(
'*** Please look at the YAML file in this directory. ***\n')
evid = config.event.event_id
hypo = config.event.hypocenter
evlo = hypo.longitude.value_in_deg
evla = hypo.latitude.value_in_deg
evdp = hypo.depth.value_in_km
parfile.write(
f'{evid} lon {evlo:8.3f} lat {evla:7.3f} depth {evdp:5.1f} km '
f'orig_time {hypo.origin_time}\n\n')
parfile.write('*** Station source parameters ***\n')
parfile.write(
'*** Note: outliers are prepended by a star (*) symbol ***\n')
parkeys = (
'Mw', 'fc', 't_star', 'Qo', 'Mo',
'ssd', 'ra', 'hyp_dist', 'az', 'Er'
)
formats = {
'Mo': '{:.3e} ',
'Er': '{:.3e} ',
'hyp_dist': '{:7.3f} ',
'az': '{:7.3f} ',
'Mw': '{:6.3f} ',
'fc': '{:6.3f} ',
'ssd': '{:.3e} ',
'ra': '{:8.3f} ',
't_star': '{:6.3f} ',
'Qo': '{:7.1f} ',
'Ml': '{:6.3f} '
}
formats_none = {
'Mo': '{:>9} ',
'Er': '{:>9} ',
'hyp_dist': '{:>7} ',
'az': '{:>7} ',
'Mw': '{:>6} ',
'fc': '{:>6} ',
'ssd': '{:>9} ',
'ra': '{:>8} ',
't_star': '{:>6} ',
'Qo': '{:>7} ',
'Ml': '{:>6} '
}
stationpar = sspec_output.station_parameters
for statId in sorted(stationpar.keys()):
par = stationpar[statId]
parfile.write(f'{statId:>15} {par.instrument_type:>6}\t')
for key in parkeys:
if key == 'az':
_pkey = key
val = par['azimuth']
outl = False
elif key == 'hyp_dist':
_pkey = key
val = par['hypo_dist_in_km']
outl = False
elif key == 'ra':
_pkey = 'radius'
val = par[_pkey].value
outl = par[_pkey].outlier
else:
_pkey = key
val = par[_pkey].value
outl = par[_pkey].outlier
space = ' *' if outl else ' '
parfile.write(f'{space}{key} ')
if val is not None and ~np.isnan(val):
parfile.write(formats[key].format(val))
else:
parfile.write(formats_none[key].format('nan'))
parfile.write('\n')
parfile.write(f'{"--- errmin":>22}\t')
for key in parkeys:
_pkey = 'radius' if key == 'ra' else key
if key in ['hyp_dist', 'az']:
outl = False
err = None
else:
outl = par[_pkey].outlier
err = par[_pkey].lower_uncertainty
if err is None:
err = par[_pkey].uncertainty
space = ' *' if outl else ' '
parfile.write(f'{space}{key} ')
if err is not None:
parfile.write(formats[key].format(err))
else:
parfile.write(formats_none[key].format('nan'))
parfile.write('\n')
parfile.write(f'{"--- errmax":>22}\t')
for key in parkeys:
_pkey = 'radius' if key == 'ra' else key
if key in ['hyp_dist', 'az']:
outl = False
err = None
else:
outl = par[_pkey].outlier
err = par[_pkey].upper_uncertainty
if err is None:
err = par[_pkey].uncertainty
space = ' *' if outl else ' '
parfile.write(f'{space}{key} ')
if err is not None:
parfile.write(formats[key].format(err))
else:
parfile.write(formats_none[key].format('nan'))
parfile.write('\n')
means = sspec_output.mean_values()
errors = sspec_output.mean_uncertainties()
means_weight = sspec_output.weighted_mean_values()
errors_weight = sspec_output.weighted_mean_uncertainties()
parfile.write('\n*** Average source parameters ***\n')
parfile.write(
'*** Note: averages computed after removing outliers ****\n')
Mw_mean = means['Mw']
Mw_error = errors['Mw']
s = _value_error_str(Mw_mean, Mw_error, '{:.2f}')
parfile.write(f'Mw: {s}\n')
Mw_mean_weight = means_weight['Mw']
Mw_error_weight = errors_weight['Mw']
s = _value_error_str(Mw_mean_weight, Mw_error_weight, '{:.2f}')
parfile.write(f'Mw (weighted): {s}\n')
Mo_mean = means['Mo']
Mo_error = errors['Mo']
s = _value_error_str(Mo_mean, Mo_error, '{:.3e}')
parfile.write(f'Mo: {s} N·m\n')
Mo_mean_weight = means_weight['Mo']
Mo_error_weight = errors_weight['Mo']
s = _value_error_str(Mo_mean_weight, Mo_error_weight, '{:.3e}')
parfile.write(f'Mo (weighted): {s} N·m\n')
fc_mean = means['fc']
fc_error = errors['fc']
s = _value_error_str(fc_mean, fc_error, '{:.3f}')
parfile.write(f'fc: {s} Hz\n')
fc_mean_weight = means_weight['fc']
fc_error_weight = errors_weight['fc']
s = _value_error_str(fc_mean_weight, fc_error_weight, '{:.3f}')
parfile.write(f'fc (weighted): {s} Hz\n')
t_star_mean = means['t_star']
t_star_error = errors['t_star']
s = _value_error_str(t_star_mean, t_star_error, '{:.3f}')
parfile.write(f't_star: {s} s\n')
t_star_mean_weight = means_weight['t_star']
t_star_error_weight = errors_weight['t_star']
s = _value_error_str(t_star_mean_weight, t_star_error_weight, '{:.3f}')
parfile.write(f't_star (weighted): {s} s\n')
Qo_mean = means['Qo']
Qo_error = errors['Qo']
s = _value_error_str(Qo_mean, Qo_error, '{:.1f}')
parfile.write(f'Qo: {s}\n')
try:
Qo_mean_weight = means_weight['Qo']
Qo_error_weight = errors_weight['Qo']
except KeyError:
# weighted Qo might not be computed
Qo_mean_weight = np.nan
Qo_error_weight = [np.nan, np.nan]
s = _value_error_str(Qo_mean_weight, Qo_error_weight, '{:.1f}')
parfile.write(f'Qo (weighted): {s}\n')
ra_mean = means['radius']
ra_error = errors['radius']
s = _value_error_str(ra_mean, ra_error, '{:.3f}')
parfile.write(f'Source radius: {s} m\n')
ra_mean_weight = means_weight['radius']
ra_error_weight = errors_weight['radius']
s = _value_error_str(ra_mean_weight, ra_error_weight, '{:.3f}')
parfile.write(f'Source radius (weighted): {s} m\n')
ssd_mean = means['ssd']
ssd_error = errors['ssd']
s = _value_error_str(ssd_mean, ssd_error, '{:.3e}')
parfile.write(f'Static stress drop: {s} MPa\n')
ssd_mean_weight = means_weight['ssd']
ssd_error_weight = errors_weight['ssd']
s = _value_error_str(ssd_mean_weight, ssd_error_weight, '{:.3e}')
parfile.write(f'Static stress drop (weighted): {s} MPa\n')
Ml_mean = means.get('Ml', None)
Ml_error = errors.get('Ml', None)
if Ml_mean is not None:
s = _value_error_str(Ml_mean, Ml_error, '{:.3f}')
parfile.write(f'Ml: {s}\n')
Er_mean = means['Er']
Er_error = errors['Er']
s = _value_error_str(Er_mean, Er_error, '{:.3e}')
parfile.write(f'Er: {s} N·m\n')
parfile.write(f'\n*** SourceSpec: {get_versions()["version"]}')
parfile.write(
f'\n*** Run completed on: {config.end_of_run} '
f'{config.end_of_run_tz}')
if config.options.run_id:
parfile.write(f'\n*** Run ID: {config.options.run_id}')
_write_author_and_agency_to_parfile(config, parfile)
logger.info(f'Output written to file: {parfilename}')
def _dict2yaml(dict_like, level=0):
"""Serialize a dict-like object into YAML format."""
if not isinstance(dict_like, Mapping):
raise TypeError('dict_like must be a dict-like object')
comments = dict_like.get('_comments', {})
fmt = dict_like.get('_format', None)
value_uncertainty_keys = (
'value', 'uncertainty', 'lower_uncertainty', 'upper_uncertainty')
target_dict = {
key: (fmt.format(value)
if fmt is not None and key in value_uncertainty_keys
else value)
for key, value in dict_like.items()
if not key.startswith('_') and value is not None
}
indent = ' ' * 2 * level
# use oneliners for dict-like objects containing value and uncertainty keys
if set(target_dict.keys()).intersection(set(value_uncertainty_keys)):
oneliner = str(target_dict).replace("'", "")
return f'{indent}{oneliner}\n'
lines = ''
for key, value in target_dict.items():
if key.startswith('_') or value is None:
continue
if isinstance(value, Mapping):
if level == 0:
lines += '\n'
with contextlib.suppress(KeyError):
comment = comments[key]
for line in comment.split('\n'):
lines += f'# {line}\n'
lines += f'{indent}{key}:\n'
lines += _dict2yaml(value, level + 1)
else:
lines += f'{indent}{key}: {value}\n'
return lines
def _write_yaml(config, sspec_output):
"""Write sspec output in a YAML file."""
if not os.path.exists(config.options.outdir):
os.makedirs(config.options.outdir)
evid = config.event.event_id
yamlfilename = os.path.join(config.options.outdir, f'{evid}.ssp.yaml')
lines = _dict2yaml(sspec_output)
comments = sspec_output.get('_comments', {})
with open(yamlfilename, 'w', encoding='utf-8') as fp:
begin_comment = comments.get('begin', None)
if begin_comment is not None:
for line in begin_comment.split('\n'):
fp.write(f'# {line}\n')
fp.write(lines)
logger.info(f'Output written to file: {yamlfilename}')
def _write_hypo71(config, sspec_output):
if not config.options.hypo_file:
return
if config.hypo_file_format != 'hypo71':
return
with open(config.options.hypo_file, 'r', encoding='ascii') as fp:
line = fp.readline()
# Check if first 10 digits of the line contain characters
if any(c.isalpha() for c in line[:10]):
line1 = line
line = fp.readline()
line = list(line)
summary_values = sspec_output.reference_values()
mw_str = f'{summary_values["Mw"]:03.2f}'
Ml = summary_values.get('Ml', None)
ml_str = f'{Ml:03.2f}' if Ml is not None and ~np.isnan(Ml) else ' ' * 4
for i in range(4):
line[49 + i] = mw_str[0 + i]
# line[45 + i] = mw_str[0 + i]
line[69 + i] = ml_str[0 + i]
outline = ''.join(line)
evid = config.event.event_id
hypo_file_out = os.path.join(config.options.outdir, f'{evid}.ssp.h')
with open(hypo_file_out, 'w', encoding='ascii') as fp:
with contextlib.suppress(Exception):
fp.write(line1)
fp.write(outline)
logger.info(f'Hypo file written to: {hypo_file_out}')
def _make_symlinks(config):
"""Make symlinks to input files into output directory."""
# Windows does not support symlinks
if os.name == 'nt':
return
outdir = config.options.outdir
out_data_dir = os.path.join(outdir, 'input_files')
rel_path = os.path.relpath(config.workdir, out_data_dir)
os.makedirs(out_data_dir, exist_ok=True)
filelist =\
list(config.options.trace_path) +\
[
config.options.station_metadata,
config.options.hypo_file,
config.options.pick_file,
config.options.qml_file,
]
for filename in filelist:
if filename is None or not os.path.exists(filename):
continue
basename = os.path.basename(filename)
linkname = os.path.join(out_data_dir, basename)
if os.path.isfile(linkname) or os.path.islink(linkname):
os.remove(linkname)
elif os.path.isdir(linkname):
shutil.rmtree(linkname, ignore_errors=True)
filename = os.path.join(rel_path, filename)
os.symlink(filename, linkname)
[docs]
def write_output(config, sspec_output):
"""Write results into different formats."""
# Add run info to output object
run_info = sspec_output.run_info
run_info.SourceSpec_version = get_versions()['version']
config.end_of_run = datetime.now()
tz = get_localzone()
config.end_of_run_tz = tz.tzname(config.end_of_run)
run_info.run_completed = f'{config.end_of_run} {config.end_of_run_tz}'
if config.options.run_id:
run_info.run_id = config.options.run_id
run_info.author_name = config.author_name
run_info.author_email = config.author_email
run_info.agency_full_name = config.agency_full_name
run_info.agency_short_name = config.agency_short_name
run_info.agency_url = config.agency_url
# Symlink input files into output directory
_make_symlinks(config)
# Write to parfile (deprecated)
_write_parfile(config, sspec_output)
# Write to YAML file
_write_yaml(config, sspec_output)
# Write to SQLite database, if requested
write_sqlite(config, sspec_output)
# Write to hypo file, if requested
_write_hypo71(config, sspec_output)
# Write to quakeml file, if requested
write_qml(config, sspec_output)
[docs]
def save_spectra(config, spec_st):
"""Save spectra to file."""
if not config.save_spectra:
return
outfile = os.path.join(
config.options.outdir,
f'{config.event.event_id}.spectra.hdf5'
)
spec_st.sort()
for spec in spec_st:
spec.stats.software = 'SourceSpec'
spec.stats.software_version = get_versions()['version']
spec_st.write(outfile)
logger.info(f'Spectra saved to: {outfile}')