A probabilistic earthquake source inversion framework. Designed and crafted in Mordor. https://pyrocko.org/grond/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

426 lines
14 KiB

import logging
import os
import shutil
import os.path as op
from pyrocko import gf, scenario, util, guts
from pyrocko.gui import marker as pmarker
import math
import grond
DEFAULT_STATIC_STORE = 'ak135_static'
DEFAULT_WAVEFORM_STORE = 'global2s'
logger = logging.getLogger('grond.scenario')
km = 1e3
class GrondScenario(object):
def __init__(self, project_dir,
center_lat=23., center_lon=52., radius=230*km,
problem=None, observations=[]):
self.project_dir = project_dir
self.data_dir = op.join('data', 'scenario')
self.center_lat = center_lat
self.center_lon = center_lon
self.radius = radius
self.problem = problem
self.observations = observations
self.rebuild = False
self._scenario = None
def get_gf_stores_dir(self):
return op.join(self.project_dir, 'gf_stores')
def create_project_dir(self, force=False):
prj_dir = self.project_dir
if op.exists(prj_dir) and not force:
raise grond.GrondError(
'Directory "%s" already exists! Use --force to overwrite'
% prj_dir)
elif op.exists(prj_dir) and force:
logger.info('Overwriting directory %s.', prj_dir)
shutil.rmtree(prj_dir)
util.ensuredir(prj_dir)
@property
def stores_wanted(self):
target_generators = self.get_scenario().target_generators
return set([t.store_id for t in target_generators])
def symlink_gfstores(self, engine):
logger.info('Symlinking Green\'s function stores...')
for store_id in self.stores_wanted:
store = engine.get_store(store_id)
dtarget = op.join(self.get_gf_stores_dir(), store_id)
if not op.exists(dtarget):
os.symlink(store.store_dir, dtarget)
def add_observation(self, observation):
logger.info('Adding %s.' % observation.__class__.__name__)
self.observations.append(observation)
def set_problem(self, problem):
self.problem = problem
def get_dataset_config(self):
events_path = op.join(self.data_dir, 'events.txt')
dataset_config = grond.DatasetConfig(
events_path=events_path)
for obs in self.observations:
obs.update_dataset_config(dataset_config, self.data_dir)
return dataset_config
def set_config(self, filename):
self.config_fn = filename
def get_scenario(self):
if not self._scenario:
if self.rebuild:
scenario_file = op.join(self.project_dir, 'data', 'scenario',
'scenario.yml')
self._scenario = guts.load(filename=scenario_file)
else:
if not self.observations:
raise AttributeError('No observations set,'
' use .add_observation(Observation)')
if not self.problem:
raise AttributeError('No Source Problem set,'
' use .set_problem(Problem).')
self._scenario = scenario.ScenarioGenerator(
center_lat=self.center_lat,
center_lon=self.center_lon,
radius=self.radius,
target_generators=[obs.get_scenario_target_generator()
for obs in self.observations],
source_generator= self.problem.get_scenario_source_generator()) # noqa
return self._scenario
def create_scenario(
self,
force=False,
interactive=True,
gf_store_superdirs=None,
make_map=True):
logger.info('Creating scenario...')
scenario = self.get_scenario()
self.create_project_dir(force)
util.ensuredir(self.get_gf_stores_dir())
data_dir = op.join(self.project_dir, self.data_dir)
util.ensuredir(data_dir)
scenario.dump(filename=op.join(data_dir, 'scenario.yml'))
if gf_store_superdirs is None:
engine1 = gf.LocalEngine(
use_config=True,
store_superdirs=[self.get_gf_stores_dir()])
else:
engine1 = gf.LocalEngine(
use_config=False,
store_superdirs=gf_store_superdirs)
scenario.init_modelling(engine=engine1)
scenario.ensure_gfstores(
interactive=interactive)
self.symlink_gfstores(engine1)
engine2 = gf.LocalEngine(
use_config=False,
store_superdirs=[self.get_gf_stores_dir()])
scenario.init_modelling(engine=engine2)
scenario.dump_data(path=data_dir)
if make_map:
scenario.make_map(op.join(self.project_dir, 'scenario_map.pdf'))
shutil.move(op.join(data_dir, 'sources.yml'),
op.join(data_dir, 'scenario_sources.yml'))
markers = scenario.get_onsets()
marker_path = op.join(data_dir, 'picks', 'picks.markers')
if markers:
util.ensuredirs(marker_path)
pmarker.save_markers(markers, marker_path)
def get_grond_config(self):
engine_config = grond.EngineConfig(
gf_stores_from_pyrocko_config=False,
gf_store_superdirs=['gf_stores'])
optimiser_config = grond.HighScoreOptimiserConfig()
config = grond.Config(
rundir_template=op.join('runs', '${problem_name}.grun'),
dataset_config=self.get_dataset_config(),
target_groups=[obs.get_grond_target_group()
for obs in self.observations],
problem_config=self.problem.get_grond_problem_config(),
optimiser_config=optimiser_config,
engine_config=engine_config)
config.set_basepath(self.project_dir)
return config
def get_grond_config_path(self):
return op.join('config', 'scenario.gronf')
def create_grond_files(self):
logger.info('Creating Grond configuration for %s.'
% ' and '.join([obs.name for obs in self.observations]))
config_path = op.join(self.project_dir, self.get_grond_config_path())
util.ensuredirs(config_path)
grond.write_config(self.get_grond_config(), config_path)
def build(
self,
force=False,
interactive=False,
gf_store_superdirs=None,
make_map=True):
logger.info('Building scenario...')
self.create_project_dir(force)
self.create_scenario(
force=force,
interactive=interactive,
gf_store_superdirs=gf_store_superdirs,
make_map=make_map)
self.create_grond_files()
class Observation(object):
name = 'Observation Prototype'
def __init__(self, store_id, *args, **kwargs):
self.store_id = store_id
def update_dataset_config(self, dataset_config, data_dir):
return dataset_config
def get_scenario_target_generator(self):
pass
def get_grond_target_group(self):
pass
class WaveformObservation(Observation):
name = 'seismic waveforms'
def __init__(self, store_id=DEFAULT_WAVEFORM_STORE, nstations=25,
stationxml_paths=None, stations_paths=None):
self.nstations = nstations
self.store_id = store_id
self.stationxml_paths = stationxml_paths
self.stations_paths = stations_paths
def update_dataset_config(self, dataset_config, data_dir):
ds = dataset_config
ds.waveform_paths = [op.join(data_dir, 'waveforms')]
ds.stations_path = op.join(data_dir, 'meta', 'stations.txt')
ds.responses_stationxml_paths = [
op.join(data_dir, 'meta', 'stations.xml')]
return ds
def get_scenario_target_generator(self):
if self.stationxml_paths or self.stations_paths:
station_generator = scenario.station.ImportStationGenerator(
stations_paths=self.stations_paths,
stations_stationxml_paths=self.stationxml_paths)
else:
station_generator = scenario.station.RandomStationGenerator(
nstations=self.nstations)
return scenario.targets.WaveformGenerator(
station_generator=station_generator,
store_id=self.store_id,
tabulated_phases_from_store=True,
tabulated_phases_noise_scale=0.3,
seismogram_quantity='displacement')
def get_grond_target_group(self):
return grond.WaveformTargetGroup(
normalisation_family='time_domain',
path='all',
distance_min=10*km,
distance_max=1000*km,
channels=['Z', 'R', 'T'],
interpolation='multilinear',
store_id=self.store_id,
misfit_config=grond.WaveformMisfitConfig(
tmin='vel_surface:5.5',
tmax='vel_surface:3.0',
fmin=0.01,
fmax=0.1))
class InSARObservation(Observation):
name = 'InSAR displacement'
def __init__(self, store_id=DEFAULT_STATIC_STORE):
self.store_id = store_id
def update_dataset_config(self, dataset_config, data_dir):
dataset_config.kite_scene_paths = [op.join(data_dir, 'insar')]
return dataset_config
def get_scenario_target_generator(self):
return scenario.targets.InSARGenerator(
mask_water=False,
store_id=self.store_id)
def get_grond_target_group(self):
logger.info('''Hint:
Use kite's `spool` to configure the InSAR scene's quadtree and covariance!
''')
return grond.SatelliteTargetGroup(
normalisation_family='insar_target',
path='all',
interpolation='multilinear',
store_id=self.store_id,
kite_scenes=['*all'],
misfit_config=grond.SatelliteMisfitConfig(
optimise_orbital_ramp=True,
ranges={
'offset': '-0.5 .. 0.5',
'ramp_north': '-1e-4 .. 1e-4',
'ramp_east': '-1e-4 .. 1e-4'
})
)
class GNSSCampaignObservation(Observation):
name = 'GNSS campaign'
def __init__(self, store_id=DEFAULT_STATIC_STORE, nstations=25):
self.store_id = store_id
self.nstations = nstations
def update_dataset_config(self, dataset_config, data_dir):
dataset_config.gnss_campaign_paths = [op.join(data_dir, 'gnss')]
return dataset_config
def get_scenario_target_generator(self):
return scenario.targets.GNSSCampaignGenerator(
station_generator=scenario.targets.RandomStationGenerator(
nstations=self.nstations),
store_id=self.store_id)
def get_grond_target_group(self):
return grond.GNSSCampaignTargetGroup(
gnss_campaigns=['*all'],
normalisation_family='gnss_target',
path='all',
interpolation='multilinear',
store_id=self.store_id,
misfit_config=grond.GNSSCampaignMisfitConfig()
)
class SourceProblem(object):
def __init__(self, magnitude_min=6., magnitude_max=7., radius=10*km,
nevents=1):
self.magnitude_min = magnitude_min
self.magnitude_max = magnitude_max
self.radius = radius
self.nevents = nevents
def get_scenario_source_generator(self):
return
def get_grond_problem_config(self):
return
class DCSourceProblem(SourceProblem):
def get_scenario_source_generator(self):
return scenario.sources.DCSourceGenerator(
magnitude_min=self.magnitude_min,
magnitude_max=self.magnitude_max,
radius=self.radius,
depth_min=5*km,
depth_max=20*km,
nevents=self.nevents)
def get_grond_problem_config(self):
s2 = math.sqrt(2)
return grond.CMTProblemConfig(
name_template='cmt_${event_name}',
distance_min=2.*km,
mt_type='full',
ranges=dict(
time=gf.Range(-5.0, 5.0, relative='add'),
north_shift=gf.Range(-15*km, 15*km),
east_shift=gf.Range(-15*km, 15*km),
depth=gf.Range(5*km, 20*km),
magnitude=gf.Range(self.magnitude_min, self.magnitude_max),
rmnn=gf.Range(-1.0, 1.0),
rmee=gf.Range(-1.0, 1.0),
rmdd=gf.Range(-1.0, 1.0),
rmne=gf.Range(-s2, s2),
rmnd=gf.Range(-s2, s2),
rmed=gf.Range(-s2, s2),
duration=gf.Range(1.0, 15.0))
)
class RectangularSourceProblem(SourceProblem):
def get_scenario_source_generator(self):
return scenario.sources.RectangularSourceGenerator(
magnitude_min=self.magnitude_min,
magnitude_max=self.magnitude_max,
depth_min=5*km,
depth_max=15*km,
radius=self.radius,
nevents=self.nevents)
def get_grond_problem_config(self):
return grond.RectangularProblemConfig(
name_template='rect_source_${event_name}',
decimation_factor=1,
ranges=dict(
north_shift=gf.Range(-15*km, 15*km),
east_shift=gf.Range(-15*km, 15*km),
depth=gf.Range(5*km, 15*km),
length=gf.Range(20*km, 40*km),
width=gf.Range(5*km, 10*km),
dip=gf.Range(0, 90),
strike=gf.Range(0, 180),
rake=gf.Range(0, 90),
slip=gf.Range(.5, 10),
nucleation_x=gf.Range(-1., 1.),
nucleation_y=gf.Range(-1., 1.),
time=gf.Range(-5.0, 5.0, relative='add'))
)