|
|
|
@ -285,12 +285,22 @@ class SaveMSeedThread(threading.Thread):
|
|
|
|
|
|
|
|
|
|
self.processed_files = queue.Queue()
|
|
|
|
|
|
|
|
|
|
self.out_files = {}
|
|
|
|
|
|
|
|
|
|
def set_checkpt_file(self, path):
|
|
|
|
|
self.checkpt_file = path
|
|
|
|
|
|
|
|
|
|
def get_tmax(self):
|
|
|
|
|
return self.tmax
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def bytes_written(self):
|
|
|
|
|
return sum(s for s in self.out_files.values())
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def nfiles_written(self):
|
|
|
|
|
return len(self.out_files)
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
logger.info('Starting MiniSeed saving thread')
|
|
|
|
|
while True:
|
|
|
|
@ -301,7 +311,7 @@ class SaveMSeedThread(threading.Thread):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
t_start = time()
|
|
|
|
|
io.save(
|
|
|
|
|
out_files = io.save(
|
|
|
|
|
traces, self.outpath,
|
|
|
|
|
format='mseed',
|
|
|
|
|
record_length=self.record_length,
|
|
|
|
@ -310,6 +320,10 @@ class SaveMSeedThread(threading.Thread):
|
|
|
|
|
if self.checkpt_file is not None:
|
|
|
|
|
with open(self.checkpt_file, 'w') as f:
|
|
|
|
|
f.write(str(tmax))
|
|
|
|
|
|
|
|
|
|
for fn in out_files:
|
|
|
|
|
self.out_files[fn] = op.getsize(fn)
|
|
|
|
|
|
|
|
|
|
self.tmax = tmax
|
|
|
|
|
|
|
|
|
|
self.processed_files.put(fns)
|
|
|
|
@ -442,6 +456,10 @@ class iDASConvert(object):
|
|
|
|
|
def bytes_input_rate(self):
|
|
|
|
|
return sum(t.bytes_input_rate for t in self.load_threads)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def bytes_written(self):
|
|
|
|
|
return self.save_thread.bytes_written
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def processing_rate(self):
|
|
|
|
|
return self.processing_thread.processing_rate
|
|
|
|
@ -501,7 +519,7 @@ class iDASConvert(object):
|
|
|
|
|
logger.debug('Joined processing queue')
|
|
|
|
|
|
|
|
|
|
# Ensure it is the last element
|
|
|
|
|
self.save_queue.put((time(), False))
|
|
|
|
|
self.save_queue.put((time(), False, None))
|
|
|
|
|
self.save_queue.join()
|
|
|
|
|
logger.debug('Joining save trace queue')
|
|
|
|
|
|
|
|
|
@ -525,11 +543,12 @@ class iDASConvert(object):
|
|
|
|
|
def get_status(self):
|
|
|
|
|
s = self
|
|
|
|
|
return (
|
|
|
|
|
f'Processed {s.nfiles_processed}/{s.nfiles} files'
|
|
|
|
|
f' ({sizeof_fmt(s.bytes_loaded)}/{sizeof_fmt(s.bytes_total)}'
|
|
|
|
|
f'Processed {100*s.nfiles_processed/s.nfiles:.1f}%.'
|
|
|
|
|
f' {sizeof_fmt(s.bytes_loaded)}/{sizeof_fmt(s.bytes_total)}'
|
|
|
|
|
f' @ {s.bytes_processing_rate/1e6:.1f} MB/s,'
|
|
|
|
|
f' In {s.bytes_input_rate/1e6:.1f} MB/s,'
|
|
|
|
|
f' Proc {s.processing_rate/1e6:.1f} MB/s).'
|
|
|
|
|
f' in {s.bytes_input_rate/1e6:.1f} MB/s,'
|
|
|
|
|
f' proc {s.processing_rate/1e6:.1f} MB/s.'
|
|
|
|
|
f' {sizeof_fmt(s.bytes_written)} written.'
|
|
|
|
|
f' Head is at {tts(s.time_head)}.'
|
|
|
|
|
f' Queues'
|
|
|
|
|
f' L:{s.load_queue_size}'
|
|
|
|
|