Browse Source

experimental: estimate tinc from expected memory usage

deluxe_memory
Marius Kriegerowski 4 years ago
parent
commit
cf36d37a28
  1. 4
      src/config.py
  2. 102
      src/core.py

4
src/config.py

@ -110,6 +110,10 @@ class Config(Object):
default='lassie_cache',
help='directory where lassie stores tabulated phases etc.')
use_memory = Float.T(
default=0.2,
help='specify the used memory. [0.0 - 1.0]')
def __init__(self, *args, **kwargs):
Object.__init__(self, *args, **kwargs)
self._receivers = None

102
src/core.py

@ -1,4 +1,5 @@
import os
import sys
import logging
import os.path as op
import shutil
@ -71,6 +72,55 @@ def zero_fill(trs, tmin, tmax):
return trs_out
def estimate_tinc(config, p, deltat_cf, show_window_traces, ifcs, tpad):
float_size = 8
int_size = 8
int32_size = 4
receivers = config.receivers
n_receivers = len(receivers)
ngridpoints = config.get_grid().size()
mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
mem_want = mem_bytes * config.use_memory
# dynamic memory (dependent on tinc)
deltats = {}
for tr in p.iter_traces():
deltats.update(
{tr.nslc_id: min(deltats.get(tr.nslc_id, num.inf), tr.deltat)}
)
# chopped data
mem_per_second = 0
for codes, dt in deltats.items():
mem_per_second += 1./dt
# for plotting and debug copies:
if show_window_traces:
mem_per_second *= 3
# output array (frames):
mem_per_second += 1./deltat_cf * ngridpoints
# copied, downsampled versions (+2 extra for stackmax and indices, and clenghts in parstack
mem_per_second += (n_receivers + 3) * 1./deltat_cf
#mem_per_second *= float_size
mem_per_second *= 8
# static memory (independent of tinc)
offsets_mem = n_receivers * int_size
#shifts_mem = sys.getsizeof(shift_tables)
shifts_mem = len(ifcs) * n_receivers * ngridpoints * int32_size
weights_mem = ngridpoints * n_receivers * float_size
mem_static = offsets_mem + shifts_mem + weights_mem + (tpad * ngridpoints * float_size)
tinc = int((mem_want - mem_static) / mem_per_second)
return tinc
def scan(
config,
override_tmin=None,
@ -110,8 +160,8 @@ def scan(
util.ensuredirs(config.stackmax_path)
grid = config.get_grid()
receivers = config.get_receivers()
receivers = config.get_receivers()
norm_map = gridmod.geometrical_normalization(grid, receivers)
for data_path in config.data_paths:
@ -142,16 +192,6 @@ def scan(
tpad = max(ifc.get_tpad() for ifc in ifcs) + tshift_max + tpeaksearch
tinc = tshift_max * 10. + 3.0 * tpad
tavail = p.tmax - p.tmin
tinc = min(tinc, tavail - 2.0 * tpad)
if tinc <= 0:
raise common.LassieError(
'available waveforms too short \n'
'required: %g s\n'
'available: %g s\n' % (2.*tpad, tavail))
blacklist = set(tuple(s.split('.')) for s in config.blacklist)
whitelist = set(tuple(s.split('.')) for s in config.whitelist)
@ -165,18 +205,10 @@ def scan(
config.distance_max is None or
distances_to_grid[i] <= config.distance_max))
for data_path in config.data_paths:
if not op.exists(data_path):
raise common.LassieError(
'waveform data path does not exist: %s' % data_path)
p = pile.make_pile(config.data_paths, fileformat='detect')
if p.is_empty():
raise common.LassieError('no usable waveforms found')
check_data_consistency(p, receivers)
deltat_cf = max(p.deltats.keys())
max_deltat = max(p.deltats.keys())
deltat_cf = max_deltat
assert deltat_cf > 0.0
while True:
@ -185,10 +217,21 @@ def scan(
deltat_cf *= 2
logger.info('sampling interval: %g s' % deltat_cf)
ngridpoints = grid.size()
tavail = p.tmax - p.tmin
tinc = estimate_tinc(config, p, deltat_cf, show_window_traces, ifcs, tpad=tpad)
tinc = min(tinc, tavail - 2.0 * tpad)
if tinc <= 0:
raise common.LassieError(
'available waveforms too short \n'
'required: %g s\n'
'available: %g s\n' % (2.*tpad, tavail))
logger.info('sampling interval: %g s' % deltat_cf)
idetection = 0
station_weights = {}
@ -262,7 +305,8 @@ def scan(
nsls_selected, trs_selected = zip(*dataset)
trs_debug.extend(trs + list(trs_selected))
if show_window_traces:
trs_debug.extend(trs + list(trs_selected))
t0 = (wmin / deltat_cf) * deltat_cf
@ -314,6 +358,7 @@ def scan(
nparallel=nparallel,
impl='openmp')
shift_max = max(shift_maxs)
if config.sharpness_normalization:
@ -332,9 +377,8 @@ def scan(
deltat=deltat_cf,
ydata=frame_maxs)
trs_debug.append(tr_stackmax)
if show_window_traces:
trs_debug.append(tr_stackmax)
trace.snuffle(trs_debug)
ydata_window = tr_stackmax.chop(
@ -350,7 +394,9 @@ def scan(
wmin <= tpeak and tpeak < wmax]) or ([], [])
tr_stackmax_indx = tr_stackmax.copy(data=False)
imaxs = num.argmax(frames, axis=0)
imaxs = num.zeros(lengthout, dtype=num.int64)
num.argmax(frames.T, axis=1, out=imaxs)
tr_stackmax_indx.set_ydata(imaxs.astype(num.int32))
tr_stackmax_indx.set_location('i')
@ -362,7 +408,6 @@ def scan(
iframe = int(round(((tpeak-t0) - ioff*deltat_cf) / deltat_cf))
frame = frames[:, iframe]
imax = num.argmax(frame)
latpeak, lonpeak, xpeak, ypeak, zpeak = \
grid.index_to_location(imax)
@ -395,7 +440,6 @@ def scan(
if stop_after_first:
return
tr_stackmax.chop(wmin, wmax)
tr_stackmax_indx.chop(wmin, wmax)

Loading…
Cancel
Save