Compare commits

...

3 Commits

Author SHA1 Message Date
Marius Kriegerowski c8af551cb9 fix 3 years ago
Marius Kriegerowski d13c4e038c add ncpu flag 3 years ago
Marius Kriegerowski 160c71cb5b test parallel preprocessing for template matching 3 years ago
  1. 87
      src/ifc.py

87
src/ifc.py

@@ -3,12 +3,17 @@ import logging
import numpy as num
from collections import defaultdict
from scipy.signal import fftconvolve
from pyrocko.guts import Object, String, Float, Bool, StringChoice, List, Dict
from pyrocko.guts import Object, String, Float, Bool, StringChoice, List, Dict, Int
from pyrocko import trace, autopick, util, model
from pyrocko.gui import util as gui_util
from pyrocko import marker as pmarker
from lassie import shifter, common, geo
import multiprocessing
import time
from functools import partial
logger = logging.getLogger('lassie.ifc')
guts_prefix = 'lassie'
@@ -319,6 +324,37 @@ class OnsetIFC(IFC):
return dataset
def pp(orig_b_a, downsample_rate, fmin, fmax, normalization,
       sum_square, wmin, wmax, tmaster, tref, tpad_new):
    '''Preprocess one (data trace, master trace) pair and correlate them.

    Lives at module level so it is picklable and can be dispatched to a
    ``multiprocessing.Pool`` worker via ``functools.partial``.

    :param orig_b_a: tuple ``(orig_b, a)`` of the continuous data trace and
        the matching master/template trace
    :param downsample_rate: target sampling rate before filtering [Hz],
        or ``None`` to skip downsampling
    :param fmin: highpass corner frequency [Hz]
    :param fmax: lowpass corner frequency [Hz]
    :param normalization: normalization mode forwarded to
        ``trace.correlate`` (``None`` when disabled)
    :param sum_square: if true, square the correlation samples
    :param wmin: start of the window of interest
    :param wmax: end of the window of interest
    :param tmaster: master trace duration, used to widen the chop window
    :param tref: reference time (presumably the earliest master tmin —
        confirm against the caller)
    :param tpad_new: padding applied on both sides of the final chop
    :returns: correlation trace chopped to
        ``[wmin - tpad_new, wmax + tpad_new]``, marked ``tabu`` for the GUI
    '''
    orig_b, a = orig_b_a

    # Work on a copy so the caller's trace is left untouched.
    b = orig_b.copy()
    if downsample_rate is not None:
        downsample(b, 1./downsample_rate)

    b.highpass(4, fmin, demean=False)
    b.lowpass(4, fmax, demean=False)

    # Snap the chop bounds onto the trace's sampling grid, widened by the
    # master duration so the 'valid' correlation still covers [wmin, wmax].
    smin = round((wmin - tmaster) / b.deltat) * b.deltat
    smax = round((wmax + tmaster) / b.deltat) * b.deltat
    b.chop(smin, smax)

    c = trace.correlate(
        a, b, mode='valid', normalization=normalization)

    # Re-anchor correlation times on the data trace, corrected for the
    # master's offset from the reference time.
    c.shift(-c.tmin + b.tmin - (a.tmin - tref))
    c.meta = {'tabu': True}

    if sum_square:
        c.ydata = c.ydata**2

    c.chop(wmin - tpad_new, wmax + tpad_new)
    return c
class TemplateMatchingIFC(IFC):
template_event_path = common.Path.T(
@@ -340,6 +376,11 @@ class TemplateMatchingIFC(IFC):
optional=True,
help='If set, downsample to this sampling rate before processing [Hz]')
preprocessing_threads = Int.T(
optional=True,
default=1
)
def get_tpad(self):
tmin_masters = min(tr.tmin for tr in self.masters.values())
tmax_masters = max(tr.tmax for tr in self.masters.values())
@@ -426,39 +467,33 @@ class TemplateMatchingIFC(IFC):
tref = tmin_masters
nsl_to_traces = defaultdict(list)
for orig_b in trs:
pool = multiprocessing.Pool(self.preprocessing_threads)
normalization = self.normalization
if normalization == 'off':
normalization = None
preprocessed = []
b = orig_b.copy()
prepare = []
t1 = time.time()
for orig_b in trs:
nslc = b.nslc_id
nslc = orig_b.nslc_id
a = self.masters.get(nslc, False)
if not a:
continue
prepare.append((orig_b, a))
if self.downsample_rate is not None:
downsample(b, 1./self.downsample_rate)
b.highpass(4, self.fmin, demean=False)
b.lowpass(4, self.fmax, demean=False)
smin = round((wmin - tmaster) / b.deltat) * b.deltat
smax = round((wmax + tmaster) / b.deltat) * b.deltat
b.chop(smin, smax)
normalization = self.normalization
if normalization == 'off':
normalization = None
c = trace.correlate(
a, b, mode='valid', normalization=normalization)
c.shift(-c.tmin + b.tmin - (a.tmin - tref))
c.meta = {'tabu': True}
if self.sum_square:
c.ydata = c.ydata**2
_pp = partial(pp, downsample_rate=self.downsample_rate,
fmin=self.fmin, fmax=self.fmax, normalization=normalization,
sum_square=self.sum_square, wmin=wmin, wmax=wmax, tmaster=tmaster,
tref=tref, tpad_new=tpad_new)
c.chop(wmin - tpad_new, wmax + tpad_new)
for c in pool.map(_pp, prepare):
nsl_to_traces[c.nslc_id[:3]].append(c)
nsl_to_traces[nslc[:3]].append(c)
print('this took %s to finish' % (time.time() - t1))
dataset = []
for nsl, cs in nsl_to_traces.iteritems():

Loading…
Cancel
Save