Source code for plot_sourcepars

# -*- coding: utf8 -*-
# SPDX-License-Identifier: CECILL-2.1
"""
plot_sourcepars.py

1D or 2D plot of source parameters from a sqlite parameter file.

:copyright:
    2023-2026 Claudio Satriano <satriano@ipgp.fr>
:license:
    CeCILL Free Software License Agreement v2.1
    (http://www.cecill.info/licences.en.html)
"""
import argparse
import contextlib
import itertools
import sys
import os
import sqlite3
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patheffects as patheffects
import scipy.stats as stats
from scipy.optimize import curve_fit
from obspy import UTCDateTime

valid_plot_types = [
    'fc', 'Er', 'ssd', 'ra', 'Mo', 't_star', 'Qo', 'sigma_a',
    'fc_mw', 'Er_mw', 'ssd_mw', 'ssd_depth', 'mw_time', 'GR'
]
# the parameter base color is the same as in ssp_plot_params_stats.py
valid_parameters = {
    'depth': ('Depth', 'km', 'lin', '#1E90FF'),
    'nsta': ('Number of Stations', None, 'lin', '#F0C300'),
    'Mo': ('Seismic Moment', 'N·m', 'log', '#EE5835'),
    'Mw': ('Moment Magnitude', None, 'lin', '#EE5835'),
    'fc': ('Corner Frequency', 'Hz', 'log', '#6FBA6C'),
    'ssd': ('Static Stress Drop', 'MPa', 'log', '#D4ADD2'),
    'ra': ('Source Radius', 'm', 'lin', '#FAAC64'),
    't_star': ('T star', 's', 'lin', '#9EBAE2'),
    'Qo': ('Qo', None, 'lin', '#C07131'),
    'Er': ('Radiated Energy', 'N·m', 'log', '#00E3E9'),
    'sigma_a': ('Apparent Stress', 'MPa', 'log', '#943B99'),
}


[docs] def get_param_label(parameter): """ Get the label for the parameter. """ p_name, p_units = valid_parameters[parameter][:2] return f'{p_name} ({p_units})' if p_units is not None else p_name
[docs] def get_colormap(parameter): """ Get a colormap for the parameter. """ _, _, scale, base_color = valid_parameters[parameter] cmap = plt.cm.colors.LinearSegmentedColormap.from_list( 'custom', ['black', base_color, 'white']) return cmap, scale
[docs] class Annot(): """ Annotate the plot with the evid, Mw and the value of the parameter. """ def __init__(self, xdata, ydata, labels, yformat, xformat=None): self.xdata = xdata self.ydata = ydata self.labels = labels self.yformat = yformat self.xformat = xformat def __call__(self, event): ind = event.ind x = np.take(self.xdata, ind) y = np.take(self.ydata, ind) label = np.take(self.labels, ind) for xvalue, yvalue, evid in zip(x, y, label): xstring = ( self.xformat.format(xvalue) if self.xformat is not None else f'Mw {xvalue:.1f}' ) ystring = self.yformat.format(yvalue) print(f'{evid} {xstring} {ystring}')
[docs] def mag_to_moment(mag, b=0.5): """ Convert magnitude to moment. The parameter b is used to change the slope of the fc-Mw stress drop curve. The standard value of b is 0.5, which corresponds to a self-similar model. """ mag = np.asarray(mag) return np.power(10, (3 * b * mag + 9.1))
[docs] def moment_to_mag(moment, b=0.5): """ Convert moment to magnitude. The parameter b is used to change the slope of the fc-Mw stress drop curve. The standard value of b is 0.5, which corresponds to a self-similar model. """ moment = np.asarray(moment) return (np.log10(moment) - 9.1) / (3 * b)
[docs] def stress_drop_curve_fc_mw(delta_sigma, vel, mw, k=0.3724, b=-0.5): """ Constant stress drop curve in fc vs Mw. Obtained by combining the equation for stress drop: delta_sigma = 7/16 * Mo / a^3 with the equation for source radius: a = k * vel * (delta_sigma / fc) where k is a coefficient discussed in Kaneko and Shearer (2014). For the Brune source model, k=0.3724. Parameters ---------- delta_sigma : float Stress drop in MPa. vel : float P or S-wave velocity in km/s. mw : float Moment magnitude. b : float, optional The slope of the stress drop curve. Default is -0.5. k : float, optional Coefficient for the source radius. Default is 0.3724 (Brune source model). Returns ------- fc : float Corner frequency in Hz. """ vel *= 1e3 # P or S-wave velocity in m/s delta_sigma *= 1e6 # stress drop in Pa # compute moment from magnitude, allowing for non-self-similarity through # the b parameter moment = mag_to_moment(mw, -b) # return fc in Hz return k*vel*(16/7 * delta_sigma / moment)**(1/3)
[docs] def stress_drop_curve_Er_mw(delta_sigma, mu, mw): """ Constant stress drop curve in Er vs Mw. Madariaga (2009), doi:10.1007/978-1-4419-7695-6_22, eq. 33., page 374. """ # Eq. (33) of Madariaga (2009): # 0.2331 * delta_sigma = mu * Er / Mo Mo = mag_to_moment(mw) delta_sigma *= 1e6 # return Er in N·m return 0.2331 * delta_sigma * Mo / mu
[docs] def apparent_stress_curve_Er_mw(sigma_a, mu, mw): """ Constant apparent stress curve in Er vs Mw. Madariaga (2009), doi:10.1007/978-1-4419-7695-6_22, eq. 33., page 374. """ # Eq. (33) of Madariaga (2009): # sigma_a = mu * Er / Mo Mo = mag_to_moment(mw) sigma_a *= 1e6 # return Er in N·m return sigma_a * Mo / mu
[docs] def fc_mw_function(mw, a, b): """Function to fit fc vs Mw.""" return a / 3. + b * mw
[docs] def calc_r2(x, y, yerr, a, b): """Coefficient of determination.""" y_mean = np.mean(y) y_calc = fc_mw_function(x, a, b) SS_tot = np.sum(((y - y_mean) / yerr)**2.) SS_res = np.sum(((y - y_calc) / yerr)**2.) return 1 - SS_res / SS_tot
[docs] def compute_mc(magnitudes, magn_bin, vector_compl, pval): """ Compute the magnitude of completeness of a seismic catalog using the method in Taroni 2023 - TSR, https://doi.org/10.1785/0320230017. This code is translated from the original MATLAB code provided by Matteo Taroni. Parameters ---------- magnitudes : np.ndarray Vector of magnitudes. magn_bin : float Binning of the magnitudes (e.g., 0.01 or 0.1). vector_compl : np.ndarray Vector of completeness values to analyze. pval : float P-value threshold for the complete part of the catalog (e.g., 0.1). Returns ------- mc : float The magnitude of completeness of the catalog. """ # Preallocate P-value matrix pvalue_unif = np.zeros((100, len(vector_compl))) # CDF of uniform distribution cdf_unif = stats.uniform() # Loop over different completeness values for i, j in itertools.product(range(len(vector_compl)), range(100)): # Add uniform error to magnitudes magn = magnitudes + (np.random.rand(len(magnitudes)) - 0.5) * magn_bin # Select magnitudes above completeness threshold magn_ok = magn[magn >= vector_compl[i]] - vector_compl[i] # Transformation from exponential to uniform random variables if len(magn_ok) < 2: continue transf = magn_ok[:-1:2] / (magn_ok[:-1:2] + magn_ok[1::2]) # Kolmogorov-Smirnov test for uniformity _, pvalue_unif[j, i] = stats.kstest(transf, cdf_unif.cdf) p_mean = np.mean(pvalue_unif, axis=0) # Find the magnitude of completeness (first mean p-value >= Pval) mc_candidates = vector_compl[p_mean >= pval] return np.min(mc_candidates) if mc_candidates.size > 0 else None
[docs] def fit_gutenberg_richter(bin_centers, cum_nevs, mc): """ Fit the Gutenberg-Richter law to the data points. Parameters ---------- bin_centers : np.ndarray Bin centers. cum_nevs : np.ndarray Cumulative number of events. mc : float Magnitude of completeness. Returns ------- a, b : float G-R law parameters. """ def gr_law(mw, a, b): return a - b * mw # discard bins with no events log_cum_nevs = np.log10(cum_nevs[cum_nevs > 0]) bin_centers_fit = bin_centers[cum_nevs > 0] # discard bins below the magnitude of completeness log_cum_nevs_fit = log_cum_nevs[bin_centers_fit >= mc] bin_centers_fit = bin_centers_fit[bin_centers_fit >= mc] # with full_output = False, always returns a 2-tuple # pylint: disable-next=unbalanced-tuple-unpacking popt, _ = curve_fit( gr_law, bin_centers_fit, log_cum_nevs_fit, full_output=False ) a, b = popt # discard bins which are too far away from the fit and re-fit while True: residuals = log_cum_nevs_fit - gr_law(bin_centers_fit, a, b) std_res = np.std(residuals) cond = np.abs(residuals) < 2 * std_res if np.all(cond): break log_cum_nevs_fit = log_cum_nevs_fit[cond] bin_centers_fit = bin_centers_fit[cond] # pylint: disable-next=unbalanced-tuple-unpacking popt, _ = curve_fit( gr_law, bin_centers_fit, log_cum_nevs_fit, p0=(a, b), full_output=False ) a, b = popt return a, b
[docs] def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser() parser.description =\ '1D or 2D plot of source parameters from a sqlite parameter file' parser.add_argument('sqlite_file') valid_plot_types_str = str(valid_plot_types)[1:-1].replace("'", "\"") parser.add_argument( '-p', '--plot_type', default='fc_mw', help=f'Plot type. One of: {valid_plot_types_str}. ' '1D plots are histograms. ' '2D plots are scatter plots or 2D histograms, ' 'depending on the -H option. ' 'Default is "fc_mw"' ) valid_parameters_str = str(list( valid_parameters.keys()))[1:-1].replace("'", "\"") parser.add_argument( '-c', '--colorby', default=None, help='Color the data points by this parameter. ' f'One of: {valid_parameters_str}. ' 'Default is None') parser.add_argument( '-C', '--colormap', default=None, help='Force using a specific colormap instead of the default one for ' 'the colorby parameter. Any Matplotlib colormap is accepted. ') parser.add_argument( '-r', '--runid', default=None, help='Only select a specific runid. Default is all. Use "latest" to ' 'select the latest runid for each event') parser.add_argument( '-s', '--statistics', default='mean', help='Statistics to use: "mean", "wmean" (weighted mean) ' 'or "pctl" (percentiles). Default is "mean"') parser.add_argument( '-i', '--nbins', type=int, default=None, help='Number of bins in the histogram (default: autoset)') parser.add_argument( '-n', '--stamin', type=int, default=None, help='Minimum number of stations') parser.add_argument( '-N', '--stamax', type=int, default=None, help='Maximum number of stations') parser.add_argument( '-m', '--magmin', type=float, default=None, help='Minimum magnitude') parser.add_argument( '-M', '--magmax', type=float, default=None, help='Maximum magnitude') parser.add_argument( '-d', '--ssdmin', type=float, default=None, help='Minimum static stress drop') parser.add_argument( '-D', '--ssdmax', type=float, default=None, help='Maximum static stress drop') parser.add_argument( '-a', '--sigmaamin', type=float, default=None, help='Minimum apparent stress drop') parser.add_argument( '-A', '--sigmaamax', type=float, default=None, help='Maximum apparent stress drop') parser.add_argument( '-H', '--hist', default=False, action='store_true', help='Draw an histogram instead of a scatter plot (only for 2D plots)') parser.add_argument( '-f', '--fit', default=False, action='store_true', help='Fit the scatter plot with a linear function (only for 2D plots)') parser.add_argument( '-l', '--slope', default=False, action='store_true', help='Fit also the slope of the linear function (only for 2D plots)') parser.add_argument( '-w', '--wave_type', default='S', help='Wave type. Only used for plots involving "fc". ' 'One of "P", "S", "SV" or "SH". Default is "S".') args = parser.parse_args() if args.plot_type not in valid_plot_types: msg = f'Plot type must be one of {valid_plot_types_str}' raise ValueError(msg) if args.colorby is not None and args.colorby not in valid_parameters: msg = f'Colorby parameter must be one of {valid_parameters_str}' raise ValueError(msg) if args.statistics not in ['mean', 'wmean', 'pctl']: msg = 'Statistics must be "mean", "wmean" or "pctl"' raise ValueError(msg) if args.wave_type not in ['P', 'S', 'SV', 'SH']: msg = 'Wave type must be "P", "S", "SV" or "SH"' raise ValueError(msg) return args
[docs] def query_event_params_into_numpy(cursor, param, param_type, query_condition): """ Query a parameter from the Events table and return it as a numpy array. """ query = ( f'SELECT e.{param} FROM Events e {query_condition} ' 'ORDER BY e.evid, e.runid' ) cursor.execute(query) result = np.array(cursor.fetchall()) if len(result) == 0: raise ValueError('No events found') if param_type == UTCDateTime: return np.array([UTCDateTime(t) for t in result[:, 0]]) return result[:, 0].astype(param_type)
[docs] class Params(): """ Class to handle the parameters from a sqlite file. """ def __init__(self, args): """ Initialize the class from a sqlite file. """ self.args = args self.sqlite_file = args.sqlite_file self.runid = runid = args.runid self.stat = stat = args.statistics self._open_db(self.sqlite_file) if runid == 'latest': # select the latest runid for each event query_condition = """ JOIN ( SELECT evid, MAX(runid) AS max_runid FROM events GROUP BY evid ) AS max_runids ON e.evid = max_runids.evid AND e.runid = max_runids.max_runid """ elif runid is not None: query_condition = f'WHERE runid="{runid}"' else: query_condition = '' self.evids = query_event_params_into_numpy( self.cur, 'evid', str, query_condition) self.time = query_event_params_into_numpy( self.cur, 'orig_time', UTCDateTime, query_condition) self.depth = query_event_params_into_numpy( self.cur, 'depth', np.float64, query_condition) self.vp = query_event_params_into_numpy( self.cur, 'vp', np.float64, query_condition) self.vs = query_event_params_into_numpy( self.cur, 'vs', np.float64, query_condition) self.rho = query_event_params_into_numpy( self.cur, 'rho', np.float64, query_condition) self.kp = query_event_params_into_numpy( self.cur, 'kp', np.float64, query_condition) self.ks = query_event_params_into_numpy( self.cur, 'ks', np.float64, query_condition) self.wave_type = query_event_params_into_numpy( self.cur, 'wave_type', str, query_condition) self.nsta = query_event_params_into_numpy( self.cur, 'nobs', np.int32, query_condition) self.Mo = query_event_params_into_numpy( self.cur, f'Mo_{stat}', np.float64, query_condition) self.Mw = query_event_params_into_numpy( self.cur, f'Mw_{stat}', np.float64, query_condition) self.Mw_err_minus = query_event_params_into_numpy( self.cur, f'Mw_{stat}_err_minus', np.float64, query_condition) self.Mw_err_plus = query_event_params_into_numpy( self.cur, f'Mw_{stat}_err_plus', np.float64, query_condition) self.fc = query_event_params_into_numpy( self.cur, f'fc_{stat}', np.float64, query_condition) self.fc_err_minus = query_event_params_into_numpy( self.cur, f'fc_{stat}_err_minus', np.float64, query_condition) self.fc_err_plus = query_event_params_into_numpy( self.cur, f'fc_{stat}_err_plus', np.float64, query_condition) self.Er = query_event_params_into_numpy( self.cur, f'Er_{stat}', np.float64, query_condition) self.Er_err_minus = query_event_params_into_numpy( self.cur, f'Er_{stat}_err_minus', np.float64, query_condition) self.Er_err_plus = query_event_params_into_numpy( self.cur, f'Er_{stat}_err_plus', np.float64, query_condition) self.ssd = query_event_params_into_numpy( self.cur, f'ssd_{stat}', np.float64, query_condition) self.ssd_err_minus = query_event_params_into_numpy( self.cur, f'ssd_{stat}_err_minus', np.float64, query_condition) self.ssd_err_plus = query_event_params_into_numpy( self.cur, f'ssd_{stat}_err_plus', np.float64, query_condition) self.ra = query_event_params_into_numpy( self.cur, f'ra_{stat}', np.float64, query_condition) self.ra_err_minus = query_event_params_into_numpy( self.cur, f'ra_{stat}_err_minus', np.float64, query_condition) self.ra_err_plus = query_event_params_into_numpy( self.cur, f'ra_{stat}_err_plus', np.float64, query_condition) self.sigma_a = query_event_params_into_numpy( self.cur, f'sigma_a_{stat}', np.float64, query_condition) self.sigma_a_err_minus = query_event_params_into_numpy( self.cur, f'sigma_a_{stat}_err_minus', np.float64, query_condition) self.sigma_a_err_plus = query_event_params_into_numpy( self.cur, f'sigma_a_{stat}_err_plus', np.float64, query_condition) self.t_star = query_event_params_into_numpy( self.cur, f't_star_{stat}', np.float64, query_condition) self.t_star_err_minus = query_event_params_into_numpy( self.cur, f't_star_{stat}_err_minus', np.float64, query_condition) self.t_star_err_plus = query_event_params_into_numpy( self.cur, f't_star_{stat}_err_plus', np.float64, query_condition) self.Qo = query_event_params_into_numpy( self.cur, f'Qo_{stat}', np.float64, query_condition) self.Qo_err_minus = query_event_params_into_numpy( self.cur, f'Qo_{stat}_err_minus', np.float64, query_condition) self.Qo_err_plus = query_event_params_into_numpy( self.cur, f'Qo_{stat}_err_plus', np.float64, query_condition) self.cur.close() # other attributes self.nbins_x = None self.nbins_y = None def _open_db(self, sqlite_file): """ Open the sqlite file and check that it contains the required tables. """ if not os.path.isfile(sqlite_file): raise FileNotFoundError(f'File "{sqlite_file}" not found') if os.stat(sqlite_file).st_size == 0: raise ValueError(f'File "{sqlite_file}" is empty') conn = sqlite3.connect(sqlite_file) cur = conn.cursor() try: cur.execute("SELECT name FROM sqlite_master WHERE type='table';") except sqlite3.DatabaseError as e: raise ValueError( f'File "{sqlite_file}" is not a valid sqlite file' ) from e tables = [t[0] for t in cur.fetchall()] for table in 'Events', 'Stations': if table not in tables: raise ValueError( f'Table "{table}" not found in file "{sqlite_file}"') self.cur = cur
[docs] def skip_events(self, idx): """Skip events with index idx.""" self.evids = np.delete(self.evids, idx) self.time = np.delete(self.time, idx) self.depth = np.delete(self.depth, idx) self.vp = np.delete(self.vp, idx) self.vs = np.delete(self.vs, idx) self.rho = np.delete(self.rho, idx) self.kp = np.delete(self.kp, idx) self.ks = np.delete(self.ks, idx) self.wave_type = np.delete(self.wave_type, idx) self.nsta = np.delete(self.nsta, idx) self.Mo = np.delete(self.Mo, idx) self.Mw = np.delete(self.Mw, idx) self.Mw_err_minus = np.delete(self.Mw_err_minus, idx) self.Mw_err_plus = np.delete(self.Mw_err_plus, idx) self.fc = np.delete(self.fc, idx) self.fc_err_minus = np.delete(self.fc_err_minus, idx) self.fc_err_plus = np.delete(self.fc_err_plus, idx) self.Er = np.delete(self.Er, idx) self.Er_err_minus = np.delete(self.Er_err_minus, idx) self.Er_err_plus = np.delete(self.Er_err_plus, idx) self.ssd = np.delete(self.ssd, idx) self.ssd_err_minus = np.delete(self.ssd_err_minus, idx) self.ssd_err_plus = np.delete(self.ssd_err_plus, idx) self.ra = np.delete(self.ra, idx) self.ra_err_minus = np.delete(self.ra_err_minus, idx) self.ra_err_plus = np.delete(self.ra_err_plus, idx) self.sigma_a = np.delete(self.sigma_a, idx) self.sigma_a_err_minus = np.delete(self.sigma_a_err_minus, idx) self.sigma_a_err_plus = np.delete(self.sigma_a_err_plus, idx) self.t_star = np.delete(self.t_star, idx) self.t_star_err_minus = np.delete(self.t_star_err_minus, idx) self.t_star_err_plus = np.delete(self.t_star_err_plus, idx) self.Qo = np.delete(self.Qo, idx) self.Qo_err_minus = np.delete(self.Qo_err_minus, idx) self.Qo_err_plus = np.delete(self.Qo_err_plus, idx)
[docs] def filter(self, stamin=None, stamax=None, magmin=None, magmax=None, ssdmin=None, ssdmax=None, sigmaamin=None, sigmaamax=None): """Filter the parameters based on one or more conditions.""" cond = np.ones(len(self.nsta)).astype(bool) if stamin is not None: cond = np.logical_and(cond, self.nsta >= stamin) if stamax is not None: cond = np.logical_and(cond, self.nsta <= stamax) if magmin is not None: cond = np.logical_and(cond, self.Mw >= magmin) if magmax is not None: cond = np.logical_and(cond, self.Mw <= magmax) if ssdmin is not None: cond = np.logical_and(cond, self.ssd >= ssdmin) if ssdmax is not None: cond = np.logical_and(cond, self.ssd <= ssdmax) if sigmaamin is not None: cond = np.logical_and(cond, self.sigma_a >= sigmaamin) if sigmaamax is not None: cond = np.logical_and(cond, self.sigma_a <= sigmaamax) self.skip_events(np.where(~cond)[0])
def _make_mw_axis(self): """Make the magnitude axis.""" if self.args.magmin is not None: mag_min = self.args.magmin else: mag_min = np.floor(np.min(self.Mw - self.Mw_err_minus)) if self.args.magmax is not None: mag_max = self.args.magmax else: mag_max = np.ceil(np.max(self.Mw + self.Mw_err_plus)) xlim_mag = (mag_min, mag_max) fig = plt.figure(figsize=(10, 6)) ax_Mo = fig.add_subplot(111) ax_Mo.set_xscale('log') ax_Mo.set_xlim(mag_to_moment(xlim_mag)) ax = ax_Mo.twiny() ax.set_xlim(xlim_mag) ax.set_xlabel('Mw') ax_Mo.set_xlabel(get_param_label('Mo')) return fig, ax, ax_Mo def _add_grid(self, ax): """Add grid to the plot.""" ax.grid( True, which='major', linestyle='solid', color='#999999', zorder=0) ax.grid( True, which='minor', linestyle='solid', color='#DDDDDD', zorder=0) def _set_plot_title(self, ax, nevs=None, extra_text=None, align='center'): """Set the plot title.""" if nevs is None: nevs = len(self.evids) stat_descr = { 'mean': 'mean', 'wmean': 'weighted mean', 'pctl': 'percentiles', } title = f'{nevs} events' if None not in (self.nbins_x, self.nbins_y): title += f', {self.nbins_x}x{self.nbins_y} bins' title += f' - {stat_descr[self.stat]}' if self.runid is not None: title += f' - runid: {self.runid}' if extra_text is not None: title += f'\n{extra_text}' align_options = { 'center': (0.5, 0.95, 'center', 'top'), 'left': (0.05, 0.95, 'left', 'top'), 'right': (0.95, 0.95, 'right', 'top') } try: xpos, ypos, ha, va = align_options[align] except KeyError as e: raise ValueError('Invalid align') from e bbox = { 'facecolor': 'white', 'edgecolor': 'black', 'boxstyle': 'round', 'alpha': 0.7 } ax.set_title( title, x=xpos, y=ypos, horizontalalignment=ha, verticalalignment=va, bbox=bbox ) def _stress_drop_curves_fc_mw(self, vel, k_parameter, ax): """Plot stress-drop curves for different delta_sigma.""" mag_min, mag_max = ax.get_xlim() mw_step = 0.1 mw_test = np.arange(mag_min, mag_max - 2 * mw_step, mw_step) fc_min = np.inf fc_max = 0. for delta_sigma in (0.1, 1., 10., 100.): fc_test = stress_drop_curve_fc_mw( delta_sigma, vel, mw_test, k_parameter) if fc_test.min() < fc_min: fc_min = fc_test.min() if fc_test.max() > fc_max: fc_max = fc_test.max() ax.plot(mw_test, fc_test, color='#555555') label = str(delta_sigma) # Get rid of ".0" in label if label.endswith('.0'): label = label[:-2] label += ' MPa' outline = patheffects.withStroke(linewidth=2, foreground='white') ax.text(mw_test[-1], fc_test[-1], label, path_effects=[outline]) ax.set_ylim((fc_min * 0.9, fc_max * 1.1)) def _stress_drop_curves_Er_mw(self, mu, ax): """Plot stress-drop curves for different delta_sigma.""" mag_min, mag_max = ax.get_xlim() mw_step = 0.1 mw_test = np.arange(mag_min, mag_max - 2 * mw_step, mw_step) Er_min = np.inf Er_max = 0. for delta_sigma in (0.1, 1., 10., 100.): Er_test = stress_drop_curve_Er_mw(delta_sigma, mu, mw_test) if Er_test.min() < Er_min: Er_min = Er_test.min() if Er_test.max() > Er_max: Er_max = Er_test.max() ax.plot(mw_test, Er_test, color='#555555') label = str(delta_sigma) # Get rid of ".0" in label if label.endswith('.0'): label = label[:-2] label += ' MPa' outline = patheffects.withStroke(linewidth=2, foreground='white') ax.text(mw_test[-1], Er_test[-1], label, path_effects=[outline]) ax.set_ylim((Er_min * 0.5, Er_max * 2)) def _apparent_stress_curves_Er_mw(self, mu, ax): """Plot apparent stress curves for different delta_sigma.""" mag_min, mag_max = ax.get_xlim() mw_step = 0.1 mw_test = np.arange(mag_min, mag_max - 2 * mw_step, mw_step) Er_min = np.inf Er_max = 0. for sigma_a in (0.1, 1., 10., 100.): Er_test = apparent_stress_curve_Er_mw(sigma_a, mu, mw_test) if Er_test.min() < Er_min: Er_min = Er_test.min() if Er_test.max() > Er_max: Er_max = Er_test.max() ax.plot(mw_test, Er_test, color='#555555') label = str(sigma_a) # Get rid of ".0" in label if label.endswith('.0'): label = label[:-2] label += ' MPa' outline = patheffects.withStroke(linewidth=2, foreground='white') ax.text(mw_test[-1], Er_test[-1], label, path_effects=[outline]) ax.set_ylim((Er_min * 0.5, Er_max * 2)) def _2d_hist(self, fig, ax, x, y, x_bins, y_bins, cbaxes_location): """General method to plot 2d histograms.""" counts, _, _ = np.histogram2d(x, y, bins=(x_bins, y_bins)) cm = ax.pcolormesh( x_bins[:-1], y_bins[:-1], counts.T, cmap='magma_r', shading='auto') if cbaxes_location == 'left': cbaxes = fig.add_axes([0.15, 0.15, 0.02, 0.2]) elif cbaxes_location == 'right': cbaxes = fig.add_axes([0.80, 0.15, 0.02, 0.2]) else: raise ValueError('Invalid cbaxes_location') plt.colorbar(cm, cax=cbaxes, orientation='vertical', label='counts') return len(y) def _2d_hist_fc_mw(self, fig, ax, nbins=None, wave_type='S'): """Plot a 2d histogram of fc vs mw.""" mw_min, mw_max = ax.get_xlim() fc_min, fc_max = ax.get_ylim() log_fc_min = np.log10(fc_min) log_fc_max = np.log10(fc_max) if nbins is None: mw_nbins = int((mw_max - mw_min) * 10) fc_nbins = int((log_fc_max - log_fc_min) * 10) else: mw_nbins = nbins fc_nbins = nbins self.nbins_x = mw_nbins self.nbins_y = fc_nbins mw_bins = np.linspace(mw_min, mw_max + 0.1, mw_nbins) fc_bins = 10**np.linspace(log_fc_min, log_fc_max + 0.1, fc_nbins) cond = self.wave_type == wave_type mw = self.Mw[cond] if len(mw) == 0: raise ValueError(f'No events found for wave type "{wave_type}"') fc = self.fc[cond] return self._2d_hist( fig, ax, mw, fc, mw_bins, fc_bins, cbaxes_location='left') def _2d_hist_Er_mw(self, fig, ax, nbins=None): """Plot a 2d histogram of Er vs mw.""" mw_min, mw_max = ax.get_xlim() Er_min, Er_max = ax.get_ylim() log_Er_min = np.log10(Er_min) log_Er_max = np.log10(Er_max) if nbins is None: mw_nbins = int((mw_max - mw_min) * 10) Er_nbins = int((log_Er_max - log_Er_min) * 5) else: mw_nbins = nbins Er_nbins = nbins mw_bins = np.linspace(mw_min, mw_max + 0.1, mw_nbins) Er_bins = 10**np.linspace(log_Er_min, log_Er_max + 0.1, Er_nbins) self.nbins_x = mw_nbins self.nbins_y = Er_nbins cond = ~np.isnan(self.Er) mw = self.Mw[cond] Er = self.Er[cond] return self._2d_hist( fig, ax, mw, Er, mw_bins, Er_bins, cbaxes_location='right') def _2d_hist_ssd_mw(self, fig, ax, nbins=None): """Plot a 2d histogram of ssd vs mw.""" mw_min, mw_max = ax.get_xlim() ssd_min, ssd_max = ax.get_ylim() log_ssd_min = np.log10(ssd_min) log_ssd_max = np.log10(ssd_max) if nbins is None: mw_nbins = int((mw_max - mw_min) * 10) ssd_nbins = int((log_ssd_max - log_ssd_min) * 5) else: mw_nbins = nbins ssd_nbins = nbins mw_bins = np.linspace(mw_min, mw_max + 0.1, mw_nbins) ssd_bins = 10**np.linspace(log_ssd_min, log_ssd_max + 0.1, ssd_nbins) self.nbins_x = mw_nbins self.nbins_y = ssd_nbins cond = ~np.isnan(self.ssd) mw = self.Mw[cond] ssd = self.ssd[cond] return self._2d_hist( fig, ax, mw, ssd, mw_bins, ssd_bins, cbaxes_location='left') def _2d_hist_ssd_depth(self, fig, ax, nbins=None): """Plot a 2d histogram of ssd vs depth.""" depth_min, depth_max = ax.get_xlim() ssd_min, ssd_max = ax.get_ylim() log_ssd_min = np.log10(ssd_min) log_ssd_max = np.log10(ssd_max) if nbins is None: depth_nbins = int((depth_max - depth_min) * 5) ssd_nbins = int((log_ssd_max - log_ssd_min) * 5) else: depth_nbins = nbins ssd_nbins = nbins depth_bins = np.linspace(depth_min, depth_max + 0.1, depth_nbins) ssd_bins = 10**np.linspace(log_ssd_min, log_ssd_max + 0.1, ssd_nbins) self.nbins_x = depth_nbins self.nbins_y = ssd_nbins cond = ~np.isnan(self.ssd) depth = self.depth[cond] ssd = self.ssd[cond] return self._2d_hist( fig, ax, depth, ssd, depth_bins, ssd_bins, cbaxes_location='left') def _scatter_plot(self, fig, ax, x, y, x_err, y_err, evids, yformat, cond, colorby=None, colormap=None, cbaxes_location='left', xformat=None): """ General method to plot scatter plots with error bars and optional color coding. """ if colorby is not None: color = getattr(self, colorby)[cond] cmap, units = get_colormap(colorby) if colormap is not None: cmap = plt.get_cmap(colormap) label = get_param_label(colorby) norm = plt.cm.colors.LogNorm() if units == 'log' else None mfc = 'none' mec = 'none' ecolor = 'black' errorbar_alpha = 0.5 scatter_alpha = 1 else: color = 'none' cmap = None norm = None mfc = ecolor = '#FCBA25' mec = 'black' errorbar_alpha = 1 scatter_alpha = 0 ax.errorbar( x, y, xerr=x_err, yerr=y_err, fmt='o', mec=mec, mfc=mfc, ecolor=ecolor, alpha=errorbar_alpha) sc = ax.scatter( x, y, alpha=scatter_alpha, c=color, cmap=cmap, norm=norm, edgecolors='black', picker=True, zorder=20) if colorby is not None: if cbaxes_location == 'left': cbaxes = fig.add_axes([0.15, 0.15, 0.02, 0.2]) elif cbaxes_location == 'right': cbaxes = fig.add_axes([0.80, 0.15, 0.02, 0.2]) else: raise ValueError('Invalid cbaxes_location') if len(label) > 10: label = label.replace(' (', '\n(') cbaxes.set_zorder(1000) plt.colorbar(sc, cax=cbaxes, label=label) annot = Annot(x, y, evids, yformat, xformat) fig.canvas.mpl_connect('pick_event', annot) return len(y) def _scatter_fc_mw(self, fig, ax, wave_type='S', colorby=None, colormap=None): """Plot the scatter plot of fc vs mw.""" cond = self.wave_type == wave_type evids = self.evids[cond] if len(evids) == 0: raise ValueError(f'No events found for wave type "{wave_type}"') mw = self.Mw[cond] mw_err_minus = self.Mw_err_minus[cond] mw_err_plus = self.Mw_err_plus[cond] fc = self.fc[cond] fc_err_minus = self.fc_err_minus[cond] fc_err_plus = self.fc_err_plus[cond] yformat = 'fc {:.2f} Hz' return self._scatter_plot( fig, ax, mw, fc, [mw_err_minus, mw_err_plus], [fc_err_minus, fc_err_plus], evids, yformat, cond, colorby, colormap, cbaxes_location='left') def _scatter_Er_mw(self, fig, ax, colorby=None, colormap=None): """Plot the scatter plot of Er vs mw.""" cond = ~np.isnan(self.Er) mw = self.Mw[cond] mw_err_minus = self.Mw_err_minus[cond] mw_err_plus = self.Mw_err_plus[cond] Er = self.Er[cond] Er_err_minus = self.Er_err_minus[cond] Er_err_plus = self.Er_err_plus[cond] evids = self.evids[cond] yformat = 'Er {:.1e} N·m' return self._scatter_plot( fig, ax, mw, Er, [mw_err_minus, mw_err_plus], [Er_err_minus, Er_err_plus], evids, yformat, cond, colorby, colormap, cbaxes_location='right') def _scatter_ssd_mw(self, fig, ax, colorby=None, colormap=None): """Plot the scatter plot of ssd vs mw.""" cond = ~np.isnan(self.ssd) mw = self.Mw[cond] mw_err_minus = self.Mw_err_minus[cond] mw_err_plus = self.Mw_err_plus[cond] ssd = self.ssd[cond] ssd_err_minus = self.ssd_err_minus[cond] ssd_err_plus = self.ssd_err_plus[cond] evids = self.evids[cond] yformat = 'ssd {:.2e} MPa' return self._scatter_plot( fig, ax, mw, ssd, [mw_err_minus, mw_err_plus], [ssd_err_minus, ssd_err_plus], evids, yformat, cond, colorby, colormap) def _scatter_ssd_depth(self, fig, ax, colorby=None, colormap=None): """Plot the scatter plot of ssd vs depth.""" cond = ~np.isnan(self.ssd) depth = self.depth[cond] ssd = self.ssd[cond] ssd_err_minus = self.ssd_err_minus[cond] ssd_err_plus = self.ssd_err_plus[cond] evids = self.evids[cond] yformat = 'ssd {:.2e} MPa' return self._scatter_plot( fig, ax, depth, ssd, None, [ssd_err_minus, ssd_err_plus], evids, yformat, cond, colorby, colormap) def _fit_fc_mw(self, vel, k_parameter, ax, slope=False): """Plot a linear regression of fc vs mw.""" mag_min, mag_max = ax.get_xlim() mw_step = 0.1 mw_test = np.arange(mag_min, mag_max - 2 * mw_step, mw_step) if slope: f = fc_mw_function else: def f(mw, a): return fc_mw_function(mw, a, -0.5) y = np.log10(self.fc) # pylint: disable=unbalanced-tuple-unpacking popt, _pcov = curve_fit(f, self.Mw, y) try: a, b = popt except Exception: a, = popt b = -0.5 print(f'a: {a:.1f} b {b:.1f}:') slope = (3 / 2) / b print(f'slope: {slope:.1f}') r2 = calc_r2(self.Mw, y, np.ones_like(y), a, b) print('r2:', r2) # print('r2_err:', r2_err) delta_sigma = 1. / ((vel * 1000.)**3.) * 10**(a + 9.1 + 0.935) delta_sigma /= 1e6 print('delta_sigma', delta_sigma) fc_test = stress_drop_curve_fc_mw( delta_sigma, vel, mw_test, k_parameter, b=b) ax.plot( mw_test, fc_test, color='red', zorder=10, label=f'r2: {r2:.2f}') ax.legend() if not slope: ax.text( mw_test[-1], fc_test[-1], f'{delta_sigma:.1f} MPa', color='red', backgroundcolor='white', zorder=50)
[docs] def plot_fc_mw(self, hist=False, fit=False, slope=False, nbins=None, wave_type='S', colorby=None, colormap=None): """ Plot the logarithm of the corner frequency vs the moment magnitude. Parameters ---------- hist : bool If True, plot a 2D histogram instead of a scatter plot. fit : bool If True, plot a linear regression of fc vs mw. slope : bool If True, also fit the slope of the linear regression. nbins : int Number of bins for the 2D histogram. wave_type : str One of "P", "S", "SV" or "SH". Default is "S". colorby : str Color the data points by the given parameter. colormap : str Use this colormap for the colorby parameter instead of the default. """ fig, ax, ax_Mo = self._make_mw_axis() ax_Mo.set_yscale('log') if wave_type == 'P': vel = np.nanmean(self.vp) k = np.nanmean(self.kp) elif wave_type in ('S', 'SV', 'SH'): vel = np.nanmean(self.vs) k = np.nanmean(self.ks) else: raise ValueError('Wave type must be "P", "S", "SV" or "SH"') self._stress_drop_curves_fc_mw(vel, k, ax) if hist: npoints = self._2d_hist_fc_mw(fig, ax, nbins, wave_type) else: npoints = self._scatter_fc_mw( fig, ax, wave_type, colorby, colormap) if fit: self._fit_fc_mw(vel, k, ax, slope=slope) extra_text = 'Stress drop curves' self._set_plot_title(ax, npoints, extra_text) self._add_grid(ax_Mo) ax_Mo.set_ylabel(get_param_label('fc')) plt.show()
[docs] def plot_Er_mw(self, hist=False, fit=False, nbins=None, colorby=None, colormap=None): """ Plot the logarithm of the radiated energy vs the moment magnitude. Parameters ---------- hist : bool If True, plot a 2D histogram instead of a scatter plot. fit : bool If True, plot a linear regression of Er vs mw. nbins : int Number of bins for the 2D histogram. colorby : str Color the data points by the given parameter. colormap : str Use this colormap for the colorby parameter instead of the default. """ fig, ax, ax_Mo = self._make_mw_axis() ax_Mo.set_yscale('log') mu = np.nanmean((self.vs * 1e3)**2 * self.rho) self._apparent_stress_curves_Er_mw(mu, ax) if hist: npoints = self._2d_hist_Er_mw(fig, ax, nbins) else: npoints = self._scatter_Er_mw(fig, ax, colorby, colormap) if fit: raise NotImplementedError('Fit not implemented yet for Er_mw') extra_text = 'Apparent stress curves' self._set_plot_title(ax, npoints, extra_text) self._add_grid(ax_Mo) ax_Mo.set_ylabel(get_param_label('Er')) plt.show()
[docs] def plot_ssd_mw(self, hist=False, fit=False, nbins=None, colorby=None, colormap=None): """ Plot the logarithm of static stress drop vs moment magnitude. Parameters ---------- hist : bool If True, plot a 2D histogram instead of a scatter plot. fit : bool If True, plot a linear regression of ssd vs mw. nbins : int Number of bins for the 2D histogram. colorby : str Color the data points by the given parameter. colormap : str Use this colormap for the colorby parameter instead of the default. """ fig, ax, ax_Mo = self._make_mw_axis() ax_Mo.set_yscale('log') ssd_min = np.min(self.ssd - self.ssd_err_minus) ssd_max = np.max(self.ssd + self.ssd_err_plus) ssd_min = 10**(np.floor(np.log10(ssd_min))) ssd_max = 10**(np.ceil(np.log10(ssd_max))) ax.set_ylim(ssd_min, ssd_max) if hist: self._2d_hist_ssd_mw(fig, ax, nbins) else: self._scatter_ssd_mw(fig, ax, colorby, colormap) if fit: raise NotImplementedError('Fit not implemented yet for ssd_mw') self._set_plot_title(ax) self._add_grid(ax_Mo) ax_Mo.set_ylabel(get_param_label('ssd')) plt.show()
[docs] def plot_ssd_depth(self, hist=False, fit=False, nbins=None, colorby=None, colormap=None): """ Plot the logarithm of static stress drop vs depth. Parameters ---------- hist : bool If True, plot a 2D histogram instead of a scatter plot. fit : bool If True, plot a linear regression of ssd vs depth. nbins : int Number of bins for the 2D histogram. colorby : str Color the data points by the given parameter. colormap : str Use this colormap for the colorby parameter instead of the default. """ fig, ax = plt.subplots(figsize=(10, 6)) depth_min = np.min(self.depth) depth_max = np.max(self.depth) padding = 0.15 * (depth_max - depth_min) ax.set_xlim(depth_min-padding, depth_max+padding) ax.set_xlabel(get_param_label('depth')) ax.set_ylabel(get_param_label('ssd')) ax.set_yscale('log') ax.set_ylim(1e-3, 1e3) if hist: self._2d_hist_ssd_depth(fig, ax, nbins) else: self._scatter_ssd_depth(fig, ax, colorby, colormap) if fit: raise NotImplementedError( 'Fit not implemented yet for ssd_depth') self._set_plot_title(ax) self._add_grid(ax) plt.show()
[docs] def plot_mw_time(self, colorby=None, colormap=None): """ Plot the moment magnitude vs time. Parameters ---------- colorby : str Color the data points by the given parameter. colormap : str Use this colormap for the colorby parameter instead of the default. """ fig, ax = plt.subplots(figsize=(10, 6)) ax.set_zorder(10) ax.patch.set_visible(False) ax.set_xlabel('Time') ax.xaxis.set_major_formatter( plt.matplotlib.dates.DateFormatter('%Y-%m-%d') ) ax.set_ylabel(get_param_label('Mw')) mpl_time = np.array([t.matplotlib_date for t in self.time]) padding_x = 0.05 * (np.max(mpl_time) - np.min(mpl_time)) padding_y = 0.05 * ( np.max(self.Mw + self.Mw_err_plus) - np.min(self.Mw - self.Mw_err_minus) ) ax.set_xlim(np.min(mpl_time) - padding_x, np.max(mpl_time) + padding_x) ax.set_ylim( np.min(self.Mw - self.Mw_err_minus) - padding_y, np.max(self.Mw + self.Mw_err_plus) + padding_y ) cond = ~np.isnan(self.Mw) npoints = self._scatter_plot( fig, ax, mpl_time, self.Mw, np.zeros_like(self.time), [self.Mw_err_minus, self.Mw_err_plus], self.evids, 'Mw {:.2f}', cond, colorby, colormap, cbaxes_location='right', xformat='{}') # Add cumulative moment axis ax2 = ax.twinx() ax2.set_zorder(0) ax2.set_ylabel('Cumulative Moment (N·m)') isort = np.argsort(mpl_time) mpl_time = mpl_time[isort] Mo = self.Mo[isort] cumulative_moment = np.cumsum(Mo) ax2.plot( mpl_time, cumulative_moment, color='red', alpha=0.6, lw=2) ax2.set_ylim(0, np.max(cumulative_moment) * 1.1) final_moment = cumulative_moment[-1] cum_mag = moment_to_mag(final_moment) cum_mag_text = f'Cumulative\nMw: {cum_mag:.2f}' bbox = { 'facecolor': 'white', 'edgecolor': 'black', 'boxstyle': 'round', 'alpha': 0.9 } # we add the text to the first axis, so that it is on top, # but we need to transform the coordinates to the second axis ax.text( mpl_time[-1], cumulative_moment[-1], cum_mag_text, color='red', transform=ax2.transData, verticalalignment='bottom', horizontalalignment='left', bbox=bbox, zorder=50) extra_text = 'Mw vs Time' self._set_plot_title(ax, npoints, extra_text, align='left') self._add_grid(ax) plt.show()
[docs] def plot_hist(self, param_name, nbins=None, wave_type='S'): """ Plot a histogram of the given parameter. Parameters ---------- param_name : str Name of the parameter to plot. nbins : int Number of bins for the histogram. wave_type : str One of "P", "S", "SV" or "SH". Default is "S". """ _, unit, log, color = valid_parameters[param_name] log = log == 'log' values = getattr(self, param_name) if param_name == 'fc': values = values[self.wave_type == wave_type] if len(values) == 0: raise ValueError( f'No events found for wave type "{wave_type}"') values = values[~np.isnan(values)] nvalues = len(values) if nvalues < 2: raise ValueError(f'Not enough values to plot histogram: {nvalues}') if nbins is None: # find the number of bins as the closest power of 10 to the # number of values divided by 10 nbins_exp = int(np.ceil(np.log10(nvalues / 10))) nbins = int(10**nbins_exp) # reduce the number of bins if there are too few values per bin if nvalues / nbins < 2: nbins //= 2 if log: values_mean = 10**(np.mean(np.log10(values))) min_exp = np.floor(np.min(np.log10(values))) max_exp = np.ceil(np.max(np.log10(values))) bins = np.logspace(min_exp, max_exp, nbins + 1) else: values_mean = np.mean(values) maxval = np.max(np.abs(values)) bins = np.linspace(0, maxval, nbins) _fig, ax = plt.subplots() ax.hist(values, bins=bins, color=color) ax.axvline(values_mean, color='red') txt = ( f' mean: {values_mean:.1e}' if log else f' mean: {values_mean:.4f}' ) if unit is not None: txt += f' {unit}' ax.text( values_mean, 0.95, txt, color='red', transform=ax.get_xaxis_transform()) if log: ax.set_xscale('log') ax.set_xlabel(get_param_label(param_name)) ax.set_ylabel('Counts') title = f'{nvalues} values, {nbins} bins - {self.stat}' if self.runid is not None: title += f' - runid {self.runid}' ax.set_title(title) plt.show()
[docs] def plot_gutenberg_richter( self, magmin=None, magmax=None, nbins=None, fit=False): """ Plot the Gutenberg-Richter law. Parameters ---------- magmin : float Minimum magnitude for the plot. magmax : float Maximum magnitude for the plot. nbins : int Number of bins for the histogram. fit : bool If True, find the magnitude of completeness and fit the Gutenberg-Richter law to the data. """ if magmin is None: magmin = np.floor(np.min(self.Mw - self.Mw_err_minus)) if magmax is None: magmax = np.ceil(np.max(self.Mw + self.Mw_err_plus)) if nbins is None: nbins = int((magmax - magmin) * 10) print(f'Number of bins autoset to: {nbins}') bins = np.linspace(magmin, magmax, nbins + 1) hist, bin_edges = np.histogram(self.Mw, bins=bins) bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 cum_nevs = np.cumsum(hist[::-1])[::-1] if fit: mc = compute_mc(self.Mw, 0.1, bin_centers, 0.1) print(f'Magnitude of completeness: {mc:.2f}') a, b = fit_gutenberg_richter(bin_centers, cum_nevs, mc) print(f'G-R law parameters: a={a:.2f}, b={b:.2f}') mw_fit = np.linspace(mc, magmax, 100) gr_fit = 10**(a - b * mw_fit) _fig, ax1 = plt.subplots(figsize=(10, 6)) ax1.set_zorder(1) ax1.patch.set_visible(False) ax1.plot( bin_centers, cum_nevs, 'o', color='#FCBA25', markeredgecolor='black', markersize=8, zorder=10) if fit: outline = patheffects.withStroke(linewidth=0.5, foreground='black') bbox = { 'facecolor': 'white', 'edgecolor': 'black', 'boxstyle': 'round', 'alpha': 0.7 } # plot the G-R law ax1.plot(mw_fit, gr_fit, color='#FCBA25', linewidth=4, zorder=2) xmin, xmax = ax1.get_xlim() # add a label with the G-R law parameters gr_text_x = 0.805 gr_text_x_data = xmin + gr_text_x * (xmax - xmin) gr_text_y_data = 10**(a - b * gr_text_x_data)*1.2 gr_text = f'G-R params:\na={a:.2f}, b={b:.2f}' ax1.text( gr_text_x, gr_text_y_data, gr_text, color='#FCBA25', transform=ax1.get_yaxis_transform(), horizontalalignment='left', verticalalignment='bottom', path_effects=[outline,], bbox=bbox ) # add a vertical line at the magnitude of completeness ax1.axvline(mc, color='red', linestyle='--', zorder=2) # add a label with the magnitude of completeness mc_text_x_data = mc + 0.02 * (xmax - xmin) mc_text = f'$M_C$={mc:.2f}' ax1.text( mc_text_x_data, 0.97, mc_text, color='red', transform=ax1.get_xaxis_transform(), horizontalalignment='left', verticalalignment='top', path_effects=[outline,], bbox=bbox ) ax1.set_yscale('log') ax1.set_xlabel('Magnitude (Mw)') ax1.set_ylabel('Cumulative Number of Events', color='#FCBA25') ax1.tick_params(axis='y', labelcolor='#FCBA25') ax2 = ax1.twinx() ax2.set_zorder(0) ax2.hist(self.Mw, bins=bins, color='#1E90FF', alpha=0.6) ax2.set_ylabel('Number of Events', color='#1E90FF') ax2.tick_params(axis='y', labelcolor='#1E90FF') ax2.set_ylim(0, max(hist) * 1.5) extra_text = f'Gutenberg-Richter Law\n{nbins} bins' self._set_plot_title(ax1, extra_text=extra_text, align='right') self._add_grid(ax1) plt.show()
[docs] def run(): """Run the script.""" args = parse_args() params = Params(args) if os.path.exists('problems.txt'): _skip_events(params) params.filter( stamin=args.stamin, stamax=args.stamax, magmin=args.magmin, magmax=args.magmax, ssdmin=args.ssdmin, ssdmax=args.ssdmax, sigmaamin=args.sigmaamin, sigmaamax=args.sigmaamax, ) if len(params.evids) == 0: raise ValueError('No events found') if args.plot_type == 'fc_mw': params.plot_fc_mw( args.hist, args.fit, args.slope, args.nbins, args.wave_type, args.colorby, args.colormap) elif args.plot_type == 'Er_mw': params.plot_Er_mw( args.hist, args.fit, args.nbins, args.colorby, args.colormap) elif args.plot_type == 'ssd_mw': params.plot_ssd_mw( args.hist, args.fit, args.nbins, args.colorby, args.colormap) elif args.plot_type == 'ssd_depth': params.plot_ssd_depth( args.hist, args.fit, args.nbins, args.colorby, args.colormap) elif args.plot_type == 'mw_time': params.plot_mw_time(args.colorby, args.colormap) elif args.plot_type == 'GR': params.plot_gutenberg_richter( args.magmin, args.magmax, args.nbins, args.fit) elif args.plot_type in valid_plot_types: params.plot_hist(args.plot_type, args.nbins, args.wave_type)
def _skip_events(params): evid_skip = np.loadtxt('problems.txt', usecols=(0,), dtype=str) # case in which there is only one evid: if not evid_skip.shape: evid_skip = evid_skip[None] skip_idx = [] for evid in evid_skip: with contextlib.suppress(Exception): skip_idx.append(np.where(params.evids == evid)[0][0]) skipped_evid_str = ' '.join(params.evids[skip_idx]) print(f'Skipping events: {skipped_evid_str}') params.skip_events(skip_idx)
[docs] def main(): """Main function.""" try: run() except KeyboardInterrupt: sys.exit(0) except Exception as msg: sys.exit(msg)
if __name__ == '__main__': main()