Source code for plot_sourcepars

# -*- coding: utf8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
plot_sourcepars.py

1D or 2D plot of source parameters from a sqlite parameter file.

:copyright:
    2023-2024 Claudio Satriano <satriano@ipgp.fr>
:license:
    CeCILL Free Software License Agreement v2.1
    (http://www.cecill.info/licences.en.html)
"""
import argparse
import contextlib
import sys
import os
import sqlite3
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit

valid_plot_types = [
    'fc', 'Er', 'ssd', 'ra', 'Mo', 't_star', 'Qo', 'sigma_a',
    'fc_mw', 'Er_mw', 'ssd_mw']


[docs] class Annot(): """ Annotate the plot with the evid, Mw and the value of the parameter. """ def __init__(self, xdata, ydata, labels, yformat): self.xdata = xdata self.ydata = ydata self.labels = labels self.yformat = yformat def __call__(self, event): ind = event.ind x = np.take(self.xdata, ind) y = np.take(self.ydata, ind) label = np.take(self.labels, ind) for Mw, yvalue, evid in zip(x, y, label): ystring = self.yformat.format(yvalue) print(f'{evid} Mw {Mw:.1f} {ystring}')
[docs] def mag_to_moment(mag, b=0.5): """ Convert magnitude to moment. The parameter b is used to change the slope of the fc-Mw stress drop curve. The standard value of b is 0.5, which corresponds to a self-similar model. """ mag = np.asarray(mag) return np.power(10, (3 * b * mag + 9.1))
[docs] def stress_drop_curve_fc_mw(delta_sigma, vel, mw, k=0.3724, b=-0.5): """ Constant stress drop curve in fc vs Mw. Obtained by combining the equation for stress drop: delta_sigma = 7/16 * Mo / a^3 with the equation for source radius: a = k * vel * (delta_sigma / fc) where k is a coefficient discussed in Kaneko and Shearer (2014). For the Brune source model, k=0.3724. Parameters ---------- delta_sigma : float Stress drop in MPa. vel : float P or S-wave velocity in km/s. mw : float Moment magnitude. b : float, optional The slope of the stress drop curve. Default is -0.5. k : float, optional Coefficient for the source radius. Default is 0.3724 (Brune source model). Returns ------- fc : float Corner frequency in Hz. """ vel *= 1e3 # P or S-wave velocity in m/s delta_sigma *= 1e6 # stress drop in Pa # compute moment from magnitude, allowing for non-self-similarity through # the b parameter moment = mag_to_moment(mw, -b) # return fc in Hz return k*vel*(16/7 * delta_sigma / moment)**(1/3)
[docs] def stress_drop_curve_Er_mw(delta_sigma, mu, mw): """ Constant stress drop curve in Er vs Mw. Madariaga (2009), doi:10.1007/978-1-4419-7695-6_22, eq. 33., page 374. """ # Eq. (33) of Madariaga (2009): # 0.2331 * delta_sigma = mu * Er / Mo Mo = mag_to_moment(mw) delta_sigma *= 1e6 # return Er in N·m return 0.2331 * delta_sigma * Mo / mu
[docs] def apparent_stress_curve_Er_mw(sigma_a, mu, mw): """ Constant apparent stress curve in Er vs Mw. Madariaga (2009), doi:10.1007/978-1-4419-7695-6_22, eq. 33., page 374. """ # Eq. (33) of Madariaga (2009): # sigma_a = mu * Er / Mo Mo = mag_to_moment(mw) sigma_a *= 1e6 # return Er in N·m return sigma_a * Mo / mu
[docs] def fc_mw_function(mw, a, b): """Function to fit fc vs Mw.""" return a / 3. + b * mw
[docs] def calc_r2(x, y, yerr, a, b): """Coefficient of determination.""" y_mean = np.mean(y) y_calc = fc_mw_function(x, a, b) SS_tot = np.sum(((y - y_mean) / yerr)**2.) SS_res = np.sum(((y - y_calc) / yerr)**2.) return 1 - SS_res / SS_tot
[docs] def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser() parser.description =\ '1D or 2D plot of source parameters from a sqlite parameter file' parser.add_argument('sqlite_file') valid_plot_types_str = str(valid_plot_types)[1:-1].replace("'", "\"") parser.add_argument( '-p', '--plot_type', default='fc_mw', help=f'Plot type. One of: {valid_plot_types_str}. ' '1D plots are histograms. ' '2D plots are scatter plots or 2D histograms, ' 'depending on the -H option. ' 'Default is "fc_mw"' ) parser.add_argument( '-r', '--runid', default=None, help='Only select a specific runid. Default is all') parser.add_argument( '-s', '--statistics', default='mean', help='Statistics to use: "mean", "wmean" (weighted mean) ' 'or "pctl" (percentiles). Default is "mean"') parser.add_argument( '-i', '--nbins', type=int, default=None, help='Number of bins in the histogram (default: autoset)') parser.add_argument( '-n', '--stamin', type=int, default=None, help='Minimum number of stations') parser.add_argument( '-N', '--stamax', type=int, default=None, help='Maximum number of stations') parser.add_argument( '-m', '--magmin', type=float, default=None, help='Minimum magnitude') parser.add_argument( '-M', '--magmax', type=float, default=None, help='Maximum magnitude') parser.add_argument( '-d', '--ssdmin', type=float, default=None, help='Minimum static stress drop') parser.add_argument( '-D', '--ssdmax', type=float, default=None, help='Maximum static stress drop') parser.add_argument( '-a', '--sigmaamin', type=float, default=None, help='Minimum apparent stress drop') parser.add_argument( '-A', '--sigmaamax', type=float, default=None, help='Maximum apparent stress drop') parser.add_argument( '-H', '--hist', default=False, action='store_true', help='Draw an histogram instead of a scatter plot (only for 2D plots)') parser.add_argument( '-f', '--fit', default=False, action='store_true', help='Fit the scatter plot with a linear function (only for 2D plots)') parser.add_argument( '-l', '--slope', default=False, action='store_true', help='Fit also the slope of the linear function (only for 2D plots)') parser.add_argument( '-w', '--wave_type', default='S', help='Wave type. Only used for plots involving "fc". ' 'One of "P", "S", "SV" or "SH". Default is "S".') args = parser.parse_args() if args.plot_type not in valid_plot_types: msg = f'Plot type must be one of {valid_plot_types_str}' raise ValueError(msg) if args.statistics not in ['mean', 'wmean', 'pctl']: msg = 'Statistics must be "mean", "wmean" or "pctl"' raise ValueError(msg) if args.wave_type not in ['P', 'S', 'SV', 'SH']: msg = 'Wave type must be "P", "S", "SV" or "SH"' raise ValueError(msg) return args
[docs] def query_event_params_into_numpy(cursor, param, param_type, query_condition): """ Query a parameter from the Events table and return it as a numpy array. """ query = f'select {param} from Events {query_condition} order by evid,runid' cursor.execute(query) result = np.array(cursor.fetchall()) if len(result) == 0: raise ValueError('No events found') return result[:, 0].astype(param_type)
[docs] class Params(): """ Class to handle the parameters from a sqlite file. """ def __init__(self, args): """ Initialize the class from a sqlite file. """ self.args = args self.sqlite_file = args.sqlite_file self.runid = runid = args.runid self.stat = stat = args.statistics self._open_db(self.sqlite_file) query_condition = f'where runid="{runid}"' if runid is not None else '' self.evids = query_event_params_into_numpy( self.cur, 'evid', str, query_condition) self.vp = query_event_params_into_numpy( self.cur, 'vp', np.float64, query_condition) self.vs = query_event_params_into_numpy( self.cur, 'vs', np.float64, query_condition) self.rho = query_event_params_into_numpy( self.cur, 'rho', np.float64, query_condition) self.kp = query_event_params_into_numpy( self.cur, 'kp', np.float64, query_condition) self.ks = query_event_params_into_numpy( self.cur, 'ks', np.float64, query_condition) self.wave_type = query_event_params_into_numpy( self.cur, 'wave_type', str, query_condition) self.nsta = query_event_params_into_numpy( self.cur, 'nobs', np.int32, query_condition) self.Mo = query_event_params_into_numpy( self.cur, f'Mo_{stat}', np.float64, query_condition) self.mw = query_event_params_into_numpy( self.cur, f'Mw_{stat}', np.float64, query_condition) self.mw_err_minus = query_event_params_into_numpy( self.cur, f'Mw_{stat}_err_minus', np.float64, query_condition) self.mw_err_plus = query_event_params_into_numpy( self.cur, f'Mw_{stat}_err_plus', np.float64, query_condition) self.fc = query_event_params_into_numpy( self.cur, f'fc_{stat}', np.float64, query_condition) self.fc_err_minus = query_event_params_into_numpy( self.cur, f'fc_{stat}_err_minus', np.float64, query_condition) self.fc_err_plus = query_event_params_into_numpy( self.cur, f'fc_{stat}_err_plus', np.float64, query_condition) self.Er = query_event_params_into_numpy( self.cur, f'Er_{stat}', np.float64, query_condition) self.Er_err_minus = query_event_params_into_numpy( self.cur, f'Er_{stat}_err_minus', np.float64, query_condition) self.Er_err_plus = query_event_params_into_numpy( self.cur, f'Er_{stat}_err_plus', np.float64, query_condition) self.ssd = query_event_params_into_numpy( self.cur, f'ssd_{stat}', np.float64, query_condition) self.ssd_err_minus = query_event_params_into_numpy( self.cur, f'ssd_{stat}_err_minus', np.float64, query_condition) self.ssd_err_plus = query_event_params_into_numpy( self.cur, f'ssd_{stat}_err_plus', np.float64, query_condition) self.ra = query_event_params_into_numpy( self.cur, f'ra_{stat}', np.float64, query_condition) self.ra_err_minus = query_event_params_into_numpy( self.cur, f'ra_{stat}_err_minus', np.float64, query_condition) self.ra_err_plus = query_event_params_into_numpy( self.cur, f'ra_{stat}_err_plus', np.float64, query_condition) self.sigma_a = query_event_params_into_numpy( self.cur, f'sigma_a_{stat}', np.float64, query_condition) self.sigma_a_err_minus = query_event_params_into_numpy( self.cur, f'sigma_a_{stat}_err_minus', np.float64, query_condition) self.sigma_a_err_plus = query_event_params_into_numpy( self.cur, f'sigma_a_{stat}_err_plus', np.float64, query_condition) self.t_star = query_event_params_into_numpy( self.cur, f't_star_{stat}', np.float64, query_condition) self.t_star_err_minus = query_event_params_into_numpy( self.cur, f't_star_{stat}_err_minus', np.float64, query_condition) self.t_star_err_plus = query_event_params_into_numpy( self.cur, f't_star_{stat}_err_plus', np.float64, query_condition) self.Qo = query_event_params_into_numpy( self.cur, f'Qo_{stat}', np.float64, query_condition) self.Qo_err_minus = query_event_params_into_numpy( self.cur, f'Qo_{stat}_err_minus', np.float64, query_condition) self.Qo_err_plus = query_event_params_into_numpy( self.cur, f'Qo_{stat}_err_plus', np.float64, query_condition) self.cur.close() # other attributes self.nbins_x = None self.nbins_y = None def _open_db(self, sqlite_file): """ Open the sqlite file and check that it contains the required tables. """ if not os.path.isfile(sqlite_file): raise FileNotFoundError(f'File "{sqlite_file}" not found') if os.stat(sqlite_file).st_size == 0: raise ValueError(f'File "{sqlite_file}" is empty') conn = sqlite3.connect(sqlite_file) cur = conn.cursor() try: cur.execute("SELECT name FROM sqlite_master WHERE type='table';") except sqlite3.DatabaseError as e: raise ValueError( f'File "{sqlite_file}" is not a valid sqlite file' ) from e tables = [t[0] for t in cur.fetchall()] for table in 'Events', 'Stations': if table not in tables: raise ValueError( f'Table "{table}" not found in file "{sqlite_file}"') self.cur = cur
[docs] def skip_events(self, idx): """Skip events with index idx.""" self.evids = np.delete(self.evids, idx) self.vp = np.delete(self.vp, idx) self.vs = np.delete(self.vs, idx) self.rho = np.delete(self.rho, idx) self.kp = np.delete(self.kp, idx) self.ks = np.delete(self.ks, idx) self.wave_type = np.delete(self.wave_type, idx) self.nsta = np.delete(self.nsta, idx) self.Mo = np.delete(self.Mo, idx) self.mw = np.delete(self.mw, idx) self.mw_err_minus = np.delete(self.mw_err_minus, idx) self.mw_err_plus = np.delete(self.mw_err_plus, idx) self.fc = np.delete(self.fc, idx) self.fc_err_minus = np.delete(self.fc_err_minus, idx) self.fc_err_plus = np.delete(self.fc_err_plus, idx) self.Er = np.delete(self.Er, idx) self.Er_err_minus = np.delete(self.Er_err_minus, idx) self.Er_err_plus = np.delete(self.Er_err_plus, idx) self.ssd = np.delete(self.ssd, idx) self.ssd_err_minus = np.delete(self.ssd_err_minus, idx) self.ssd_err_plus = np.delete(self.ssd_err_plus, idx) self.ra = np.delete(self.ra, idx) self.ra_err_minus = np.delete(self.ra_err_minus, idx) self.ra_err_plus = np.delete(self.ra_err_plus, idx) self.sigma_a = np.delete(self.sigma_a, idx) self.sigma_a_err_minus = np.delete(self.sigma_a_err_minus, idx) self.sigma_a_err_plus = np.delete(self.sigma_a_err_plus, idx) self.t_star = np.delete(self.t_star, idx) self.t_star_err_minus = np.delete(self.t_star_err_minus, idx) self.t_star_err_plus = np.delete(self.t_star_err_plus, idx) self.Qo = np.delete(self.Qo, idx) self.Qo_err_minus = np.delete(self.Qo_err_minus, idx) self.Qo_err_plus = np.delete(self.Qo_err_plus, idx)
[docs] def filter(self, stamin=None, stamax=None, magmin=None, magmax=None, ssdmin=None, ssdmax=None, sigmaamin=None, sigmaamax=None): """Filter the parameters based on one or more conditions.""" cond = np.ones(len(self.nsta)).astype(bool) if stamin is not None: cond = np.logical_and(cond, self.nsta >= stamin) if stamax is not None: cond = np.logical_and(cond, self.nsta <= stamax) if magmin is not None: cond = np.logical_and(cond, self.mw >= magmin) if magmax is not None: cond = np.logical_and(cond, self.mw <= magmax) if ssdmin is not None: cond = np.logical_and(cond, self.ssd >= ssdmin) if ssdmax is not None: cond = np.logical_and(cond, self.ssd <= ssdmax) if sigmaamin is not None: cond = np.logical_and(cond, self.sigma_a >= sigmaamin) if sigmaamax is not None: cond = np.logical_and(cond, self.sigma_a <= sigmaamax) self.skip_events(np.where(~cond)[0])
def _make_mw_axis(self): """Make the magnitude axis.""" if self.args.magmin is not None: mag_min = self.args.magmin else: mag_min = np.floor(np.min(self.mw - self.mw_err_minus)) if self.args.magmax is not None: mag_max = self.args.magmax else: mag_max = np.ceil(np.max(self.mw + self.mw_err_plus)) xlim_mag = (mag_min, mag_max) fig = plt.figure(figsize=(10, 6)) ax_Mo = fig.add_subplot(111) ax_Mo.set_xscale('log') ax_Mo.set_xlim(mag_to_moment(xlim_mag)) ax = ax_Mo.twiny() ax.set_xlim(xlim_mag) ax.set_xlabel('Mw') ax_Mo.set_xlabel('Mo (N·m)') return fig, ax, ax_Mo def _add_grid(self, ax): """Add grid to the plot.""" ax.grid( True, which='major', linestyle='solid', color='#999999', zorder=0) ax.grid( True, which='minor', linestyle='solid', color='#DDDDDD', zorder=0) def _set_plot_title(self, ax, nevs=None, extra_text=None): """Set the plot title.""" if nevs is None: nevs = len(self.evids) stat_descr = { 'mean': 'mean', 'wmean': 'weighted mean', 'pctl': 'percentiles', } title = f'{nevs} events' if None not in (self.nbins_x, self.nbins_y): title += f', {self.nbins_x}x{self.nbins_y} bins' title += f' - {stat_descr[self.stat]}' if self.runid is not None: title += f' - runid: {self.runid}' if extra_text is not None: title += f'\n{extra_text}' ax.set_title(title, y=0.95, verticalalignment='top') def _stress_drop_curves_fc_mw(self, vel, k_parameter, ax): """Plot stress-drop curves for different delta_sigma.""" mag_min, mag_max = ax.get_xlim() mw_step = 0.1 mw_test = np.arange(mag_min, mag_max - 2 * mw_step, mw_step) fc_min = np.inf fc_max = 0. for delta_sigma in (0.1, 1., 10., 100.): fc_test = stress_drop_curve_fc_mw( delta_sigma, vel, mw_test, k_parameter) if fc_test.min() < fc_min: fc_min = fc_test.min() if fc_test.max() > fc_max: fc_max = fc_test.max() ax.plot(mw_test, fc_test, color='#555555') label = str(delta_sigma) # Get rid of ".0" in label if label.endswith('.0'): label = label[:-2] label += ' MPa' ax.text(mw_test[-1], fc_test[-1], label) ax.set_ylim((fc_min * 0.9, fc_max * 1.1)) def _stress_drop_curves_Er_mw(self, mu, ax): """Plot stress-drop curves for different delta_sigma.""" mag_min, mag_max = ax.get_xlim() mw_step = 0.1 mw_test = np.arange(mag_min, mag_max - 2 * mw_step, mw_step) Er_min = np.inf Er_max = 0. for delta_sigma in (0.1, 1., 10., 100.): Er_test = stress_drop_curve_Er_mw(delta_sigma, mu, mw_test) if Er_test.min() < Er_min: Er_min = Er_test.min() if Er_test.max() > Er_max: Er_max = Er_test.max() ax.plot(mw_test, Er_test, color='#555555') label = str(delta_sigma) # Get rid of ".0" in label if label.endswith('.0'): label = label[:-2] label += ' MPa' ax.text(mw_test[-1], Er_test[-1], label) ax.set_ylim((Er_min * 0.5, Er_max * 2)) def _apparent_stress_curves_Er_mw(self, mu, ax): """Plot apparent stress curves for different delta_sigma.""" mag_min, mag_max = ax.get_xlim() mw_step = 0.1 mw_test = np.arange(mag_min, mag_max - 2 * mw_step, mw_step) Er_min = np.inf Er_max = 0. for sigma_a in (0.1, 1., 10., 100.): Er_test = apparent_stress_curve_Er_mw(sigma_a, mu, mw_test) if Er_test.min() < Er_min: Er_min = Er_test.min() if Er_test.max() > Er_max: Er_max = Er_test.max() ax.plot(mw_test, Er_test, color='#555555') label = str(sigma_a) # Get rid of ".0" in label if label.endswith('.0'): label = label[:-2] label += ' MPa' ax.text(mw_test[-1], Er_test[-1], label) ax.set_ylim((Er_min * 0.5, Er_max * 2)) def _2d_hist_fc_mw(self, fig, ax, nbins=None, wave_type='S'): """Plot a 2d histogram of fc vs mw.""" mw_min, mw_max = ax.get_xlim() fc_min, fc_max = ax.get_ylim() log_fc_min = np.log10(fc_min) log_fc_max = np.log10(fc_max) if nbins is None: mw_nbins = int((mw_max - mw_min) * 10) fc_nbins = int((log_fc_max - log_fc_min) * 10) else: mw_nbins = nbins fc_nbins = nbins self.nbins_x = mw_nbins self.nbins_y = fc_nbins mw_bins = np.linspace(mw_min, mw_max + 0.1, mw_nbins) fc_bins = 10**np.linspace(log_fc_min, log_fc_max + 0.1, fc_nbins) cond = self.wave_type == wave_type mw = self.mw[cond] if len(mw) == 0: raise ValueError(f'No events found for wave type "{wave_type}"') fc = self.fc[cond] counts, _, _ = np.histogram2d(mw, fc, bins=(mw_bins, fc_bins)) cm = ax.pcolormesh( mw_bins[:-1], fc_bins[:-1], counts.T, cmap='magma_r', shading='auto') cbaxes = fig.add_axes([0.15, 0.15, 0.02, 0.2]) plt.colorbar(cm, cax=cbaxes, orientation='vertical', label='counts') return len(fc) def _2d_hist_Er_mw(self, fig, ax, nbins=None): """Plot a 2d histogram of Er vs mw.""" mw_min, mw_max = ax.get_xlim() Er_min, Er_max = ax.get_ylim() log_Er_min = np.log10(Er_min) log_Er_max = np.log10(Er_max) if nbins is None: mw_nbins = int((mw_max - mw_min) * 10) Er_nbins = int((log_Er_max - log_Er_min) * 5) else: mw_nbins = nbins Er_nbins = nbins mw_bins = np.linspace(mw_min, mw_max + 0.1, mw_nbins) Er_bins = 10**np.linspace(log_Er_min, log_Er_max + 0.1, Er_nbins) self.nbins_x = mw_nbins self.nbins_y = Er_nbins # Er can be NaN cond = ~np.isnan(self.Er) mw = self.mw[cond] Er = self.Er[cond] counts, _, _ = np.histogram2d(mw, Er, bins=(mw_bins, Er_bins)) cm = ax.pcolormesh( mw_bins[:-1], Er_bins[:-1], counts.T, cmap='magma_r', shading='auto') cbaxes = fig.add_axes([0.15, 0.15, 0.02, 0.2]) plt.colorbar(cm, cax=cbaxes, orientation='vertical', label='counts') return len(Er) def _2d_hist_ssd_mw(self, fig, ax, nbins=None): """Plot a 2d histogram of ssd vs mw.""" mw_min, mw_max = ax.get_xlim() ssd_min, ssd_max = ax.get_ylim() log_ssd_min = np.log10(ssd_min) log_ssd_max = np.log10(ssd_max) if nbins is None: mw_nbins = int((mw_max - mw_min) * 10) ssd_nbins = int((log_ssd_max - log_ssd_min) * 5) else: mw_nbins = nbins ssd_nbins = nbins mw_bins = np.linspace(mw_min, mw_max + 0.1, mw_nbins) ssd_bins = 10**np.linspace(log_ssd_min, log_ssd_max + 0.1, ssd_nbins) self.nbins_x = mw_nbins self.nbins_y = ssd_nbins counts, _, _ = np.histogram2d( self.mw, self.ssd, bins=(mw_bins, ssd_bins)) cm = ax.pcolormesh( mw_bins[:-1], ssd_bins[:-1], counts.T, cmap='magma_r', shading='auto') cbaxes = fig.add_axes([0.15, 0.15, 0.02, 0.2]) plt.colorbar(cm, cax=cbaxes, orientation='vertical', label='counts') def _scatter_fc_mw(self, fig, ax, wave_type='S'): """Plot the scatter plot of fc vs mw.""" cond = self.wave_type == wave_type evids = self.evids[cond] if len(evids) == 0: raise ValueError(f'No events found for wave type "{wave_type}"') mw = self.mw[cond] mw_err_minus = self.mw_err_minus[cond] mw_err_plus = self.mw_err_plus[cond] fc = self.fc[cond] fc_err_minus = self.fc_err_minus[cond] fc_err_plus = self.fc_err_plus[cond] alpha = 1 ax.errorbar( mw, fc, xerr=[mw_err_minus, mw_err_plus], yerr=[fc_err_minus, fc_err_plus], fmt='o', mec='black', mfc='#FCBA25', ecolor='#FCBA25', alpha=alpha) ax.scatter(mw, fc, alpha=0, picker=True, zorder=20) yformat = 'fc {:.2f} Hz' annot = Annot(mw, fc, evids, yformat) fig.canvas.mpl_connect('pick_event', annot) return len(fc) def _scatter_Er_mw(self, fig, ax): """Plot the scatter plot of Er vs mw.""" # Er can be NaN cond = ~np.isnan(self.Er) mw = self.mw[cond] mw_err_minus = self.mw_err_minus[cond] mw_err_plus = self.mw_err_plus[cond] Er = self.Er[cond] Er_err_minus = self.Er_err_minus[cond] Er_err_plus = self.Er_err_plus[cond] evids = self.evids[cond] alpha = 1 ax.errorbar( mw, Er, xerr=[mw_err_minus, mw_err_plus], yerr=[Er_err_minus, Er_err_plus], fmt='o', mec='black', mfc='#FCBA25', ecolor='#FCBA25', alpha=alpha) ax.scatter(mw, Er, alpha=0, picker=True, zorder=20) yformat = 'Er {:.1e} N·m' annot = Annot(mw, Er, evids, yformat) fig.canvas.mpl_connect('pick_event', annot) return len(Er) def _scatter_ssd_mw(self, fig, ax): """Plot the scatter plot of ssd vs mw.""" alpha = 1 ax.errorbar( self.mw, self.ssd, xerr=[self.mw_err_minus, self.mw_err_plus], yerr=[self.ssd_err_minus, self.ssd_err_plus], fmt='o', mec='black', mfc='#FCBA25', ecolor='#FCBA25', alpha=alpha) ax.scatter(self.mw, self.ssd, alpha=0, picker=True, zorder=20) yformat = 'ssd {:.2e} MPa' annot = Annot(self.mw, self.ssd, self.evids, yformat) fig.canvas.mpl_connect('pick_event', annot) def _fit_fc_mw(self, vel, k_parameter, ax, slope=False): """Plot a linear regression of fc vs mw.""" mag_min, mag_max = ax.get_xlim() mw_step = 0.1 mw_test = np.arange(mag_min, mag_max - 2 * mw_step, mw_step) if slope: f = fc_mw_function else: def f(mw, a): return fc_mw_function(mw, a, -0.5) y = np.log10(self.fc) # pylint: disable=unbalanced-tuple-unpacking popt, _pcov = curve_fit(f, self.mw, y) try: a, b = popt except Exception: a, = popt b = -0.5 print(f'a: {a:.1f} b {b:.1f}:') slope = (3 / 2) / b print(f'slope: {slope:.1f}') r2 = calc_r2(self.mw, y, np.ones_like(y), a, b) print('r2:', r2) # print('r2_err:', r2_err) delta_sigma = 1. / ((vel * 1000.)**3.) * 10**(a + 9.1 + 0.935) delta_sigma /= 1e6 print('delta_sigma', delta_sigma) fc_test = stress_drop_curve_fc_mw( delta_sigma, vel, mw_test, k_parameter, b=b) ax.plot( mw_test, fc_test, color='red', zorder=10, label=f'r2: {r2:.2f}') ax.legend() if not slope: ax.text( mw_test[-1], fc_test[-1], f'{delta_sigma:.1f} MPa', color='red', backgroundcolor='white', zorder=50)
[docs] def plot_fc_mw(self, hist=False, fit=False, slope=False, nbins=None, wave_type='S'): """ Plot the logarithm of the corner frequency vs the moment magnitude. Parameters ---------- hist : bool If True, plot a 2D histogram instead of a scatter plot. fit : bool If True, plot a linear regression of fc vs mw. slope : bool If True, also fit the slope of the linear regression. """ fig, ax, ax_Mo = self._make_mw_axis() ax_Mo.set_yscale('log') if wave_type == 'P': vel = np.nanmean(self.vp) k = np.nanmean(self.kp) elif wave_type in ('S', 'SV', 'SH'): vel = np.nanmean(self.vs) k = np.nanmean(self.ks) else: raise ValueError('Wave type must be "P", "S", "SV" or "SH"') self._stress_drop_curves_fc_mw(vel, k, ax) if hist: npoints = self._2d_hist_fc_mw(fig, ax, nbins, wave_type) else: npoints = self._scatter_fc_mw(fig, ax, wave_type) if fit: self._fit_fc_mw(vel, k, ax, slope=slope) extra_text = 'Stress drop curves' self._set_plot_title(ax, npoints, extra_text) self._add_grid(ax_Mo) ax_Mo.set_ylabel('fc (Hz)') plt.show()
[docs] def plot_Er_mw(self, hist=False, fit=False, nbins=None): """ Plot the logarithm of the radiated energy vs the moment magnitude. Parameters ---------- hist : bool If True, plot a 2D histogram instead of a scatter plot. fit : bool If True, plot a linear regression of Er vs mw. slope : bool If True, also fit the slope of the linear regression. """ fig, ax, ax_Mo = self._make_mw_axis() ax_Mo.set_yscale('log') mu = np.nanmean((self.vs * 1e3)**2 * self.rho) self._apparent_stress_curves_Er_mw(mu, ax) if hist: npoints = self._2d_hist_Er_mw(fig, ax, nbins) else: npoints = self._scatter_Er_mw(fig, ax) if fit: raise NotImplementedError('Fit not implemented yet for Er_mw') extra_text = 'Apparent stress curves' self._set_plot_title(ax, npoints, extra_text) self._add_grid(ax_Mo) ax_Mo.set_ylabel('Er (N·m)') plt.show()
[docs] def plot_ssd_mw(self, hist=False, fit=False, nbins=None): """ Plot the logarithm of static stress drop vs moment magnitude. Parameters ---------- hist : bool If True, plot a 2D histogram instead of a scatter plot. fit : bool If True, plot a linear regression of ssd vs mw. slope : bool If True, also fit the slope of the linear regression. """ fig, ax, ax_Mo = self._make_mw_axis() ax_Mo.set_yscale('log') ssd_min = np.min(self.ssd - self.ssd_err_minus) ssd_max = np.max(self.ssd + self.ssd_err_plus) ssd_min = 10**(np.floor(np.log10(ssd_min))) ssd_max = 10**(np.ceil(np.log10(ssd_max))) ax.set_ylim(ssd_min, ssd_max) if hist: self._2d_hist_ssd_mw(fig, ax, nbins) else: self._scatter_ssd_mw(fig, ax) if fit: raise NotImplementedError('Fit not implemented yet for ssd_mw') self._set_plot_title(ax) self._add_grid(ax_Mo) ax_Mo.set_ylabel('ssd (MPa)') plt.show()
[docs] def plot_hist(self, param_name, nbins=None, wave_type='S'): """Plot a histogram of the given parameter.""" parameters = { 'fc': ('Corner Frequency', 'Hz', 'log'), 'ssd': ('Static Stress Drop', 'MPa', 'log'), 'ra': ('Source Radius', 'm', 'lin'), 'Mo': ('Seismic Moment', 'N·m', 'log'), 't_star': ('T star', 's', 'lin'), 'Qo': ('Qo', None, 'lin'), 'Er': ('Radiated Energy', 'N·m', 'log'), 'sigma_a': ('Apparent Stress', 'MPa', 'log'), } description, unit, log = parameters[param_name] log = log == 'log' values = getattr(self, param_name) if param_name == 'fc': values = values[self.wave_type == wave_type] if len(values) == 0: raise ValueError( f'No events found for wave type "{wave_type}"') values = values[~np.isnan(values)] nvalues = len(values) if nvalues < 2: raise ValueError(f'Not enough values to plot histogram: {nvalues}') if nbins is None: # find the number of bins as the closest power of 10 to the # number of values divided by 10 nbins_exp = int(np.ceil(np.log10(nvalues / 10))) nbins = int(10**nbins_exp) # reduce the number of bins if there are too few values per bin if nvalues / nbins < 2: nbins //= 2 if log: values_mean = 10**(np.mean(np.log10(values))) min_exp = np.floor(np.min(np.log10(values))) max_exp = np.ceil(np.max(np.log10(values))) bins = np.logspace(min_exp, max_exp, nbins + 1) else: values_mean = np.mean(values) maxval = np.max(np.abs(values)) bins = np.linspace(0, maxval, nbins) _fig, ax = plt.subplots() ax.hist(values, bins=bins) ax.axvline(values_mean, color='red') txt = ( f' mean: {values_mean:.1e}' if log else f' mean: {values_mean:.4f}' ) if unit is not None: txt += f' {unit}' ax.text( values_mean, 0.95, txt, color='red', transform=ax.get_xaxis_transform()) if log: ax.set_xscale('log') if unit is not None: ax.set_xlabel(f'{description} ({unit})') else: ax.set_xlabel(f'{description}') ax.set_ylabel('Counts') title = f'{nvalues} values, {nbins} bins - {self.stat}' if self.runid is not None: title += f' - runid {self.runid}' ax.set_title(title) plt.show()
[docs] def run(): """Run the script.""" args = parse_args() params = Params(args) if os.path.exists('problems.txt'): _skip_events(params) params.filter( stamin=args.stamin, stamax=args.stamax, magmin=args.magmin, magmax=args.magmax, ssdmin=args.ssdmin, ssdmax=args.ssdmax, sigmaamin=args.sigmaamin, sigmaamax=args.sigmaamax, ) if len(params.evids) == 0: raise ValueError('No events found') if args.plot_type == 'fc_mw': params.plot_fc_mw( args.hist, args.fit, args.slope, args.nbins, args.wave_type) elif args.plot_type == 'Er_mw': params.plot_Er_mw(args.hist, args.fit, args.nbins) elif args.plot_type == 'ssd_mw': params.plot_ssd_mw(args.hist, args.fit, args.nbins) elif args.plot_type in valid_plot_types: params.plot_hist(args.plot_type, args.nbins, args.wave_type)
def _skip_events(params): evid_skip = np.loadtxt('problems.txt', usecols=(0,), dtype=str) # case in which there is only one evid: if not evid_skip.shape: evid_skip = evid_skip[None] skip_idx = [] for evid in evid_skip: with contextlib.suppress(Exception): skip_idx.append(np.where(params.evids == evid)[0][0]) skipped_evid_str = ' '.join(params.evids[skip_idx]) print(f'Skipping events: {skipped_evid_str}') params.skip_events(skip_idx)
[docs] def main(): """Main function.""" try: run() except KeyboardInterrupt: sys.exit(0) except Exception as msg: sys.exit(msg)
if __name__ == '__main__': main()