
refactor

Branch: develop
Marius Isken, 8 months ago, committed by miili
Commit 4bfc73600b
12 changed files:

- README.md (69)
- requirements.txt (1)
- setup.py (2)
- src/__init__.py (291)
- src/app.py (41)
- src/gfz_tapes.py (165)
- src/idas_convert.py (410)
- src/meta.py (39)
- src/plugin.py (22)
- src/telegram_bot.py (103)
- src/trace_stats.py (10)
- src/utils.py (27)

README.md (69 lines changed)

@@ -0,0 +1,69 @@
# iDAS TDMS Converter
This suite converts and downsamples Silixa iDAS TDMS data files to MiniSeed.
## Installation
```sh
python3 setup.py install
```
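Installing with `pip` from the repository root should work as well (a standard alternative for a `setup.py` based project, not separately tested here):
```sh
pip3 install .
```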
## Usage
Dump a default config file:
```sh
idas_convert dump_config
```
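The default config is printed to stdout, so it can be redirected into a file and edited (the filename below is only an example):
```sh
idas_convert dump_config > my_config.yml
```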
### Example Config
```yaml
--- !idas.iDASConvertConfig

# Number of TDMS files loaded and processed in parallel
batch_size: 1

# Threads used for downsampling the data
nthreads: 8

# Input paths
paths:
- /home/isken/src/idas-convert

# Out path, see pyrocko.io for details
outpath: '%(tmin_year)s%(tmin_month)s%(tmin_day)s/%(network)s.%(station)s_%(tmin_year)s%(tmin_month)s%(tmin_day)s.mseed'

# Overwrite MiniSeed meta information
new_network_code: ID
new_channel_code: HSF

downsample_to: 200.0

# MiniSeed record length
record_length: 4096

plugins:
# A plugin handling the communication with the GFZ tape file system
- !idas_convert.gfz_tapes.GFZTapesConfig
  enabled: false
  bytes_stage: 1T
  waterlevel: 0.6
  wait_warning_interval: 600.0
  release_files: true
  path_tapes_mount: /projects/ether/
  path_tapes_prefix: /archive_FO1/RAW/

# A Telegram bot to keep up-to-date with the process
- !idas_convert.telegram_bot.TelegramBotConfig
  enabled: false
  token: Telegram Token
  chat_id: Telegram Chat ID, e.g. -456413218
  status_interval: 3600.0
```
### Start Conversion
```sh
idas_convert my_config.yml
```
See `idas_convert -h` for more options.
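
The conversion can also be started from Python. A minimal sketch, assuming a config file `my_config.yml` as above; this mirrors what `idas_convert.app.main` does:
```python
from pyrocko.guts import load

config = load(filename='my_config.yml')  # parses the !idas.iDASConvertConfig file
converter = config.get_converter()       # instantiates iDASConvert with enabled plugins
converter.start()                        # runs the batch conversion loop
```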

requirements.txt (1 line changed)

@@ -1 +1,2 @@
```diff
 pyrocko
+telebot
```

setup.py (2 lines changed)

@@ -15,7 +15,7 @@ setuptools.setup(
```diff
     ],
     entry_points={
         'console_scripts': [
-            'idas_convert = idas_convert:main',
+            'idas_convert = idas_convert.app:main',
         ]},
     classifiers=[
```

src/__init__.py (291 lines changed)

@@ -1,289 +1,2 @@

Removed (the previous single-file implementation):

```python
import time
import logging
import numpy as num
import os
import subprocess

from pyrocko import io, trace
from pyrocko.io.tdms_idas import detect as detect_tdms
from pyrocko.util import tts

from itertools import repeat
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('idas_convert')

op = os.path

PADDING = .5
NTHREADS = 8
BATCH_SIZE = 1


def detect_files(path):
    if op.isfile(path):
        return [path]

    return [op.join(root, f)
            for root, dirs, files in os.walk(path)
            for f in files]


def ensure_staged_files(files, stage_bytes, waterlevel=None):
    nbytes = 0
    nbytes_unstaged = 0
    if waterlevel is None:
        waterlevel = stage_bytes / 2

    unstaged_files = []

    for fn in files:
        stats = os.stat(fn)
        size = stats.st_size
        nbytes += size

        if stats.st_blocks == 0:
            nbytes_unstaged += size
            unstaged_files.append(fn)

        if nbytes >= stage_bytes:
            break

    if nbytes_unstaged > waterlevel:
        logger.info('staging additional %d bytes', nbytes)
        for fn in unstaged_files:
            fn_tapes = fn.strip('/projects/ether/')
            logger.debug('staging file %s', fn_tapes)
            subprocess.call(
                args=['stage', '-D', '/archive_FO1/RAW', fn_tapes],
                check=True)

        logger.info('staginged %d bytes', nbytes)
    else:
        logger.info('staging waterlevel ok')


def downsample_data(args):
    trace, deltat, tmin_limit = args
    trace.downsample_to(deltat)

    if tmin_limit \
            and tmin_limit > trace.tmin \
            and tmin_limit < trace.tmax:
        trace.chop(
            tmin=tmin_limit,
            tmax=trace.tmax,
            inplace=True)

    trace.ydata = trace.ydata.astype(num.int32)

    # tmean = (trace.tmax - trace.tmin) / 2
    # mean = num.mean(trace.ydata)
    # max = num.max(trace.ydata)
    # std = num.std(trace.ydata)

    return trace
    # return trace, tmean, mean, max, std


def load_idas(fn):
    logger.info('loading %s', op.basename(fn))
    return io.load(fn, format='tdms_idas')


def convert_tdsm(
        files, outpath,
        downsample_to=1./200, network='ID', record_length=4096,
        stage_ahead=None,
        nthreads=NTHREADS, batch_size=BATCH_SIZE):

    nfiles = len(files)
    tmin_limit = None
    trs_prev = []
    ifn = 0

    while files:
        t0 = time.time()
        trs_new = []

        this_files = []
        while files and len(this_files) < batch_size:
            fn = files.pop(0)
            ifn += 1

            stat = os.stat(fn)
            fn_wait = False
            if stat.st_blocks == 0:
                logger.warning('waiting for file %s', fn)
                fn_wait = time.time()

                while stat.st_blocks == 0:
                    stat = os.stat(fn)
                    time.sleep(1.)

                logger.warning('file %s available. Waited %.2f s',
                               time.time() - fn_wait)

            with open(fn, 'rb') as f:
                if not detect_tdms(f.read(512)):
                    logger.warning('not a tdms file %s', fn)
                    continue

            this_files.append(fn)

        if stage_ahead:
            ensure_staged_files(files, stage_ahead, waterlevel=stage_ahead/2)

        with ThreadPoolExecutor(max_workers=len(this_files)) as executor:
            trs_loaded = list(executor.map(load_idas, this_files))

        tmins = []
        for trs in trs_loaded:
            if not trs:
                logger.warning('loaded empty traces')
                continue
            tmins.append(trs[0].tmin)
            trs_new.extend(trs)

        trs_latest = trs_loaded[num.argmax(tmins)]

        t_load = time.time() - t0

        if not trs_new:
            logger.error('empty input data')
            continue

        if trs_prev:
            deltat = trs_new[0].deltat
            prev_tmax = [tr.tmax for tr in trs_prev]
            if max(tmins) - min(prev_tmax) > 2*deltat:
                logger.warning('gap detected at %s', tts(min(prev_tmax)))
                trs_prev = []

        trs = sorted(trs_prev + trs_new, key=lambda tr: tr.full_id)

        trs_prev = []
        for tr in trs_latest:
            try:
                trs_prev.append(tr.chop(
                    tmin=tr.tmax - PADDING,
                    tmax=tr.tmax,
                    inplace=False))
            except trace.NoData:
                pass

        t = time.time()
        trs = trace.degapper(trs)

        with ThreadPoolExecutor(max_workers=nthreads) as executor:
            trs_ds = list(executor.map(
                downsample_data,
                zip(trs,
                    repeat(downsample_to),
                    repeat(tmin_limit))))

        tmin_limit = max([tr.tmax + tr.deltat for tr in trs_ds])
        t_ds = time.time() - t

        for tr in trs_ds:
            tr.set_network(network)

        t = time.time()
        io.save(
            trs_ds, outpath,
            format='mseed',
            record_length=record_length,
            append=True)
        t_save = time.time() - t

        elapsed = time.time() - t0
        files_left = len(files)

        logger.info(
            'processed %d/%d files: in %.2f s '
            '(FIR: %.2f, IO: %.2f/%.2f, remaining %.2f s)',
            ifn, nfiles, elapsed, t_ds, t_load, t_save,
            elapsed * (files_left / batch_size))


def main():
    import argparse

    def data_size(size):
        mag = dict(k=3, M=6, G=9, T=12, P=15)
        if size is None:
            return None

        size = size.strip()
        v = int(size.strip(''.join(mag.keys())))
        if not v:
            return None

        for suffix, m in mag.items():
            if size.endswith(suffix):
                return v*10**m

        raise ValueError('cannot interpret size %s' % size)

    parser = argparse.ArgumentParser(
        'Convert and downsample iDAS TDMS to MiniSeed for archiving')
    parser.add_argument(
        'paths', type=str, nargs='+',
        help='TDMS paths to convert.')
    parser.add_argument(
        '--downsample', type=int,
        default=200,
        help='Target sample rate for mseed. Default: %(default)s')
    parser.add_argument(
        '--network',
        default='ID', type=lambda s: str(s)[:2],
        help='Network code for MiniSeeds. Default: %(default)s')
    parser.add_argument(
        '--record_length', type=int,
        default=4096,
        help='MiniSeeds record length. Default: %(default)s')
    parser.add_argument(
        '--threads', type=int,
        default=NTHREADS,
        help='Number of threads for processing data. Default: %(default)s')
    parser.add_argument(
        '--batchsize', type=int,
        default=BATCH_SIZE,
        help='Number of parallel loaded TDMS files and processed at once.'
             ' Default: %(default)s')
    parser.add_argument(
        '--outpath', type=str,
        default='%(tmin_year)s%(tmin_month)s%(tmin_day)s/%(network)s.%(station)s_%(tmin_year)s%(tmin_month)s%(tmin_day)s.mseed',  # noqa
        help='Outfile in pyrocko.io.save format. Default: %(default)s')
    parser.add_argument(
        '--stage-ahead', type=data_size, default=None,
        help='Amount of data to stage before conversion in bytes. Suffix with'
             ' M, G, T, P. e.g. 4T.')
    parser.add_argument(
        '--verbose', '-v', action='count',
        default=0,
        help='Verbosity, add mutliple to increase verbosity.')

    args = parser.parse_args()

    log_level = logging.INFO - args.verbose * 10
    log_level = log_level if log_level > logging.DEBUG else logging.DEBUG
    logging.basicConfig(level=log_level)

    paths = args.paths
    if isinstance(paths, str):
        paths = [paths]

    logger.info('detecting files...')
    files = []
    for path in paths:
        files.extend(detect_files(path))

    logger.info('sorting %d files', len(files))
    files = sorted(files, key=lambda f: op.basename(f))

    convert_tdsm(
        files, outpath=args.outpath, downsample_to=1./args.downsample,
        network=args.network, record_length=args.record_length,
        nthreads=args.threads, batch_size=args.batchsize)


if __name__ == '__main__':
    main()
```

Added (the new package `__init__.py` only pulls in the plugin modules):

```python
from . import gfz_tapes  # noqa
from . import telegram_bot  # noqa
```

src/app.py (41 lines changed)

@@ -0,0 +1,41 @@
```python
import argparse
import logging
import sys
import os.path as op

from pyrocko.guts import load

from .idas_convert import iDASConvertConfig


def main():
    parser = argparse.ArgumentParser(
        'Convert and downsample iDAS TDMS to MiniSeed for archiving')
    parser.add_argument(
        'config', type=str,
        help='Config file or \'dump_config\'.')
    parser.add_argument(
        '--verbose', '-v', action='count',
        default=0,
        help='Verbosity, add multiple to increase verbosity.')

    args = parser.parse_args()

    log_level = logging.INFO - args.verbose * 10
    log_level = log_level if log_level > logging.DEBUG else logging.DEBUG
    logging.getLogger().setLevel(log_level)

    if args.config == 'dump_config':
        print(iDASConvertConfig())
        sys.exit(0)

    config = load(filename=args.config)
    converter = config.get_converter()
    converter.start()


if __name__ == '__main__':
    main()
```

src/gfz_tapes.py (165 lines changed)

@@ -0,0 +1,165 @@
```python
import os
import time
import logging
import subprocess

from pyrocko.guts import Bool, Float

from .plugin import Plugin, PluginConfig, register_plugin
from .meta import Path, DataSize
from .utils import sizeof_fmt

logger = logging.getLogger(__name__)
op = os.path


class GFZTapes(Plugin):

    def __init__(self, bytes_stage, waterlevel,
                 path_tapes_mount, path_tapes_prefix,
                 release_files=True, wait_warning_interval=600.):
        self.bytes_stage = bytes_stage
        self.waterlevel = waterlevel
        self.path_tapes_mount = path_tapes_mount
        self.path_tapes_prefix = path_tapes_prefix
        self.release_files = release_files
        self.wait_warning_interval = wait_warning_interval

        self.requested_files = set()
        self._initial_stage = False

    def set_parent(self, parent):
        super().set_parent(parent)
        parent.before_file_read.register(self.wait)
        parent.before_batch_load.register(self.stage)
        parent.finished_batch.register(self.release)

    def _stage_files(self, files):
        if isinstance(files, str):
            files = (files, )

        proc = subprocess.run(
            ['stage', '-D', self.path_tapes_prefix, *files])
        # logger.debug('ran `%s`', ' '.join(proc.args))

        try:
            proc.check_returncode()
        except subprocess.CalledProcessError as e:
            logger.exception(e)

        return proc

    def _release_files(self, files):
        if isinstance(files, str):
            files = (files, )

        proc = subprocess.run(
            ['release', '-D', self.path_tapes_prefix, *files])
        # logger.debug('ran `%s`', proc.args)

        try:
            proc.check_returncode()
        except subprocess.CalledProcessError as e:
            logger.exception(e)

        return proc

    def wait(self, path):
        stat = os.stat(path)
        if stat.st_blocks > 0:
            return

        fn = op.basename(path)
        logger.warning('Waiting for file %s', fn)
        fn_wait = time.time()

        fn_tape = op.relpath(op.abspath(path), self.path_tapes_mount)
        logger.debug('Re-requesting file %s', fn_tape)
        self._stage_files(fn_tape)

        warning_interval = self.wait_warning_interval

        while stat.st_blocks == 0:
            stat = os.stat(path)
            duration = time.time() - fn_wait

            if duration > warning_interval:
                logger.warning('Waiting since %.1f s', duration)
                warning_interval += self.wait_warning_interval

            time.sleep(1.)

        logger.info('File available, waited %.2f s', duration)

    def stage(self, remaining_files):
        nbytes = 0
        nbytes_unstaged = 0
        unstaged_files = []

        for fn in remaining_files:
            stats = os.stat(fn)
            nbytes += stats.st_size

            if stats.st_blocks == 0:
                nbytes_unstaged += stats.st_size
                unstaged_files.append(op.abspath(fn))

            if nbytes >= self.bytes_stage:
                break

        nbytes_staged = nbytes - nbytes_unstaged

        if nbytes_staged / nbytes < self.waterlevel \
                or not self._initial_stage:
            fns_tape = set(op.relpath(fn, self.path_tapes_mount)
                           for fn in unstaged_files)

            logger.debug('staging %d files', len(fns_tape))
            self._stage_files(fns_tape)
            self.requested_files |= fns_tape

            logger.info('requested %s', sizeof_fmt(nbytes_unstaged))
            self._initial_stage = True
        else:
            logger.info(
                'staging waterlevel ok (%.1f%%, %s)',
                (nbytes_staged / nbytes) * 100., sizeof_fmt(nbytes_staged))

    def release(self, files):
        if not self.release_files:
            return

        fns_tape = [op.relpath(fn, self.path_tapes_mount) for fn in files]
        logger.debug('Releasing %d files', len(fns_tape))
        self._release_files(fns_tape)


class GFZTapesConfig(PluginConfig):
    bytes_stage = DataSize.T(
        default='1T',
        help='Amount of data to stage before conversion in bytes. Suffix with'
             ' M, G, T, P. e.g. 4T.')
    waterlevel = Float.T(
        default=.6,
        help='Waterlevel before data is staged from tapes,'
             ' as a fraction [0-1].')
    wait_warning_interval = Float.T(
        default=600.,
        help='Log when waiting for files at this interval [s].')
    release_files = Bool.T(
        default=True,
        help='Release files after reading.')
    path_tapes_mount = Path.T(
        default='/projects/ether/',
        help='Where the archive is mounted.')
    path_tapes_prefix = Path.T(
        default='/archive_FO1/RAW/',
        help='Prefix for stage -D <prefix>.')

    def get_plugin(self):
        assert 0. < self.waterlevel < 1., 'bad waterlevel'
        return GFZTapes(
            self.bytes_stage, self.waterlevel,
            self.path_tapes_mount, self.path_tapes_prefix,
            self.release_files, self.wait_warning_interval)


register_plugin(GFZTapesConfig)
```

src/idas_convert.py (410 lines changed)

@@ -0,0 +1,410 @@
```python
import os
import re
import logging
import threading
from time import time
from glob import iglob
from datetime import timedelta, datetime
from itertools import repeat, chain
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor

import numpy as num
from pyrocko import io, trace
from pyrocko.io.tdms_idas import detect as detect_tdms
from pyrocko.util import tts
from pyrocko.guts import Object, String, Int, List, Timestamp

from .plugin import PluginConfig, PLUGINS_AVAILABLE
from .meta import Path
from .utils import Signal

guts_prefix = 'idas'

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('idas_convert')

nptdms_logger = logging.getLogger('nptdms.base_segment')
nptdms_logger.setLevel(logging.ERROR)

op = os.path
day = 3600. * 24


@dataclass
class Stats(object):
    io_load_t: float = 0.
    io_load_t_total: float = 0.
    io_save_t: float = 0.
    io_save_t_total: float = 0.
    io_load_bytes: int = 0
    io_load_bytes_total: int = 0
    tprocessing: float = 0.
    tprocessing_total: float = 0.

    nfiles_total: int = 0
    nfiles_processed: int = 0
    time_processing: float = 0.
    time_start: float = time()

    def new_io_load(self, t, bytes):
        self.io_load_t = t
        self.io_load_bytes = bytes
        self.io_load_bytes_total += bytes
        self.io_load_t_total += t

    def new_io_tsave(self, t):
        self.io_save_t = t
        self.io_save_t_total += t

    def new_tprocessing(self, t):
        self.tprocessing = t
        self.tprocessing_total += t

    def finished_batch(self, nfiles):
        self.nfiles_processed += nfiles

    @property
    def nfiles_remaining(self):
        return self.nfiles_total - self.nfiles_processed

    @property
    def time_remaining(self):
        proc_time = time() - self.time_start
        s = self.nfiles_remaining*(proc_time/(self.nfiles_processed or 1))
        return timedelta(seconds=s)

    @property
    def io_load_speed(self):
        return (self.io_load_bytes / 1e6) / \
            (self.io_load_t or 1.)

    @property
    def io_load_speed_avg(self):
        return (self.io_load_bytes_total / 1e6) / \
            (self.io_load_t_total or 1.)


def split(trace, time):
    if not trace.tmin < time < trace.tmax:
        return (trace, )

    return (trace.chop(trace.tmin, time, inplace=False),
            trace.chop(time, trace.tmax, inplace=False, include_last=True))


def tdms_guess_time(path):
    fn = op.splitext(op.basename(path))[0]
    time_str = fn[-19:]
    return datetime.strptime(time_str, '%Y%m%d_%H%M%S.%f').timestamp()


def detect_files(path):
    if op.isfile(path):
        return [path]

    files = []
    for ipath in iglob(path):
        if op.isfile(ipath):
            files.append(ipath)

        elif op.isdir(ipath):
            logger.debug('Scanning folder %s', ipath)
            files.extend(
                [op.join(root, f)
                 for root, dirs, files in os.walk(ipath)
                 for f in files])

    return files


def process_data(args):
    trace, deltat, tmin, tmax = args
    tmin = tmin or trace.tmin
    tmax = tmax or trace.tmax

    if tmin > trace.tmax or tmax < trace.tmin:
        return None

    trace.downsample_to(deltat)
    trace.chop(
        tmin=tmin,
        tmax=tmax,
        inplace=True)

    trace.ydata = trace.ydata.astype(num.int32)
    return trace


def load_idas(fn):
    logger.debug(
        'Loading %s (thread: %s)', op.basename(fn), threading.get_ident())
    return io.load(fn, format='tdms_idas')


class iDASConvert(object):

    def __init__(
            self, paths, outpath,
            downsample_to=200., record_length=4096,
            new_network_code='ID', new_channel_code='HSF',
            channel_selection=None,
            tmin=None, tmax=None,
            nthreads=8, batch_size=1, plugins=[]):

        logger.info('Detecting files...')
        files = []
        for path in paths:
            files.extend(detect_files(path))

        def in_timeframe(fn):
            try:
                fn_tmin = tdms_guess_time(fn)
            except ValueError:
                return True

            if tmax is not None and fn_tmin > tmax:
                return False
            if tmin is not None and (fn_tmin + 60.) < tmin:
                return False
            return True

        if tmin is not None or tmax is not None:
            nfiles_before = len(files)
            files = list(filter(in_timeframe, files))
            logger.info('Filtered %d files', nfiles_before - len(files))

        if not files:
            raise OSError('No files selected for conversion.')

        logger.info('Sorting %d files', len(files))
        self.files = sorted(files, key=lambda f: op.basename(f))
        self.files_all = self.files.copy()
        self.nfiles = len(self.files_all)
        self.stats = Stats(nfiles_total=self.nfiles)

        self.outpath = outpath

        self.channel_selection = None if not channel_selection \
            else re.compile(channel_selection)
        self.new_network_code = new_network_code
        self.new_channel_code = new_channel_code
        self.downsample_to = downsample_to
        self.record_length = record_length

        self.tmin = tmin
        self.tmax = tmax

        self.nthreads = nthreads
        self.batch_size = batch_size

        self.before_batch_load = Signal(self)
        self.finished_batch = Signal(self)
        self.before_file_read = Signal(self)
        self.new_traces_converted = Signal(self)

        self.plugins = plugins
        for plugin in self.plugins:
            logger.info('Activating plugin %s', plugin.__class__.__name__)
            plugin.set_parent(self)

        self._trs_prev = []
        self._tmax_prev = None

    @property
    def nfiles_left(self):
        return len(self.files)

    def start(self):
        logger.info('Starting conversion of %d files', self.nfiles)
        stats = self.stats
        t_start = time()

        files = self.files
        trs_overlap = None
        tmax_prev = None

        while self.files:
            load_files = []
            self.before_batch_load.dispatch(self.files)

            while files and len(load_files) < self.batch_size:
                fn = self.files.pop(0)
                self.before_file_read.dispatch(fn)
                load_files.append(fn)

                with open(fn, 'rb') as f:
                    if not detect_tdms(f.read(512)):
                        logger.warning('Not a tdms file %s', fn)
                        continue

            batch_tmin = tmax_prev
            if self.tmin is not None:
                batch_tmin = max(self.tmin, batch_tmin or -1.)

            traces, batch_tmin, batch_tmax, trs_overlap = self.convert_files(
                load_files,
                tmin=batch_tmin,
                tmax=self.tmax,
                overlap=trs_overlap)

            self.finished_batch.dispatch(load_files)
            stats.finished_batch(len(load_files))

            tmax_prev = batch_tmax + 1./self.downsample_to

            logger.info(
                'Processed {s.nfiles_processed}/{s.nfiles_total} files'
                ' (DS: {s.tprocessing:.2f},'
                ' IO: {s.io_load_t:.2f}/{s.io_save_t:.2f}'
                ' [load: {s.io_load_speed:.2f} MB/s]).'
                ' {s.time_remaining} remaining'.format(s=stats))

        logger.info('Finished. Processed %d files in %.2f s',
                    stats.nfiles_processed, time() - t_start)

    def convert_files(self, files, tmin=None, tmax=None, overlap=False):
        nfiles = len(files)
        stats = self.stats
        max_workers = nfiles if nfiles < self.batch_size else self.batch_size

        t_start = time()
        nbytes = sum(op.getsize(fn) for fn in files)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            trs_all = list(chain(*executor.map(load_idas, files)))

        if self.channel_selection:
            trs_all = [tr for tr in trs_all
                       if self.channel_selection.match(tr.station)]
        if not trs_all:
            raise TypeError('Did not load any traces!')

        stats.new_io_load(time() - t_start, nbytes)

        trs = sorted(trs_all + (overlap or []), key=lambda tr: tr.full_id)
        trs_degapped = trace.degapper(trs)

        if (overlap and tmin) and len(trs_degapped) != len(overlap):
            logger.warning('Gap detected at %s', tts(tmin))
            trs_degapped = trace.degapper(
                sorted(trs_all, key=lambda tr: tr.full_id))

        trs_overlap = self.get_traces_end(trs_degapped)

        t_start = time()
        with ThreadPoolExecutor(max_workers=self.nthreads) as executor:
            trs_ds = list(executor.map(
                process_data,
                zip(trs_degapped,
                    repeat(1./self.downsample_to),
                    repeat(tmin), repeat(tmax))))

        trs_ds = list(filter(lambda tr: tr is not None, trs_ds))
        if not trs_ds:
            return [], None, None, trs_overlap

        for tr in trs_degapped:
            tr.set_network(self.new_network_code)
            tr.set_channel(self.new_channel_code)

        batch_tmax = max(tr.tmax for tr in trs_ds)
        batch_tmin = min(tr.tmin for tr in trs_ds)

        stats.new_tprocessing(time() - t_start)

        self.new_traces_converted.dispatch(trs_ds)

        t_start = time()

        # Split traces at day break
        dt_min = datetime.fromtimestamp(batch_tmin)
        dt_max = datetime.fromtimestamp(batch_tmax)

        if dt_min.date() != dt_max.date():
            dt_split = datetime.combine(dt_max.date(), datetime.min.time())
            tsplit = dt_split.timestamp()
            trs_ds = list(chain(*(split(tr, tsplit) for tr in trs_ds)))

        io.save(
            trs_ds, self.outpath,
            format='mseed',
            record_length=self.record_length,
            append=True)

        stats.new_io_tsave(time() - t_start)
        return trs_ds, batch_tmin, batch_tmax, trs_overlap

    def get_traces_end(self, traces, overlap=1.):
        trs_chopped = []
        for tr in traces:
            trs_chopped.append(
                tr.chop(tr.tmax - overlap, tr.tmax, inplace=False))
        return trs_chopped


class iDASConvertConfig(Object):
    batch_size = Int.T(
        default=1,
        help='Number of TDMS files loaded and processed in parallel.')
    nthreads = Int.T(
        default=8,
        help='Number of threads for processing data.')

    paths = List.T(
        Path.T(),
        default=[os.getcwd()],
        help='TDMS paths to convert.')
    outpath = Path.T(
        default='%(tmin_year)s%(tmin_month)s%(tmin_day)s/%(network)s.%(station)s_%(tmin_year)s%(tmin_month)s%(tmin_day)s.mseed',  # noqa
        help='Outfile in pyrocko.io.save format.')

    channel_selection = String.T(
        optional=True,
        help='Limit the conversion to these channels, regex allowed.')
    new_network_code = String.T(
        default='ID',
        help='Network code for MiniSeeds.')
    new_channel_code = String.T(
        default='HSF')

    tmin = Timestamp.T(
        optional=True,
        help='Start time for the conversion.')
    tmax = Timestamp.T(
        optional=True,
        help='End time for the conversion.')

    downsample_to = Int.T(
        default=200.,
        help='Target sample rate for mseed.')
    record_length = Int.T(
        default=4096,
        help='MiniSeed record length in bytes.')

    plugins = List.T(
        PluginConfig.T(),
        default=[p() for p in PLUGINS_AVAILABLE],
        help='Plugins for the converter.')

    def get_converter(self):
        plugins = []
        for plugin_config in self.plugins:
            if plugin_config.enabled:
                plugins.append(plugin_config.get_plugin())

        return iDASConvert(
            self.paths, self.outpath,
            self.downsample_to, self.record_length,
            self.new_network_code, self.new_channel_code,
            self.channel_selection,
            self.tmin, self.tmax,
            self.nthreads, self.batch_size, plugins)
```

src/meta.py (39 lines changed)

@@ -0,0 +1,39 @@
```python
from pyrocko.guts import TBase, ValidationError, String

MAG = dict(k=3, M=6, G=9, T=12, P=15)


class Path(String):
    ...


class DataSize(String):
    dummy_for = int

    class __T(TBase):

        def regularize_extra(self, val):
            if isinstance(val, int):
                return val

            if isinstance(val, str):
                size = val.strip()
                try:
                    v = float(val.strip(''.join(MAG.keys())))
                except ValueError:
                    raise ValidationError('cannot interpret %s' % val)

                if not v:
                    raise ValidationError('cannot interpret %s' % val)

                for suffix, m in MAG.items():
                    if size.endswith(suffix):
                        return int(v*10**m)

                return int(v)

            raise ValidationError('cannot interpret data size %s' % val)

        def to_save(self, val):
            if isinstance(val, int):
                return val
            return str(val)
```
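
For illustration, the suffix handling above maps a size string to an integer byte count. A standalone sketch of the same arithmetic, without guts (names here are illustrative only):

```python
MAG = dict(k=3, M=6, G=9, T=12, P=15)


def parse_size(size_str):
    # '1T' -> 1 * 10**12, '500G' -> 5 * 10**11, plain '4096' -> 4096
    value = float(size_str.strip(''.join(MAG)))
    for suffix, exponent in MAG.items():
        if size_str.endswith(suffix):
            return int(value * 10**exponent)
    return int(value)


assert parse_size('1T') == 1_000_000_000_000
```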

src/plugin.py (22 lines changed)

@@ -0,0 +1,22 @@
```python
from pyrocko.guts import Object, Bool

PLUGINS_AVAILABLE = []


def register_plugin(plugin_config):
    global PLUGINS_AVAILABLE
    PLUGINS_AVAILABLE.append(plugin_config)


class Plugin(object):

    def set_parent(self, parent):
        self.parent = parent


class PluginConfig(Object):
    enabled = Bool.T(
        default=False)

    def get_plugin(self):
        return Plugin()
```
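
A sketch of how a custom plugin could hook into the converter, assuming the package installs as `idas_convert` (as the console script suggests); `MyPlugin`/`MyPluginConfig` are hypothetical names, while the signal names are those exposed by `iDASConvert`:

```python
import logging

from pyrocko.guts import Float
from idas_convert.plugin import Plugin, PluginConfig, register_plugin

logger = logging.getLogger(__name__)


class MyPlugin(Plugin):

    def set_parent(self, parent):
        super().set_parent(parent)
        # Subscribe to converter signals, as GFZTapes and TelegramBot do.
        parent.finished_batch.register(self.on_finished_batch)

    def on_finished_batch(self, files):
        logger.info('batch of %d files finished', len(files))


class MyPluginConfig(PluginConfig):
    some_threshold = Float.T(
        default=1.0,
        help='Hypothetical parameter.')

    def get_plugin(self):
        return MyPlugin()


register_plugin(MyPluginConfig)
```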

src/telegram_bot.py (103 lines changed)

@@ -0,0 +1,103 @@
```python
import logging
from time import time
from datetime import timedelta

from pyrocko.guts import String, Float

from .plugin import Plugin, PluginConfig, register_plugin
from .utils import sizeof_fmt

logger = logging.getLogger(__name__)


class TelegramBot(Plugin):

    def __init__(self, token, chat_id,
                 status_interval=60.):
        import telebot
        self.token = token
        self.chat_id = chat_id
        self.status_interval = status_interval

        self.started = time()
        self._next_status = self.started + status_interval

        self.bot = telebot.TeleBot(self.token)

    def set_parent(self, parent):
        super().set_parent(parent)
        bot = self

        class LogFilter(logging.Filter):
            def filter(self, record):
                if record.levelno < logging.WARNING:
                    return False
                if not record.name.startswith(__name__.split('.')[0]):
                    return False
                return True

        class LogHandler(logging.StreamHandler):
            terminator = ''

            def emit(self, record):
                bot.send_message('{level}: {msg}'.format(
                    level=record.levelname,
                    msg=record.getMessage()))

            def flush(self):
                ...

        handler = LogHandler()
        handler.addFilter(LogFilter())
        logging.getLogger().addHandler(handler)

        parent.finished_batch.register(self.send_status)

        self.send_message(
            'Conversion of %d files started.' % self.parent.nfiles)

    def send_message(self, message):
        try:
            return self.bot.send_message(self.chat_id, message)
        except Exception as e:
            logger.exception(e)

    def send_status(self, *args):
        if time() < self._next_status:
            return

        logger.debug('sending status message')
        stats = self.parent.stats
        self.send_message(
            'Converted {s.nfiles_processed}/{s.nfiles_total} files'
            ' (Total {size_processed}), {s.time_remaining} remaining. '
            'Average input {s.io_load_speed_avg:.1f} MB/s.'.format(
                s=stats,
                size_processed=sizeof_fmt(stats.io_load_bytes_total)))

        self._next_status += self.status_interval

    def __del__(self):
        dt = timedelta(seconds=time() - self.started)
        self.send_message('Conversion finished in %s.' % dt)


class TelegramBotConfig(PluginConfig):
    token = String.T(
        default='Telegram Token',
        help='Telegram Bot API token')
    chat_id = String.T(
        default='Telegram Chat ID, e.g. -456413218',
        help='Telegram Chat ID')
    status_interval = Float.T(
        default=3600.,
        help='Report statistics at this interval [s].')

    def get_plugin(self):
        return TelegramBot(self.token, self.chat_id, self.status_interval)


register_plugin(TelegramBotConfig)
```

src/trace_stats.py (10 lines changed)

@@ -0,0 +1,10 @@
```python
import os
import time
import logging
import subprocess

from pyrocko.guts import Bool, Float

from .plugin import Plugin, PluginConfig, register_plugin
from .meta import Path, DataSize

logger = logging.getLogger(__name__)
```

src/utils.py (27 lines changed)

@@ -0,0 +1,27 @@
```python
def sizeof_fmt(num, suffix='B'):
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, 'Yi', suffix)


class Signal(object):

    def __init__(self, parent):
        self.parent = parent
        self.subscribers = set()

    def register(self, callback):
        assert callable(callback), '%s is not callable' % callback
        self.subscribers.add(callback)

    def unregister(self, callback):
        try:
            self.subscribers.remove(callback)
        except KeyError:
            pass

    def dispatch(self, *args, **kwargs):
        for callback in self.subscribers:
            callback(*args, **kwargs)
```
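
A minimal sketch of how these signals are wired up by the converter and its plugins; the `Converter` stand-in and the callback are placeholders, and the import assumes the package installs as `idas_convert`:

```python
from idas_convert.utils import Signal


class Converter(object):
    def __init__(self):
        # iDASConvert exposes signals like this one for plugins to hook into.
        self.finished_batch = Signal(self)


def on_finished_batch(files):
    print('finished batch of %d files' % len(files))


converter = Converter()
converter.finished_batch.register(on_finished_batch)
converter.finished_batch.dispatch(['example.tdms'])  # calls on_finished_batch
```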