diff --git a/config.yaml b/config.yaml index 03eb7d26..36c77a1a 100644 --- a/config.yaml +++ b/config.yaml @@ -1,62 +1,74 @@ -# Where to plot and log +# Where to plot and log. directories: - # One directory for logs + # One directory in which to store all plot job logs (the STDOUT/ + # STDERR of all plot jobs). In order to monitor progress, plotman + # reads these logs on a regular basis, so using a fast drive is + # recommended. log: /home/chia/chia/logs - # One or more directories; the scheduler will use all of them + # One or more directories to use as tmp dirs for plotting. The + # scheduler will use all of them and distribute jobs among them. + # It assumes that IO is independent for each one (i.e., that each + # one is on a different physical device). + # + # If multiple directories share a common prefix, reports will + # abbreviate and show just the uniquely identifying suffix. tmp: - /mnt/tmp/00 - /mnt/tmp/01 - /mnt/tmp/02 - /mnt/tmp/03 - - /mnt/tmp/04 - - /mnt/tmp/05 - - /mnt/tmp/06 - - /mnt/tmp/07 - - /mnt/tmp/08 - - /mnt/tmp/09 - - /mnt/tmp/10 - - /mnt/tmp/11 - - # One directory (TODO: support distributing across multiple tmp2 dirs) - tmp2: /mnt/tmp/a - - # One or more directories; the scheduler will use all of them + + # Optional: tmp2 directory. If specified, will be passed to + # chia plots create as -2. Only one tmp2 directory is supported. + # tmp2: /mnt/tmp/a + + # One or more directories; the scheduler will use all of them. + # These again are presumed to be on independent physical devices, + # so writes (plot jobs) and reads (archivals) can be scheduled + # to minimize IO contention. dst: - - /home/chia/chia/plots/000 - - /home/chia/chia/plots/001 - - /home/chia/chia/plots/002 - - /home/chia/chia/plots/003 + - /mnt/dst/00 + - /mnt/dst/01 - # Archival + # Archival configuration. Optional. + # + # Currently archival depends on an rsync daemon running on the remote + # host, and that the module is configured to match the local path. + # See code for details. archive: rsyncd_module: plots rsyncd_path: /plots - rsyncd_bwlimit: 100000 # In KB/s - rsyncd_host: farmer + rsyncd_bwlimit: 80000 # Bandwidth limit in KB/s + rsyncd_host: myfarmer rsyncd_user: chia # Plotting scheduling parameters scheduling: - # Don't run a job on a particular temp dir more often than this. - # (obsolete) - # tmpdir_stagger_m: 300 + # Don't run a job on a particular temp dir until all existing jobs + # have progresed at least this far. Phase major corresponds to the + # plot phase, phase minor corresponds to the table or table pair + # in sequence. tmpdir_stagger_phase_major: 2 - tmpdir_stagger_phase_minor: 5 + tmpdir_stagger_phase_minor: 1 + + # Don't run more than this many jobs at a time on a single temp dir. tmpdir_max_jobs: 3 - # Global min; don't run any jobs more often than this. + # Don't run any jobs (across all temp dirs) more often than this. global_stagger_m: 30 # How often the daemon wakes to consider starting a new plot job polling_time_s: 20 -# Plotting parameters + +# Plotting parameters. These are pass-through parameters to chia plots create. +# See documentation at +# https://github.com/Chia-Network/chia-blockchain/wiki/CLI-Commands-Reference#create plotting: k: 32 + e: True # Use -e plotting option n_threads: 8 # Threads per job - # n_buckets: 64 # Number of buckets to split data into - # job_buffer: 9200 # Per job memory n_buckets: 128 # Number of buckets to split data into - job_buffer: 4580 # Per job memory + job_buffer: 4520 # Per job memory diff --git a/interactive.py b/interactive.py index 93467286..ac8105f1 100644 --- a/interactive.py +++ b/interactive.py @@ -11,15 +11,10 @@ import manager import reporting -def window_width(window): - return window.getmaxyx()[1] - -def window_height(window): - return window.getmaxyx()[0] - class Log: - entries = [] - cur_pos = 0 + def __init__(self): + self.entries = [] + self.cur_pos = 0 # TODO: store timestamp as actual timestamp indexing the messages def log(self, msg): @@ -81,7 +76,7 @@ def curses_main(stdscr): # Page layout. Currently requires at least ~40 rows. # TODO: make everything dynamically resize to best use available space - header_height = 2 + header_height = 3 jobs_height = 10 dirs_height = 14 logscreen_height = n_rows - (header_height + jobs_height + dirs_height) @@ -97,7 +92,7 @@ def curses_main(stdscr): refresh_period = int(sched_cfg['polling_time_s']) stdscr.nodelay(True) # make getch() non-blocking - stdscr.timeout(5000) # this doesn't seem to do anything.... + stdscr.timeout(2000) header_win = curses.newwin(header_height, n_cols, header_pos, 0) log_win = curses.newwin(logscreen_height, n_cols, logscreen_pos, 0) @@ -111,19 +106,28 @@ def curses_main(stdscr): while True: - # todo: none of this resizing works - (n_rows, n_cols) = map(int, stdscr.getmaxyx()) + # TODO: handle resizing. Need to (1) figure out how to reliably get + # the terminal size -- the recommended method doesn't seem to work: + # (n_rows, n_cols) = [int(v) for v in stdscr.getmaxyx()] + # Consider instead: + # ...[int(v) for v in os.popen('stty size', 'r').read().split()] + # and then (2) implement the logic to resize all the subwindows as above + stdscr.clear() linecap = n_cols - 1 logscreen_height = n_rows - (header_height + jobs_height + dirs_height) elapsed = (datetime.datetime.now() - last_refresh).total_seconds() - if (elapsed < refresh_period): - # Lightweight; does virtually no work if there are no new jobs. + + # A full refresh scans for and reads info for running jobs from + # scratch (i.e., reread their logfiles). Otherwise we'll only + # initialize new jobs, and mostly rely on cached info. + do_full_refresh = elapsed >= refresh_period + + if not do_full_refresh: jobs = Job.get_running_jobs_w_cache(dir_cfg['log'], jobs) else: - # Full refresh last_refresh = datetime.datetime.now() jobs = Job.get_running_jobs(dir_cfg['log']) # Look for running archive jobs. Be robust to finding more than one @@ -134,7 +138,7 @@ def curses_main(stdscr): (started, msg) = manager.maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg) if (started): log.log(msg) - plotting_status = '' + plotting_status = '' jobs = Job.get_running_jobs_w_cache(dir_cfg['log'], jobs) else: plotting_status = msg @@ -162,14 +166,11 @@ def curses_main(stdscr): dst_prefix = os.path.commonpath(dir_cfg['dst']) arch_prefix = dir_cfg['archive']['rsyncd_path'] - # Render - stdscr.clear() - # Header header_win.addnstr(0, 0, 'Plotman', linecap, curses.A_BOLD) - header_win.addnstr(' %s (refresh %ds/%ds)' % - (datetime.datetime.now().strftime("%H:%M:%S"), elapsed, refresh_period), - linecap) + timestamp = datetime.datetime.now().strftime("%H:%M:%S") + refresh_msg = "now" if do_full_refresh else f"{elapsed}s/{refresh_period}" + header_win.addnstr(f" {timestamp} (refresh {refresh_msg})", linecap) header_win.addnstr(' |

lotting: ', linecap, curses.A_BOLD) header_win.addnstr( plotting_status_msg(plotting_active, plotting_status), linecap) @@ -177,11 +178,15 @@ def curses_main(stdscr): header_win.addnstr( archiving_status_msg(archiving_active, archiving_status), linecap) + # Oneliner progress display + header_win.addnstr(1, 0, 'Jobs (%d): ' % len(jobs), linecap) + header_win.addnstr('[' + reporting.job_viz(jobs) + ']', linecap) + # These are useful for debugging. # header_win.addnstr(' term size: (%d, %d)' % (n_rows, n_cols), linecap) # Debuggin # if pressed_key: # header_win.addnstr(' (keypress %s)' % str(pressed_key), linecap) - header_win.addnstr(1, 0, 'Prefixes:', linecap, curses.A_BOLD) + header_win.addnstr(2, 0, 'Prefixes:', linecap, curses.A_BOLD) header_win.addnstr(' tmp=', linecap, curses.A_BOLD) header_win.addnstr(tmp_prefix, linecap) header_win.addnstr(' dst=', linecap, curses.A_BOLD) diff --git a/job.py b/job.py index 5abec804..495e0359 100755 --- a/job.py +++ b/job.py @@ -23,6 +23,15 @@ def job_phases_for_dstdir(d, all_jobs): '''Return phase 2-tuples for jobs outputting to dstdir d''' return sorted([j.progress() for j in all_jobs if j.dstdir == d]) +def is_plotting_cmdline(cmdline): + return ( + len(cmdline) >= 4 + and 'python' in cmdline[0] + and 'venv/bin/chia' in cmdline[1] + and 'plots' == cmdline[2] + and 'create' == cmdline[3] + ) + # TODO: be more principled and explicit about what we cache vs. what we look up # dynamically from the logfile class Job: @@ -65,15 +74,12 @@ def get_running_jobs_w_cache(logroot, existing_jobs): jobs = [] existing_jobs_by_pid = { j.proc.pid: j for j in existing_jobs } - for proc in psutil.process_iter(['pid', 'name']): - if os.path.basename(proc.name()) == 'chia': + for proc in psutil.process_iter(['pid', 'cmdline']): + if is_plotting_cmdline(proc.cmdline()): if proc.pid in existing_jobs_by_pid.keys(): jobs.append(existing_jobs_by_pid[proc.pid]) # Copy from cache else: - args = proc.cmdline() - # n.b.: args[0]=python, args[1]=chia - if len(args) >= 4 and args[2] == 'plots' and args[3] == 'create': - jobs.append(Job(proc, logroot)) + jobs.append(Job(proc, logroot)) return jobs diff --git a/reporting.py b/reporting.py index 677d1919..07ab2285 100644 --- a/reporting.py +++ b/reporting.py @@ -5,6 +5,7 @@ import archive import job import manager +import math import plot_util def abbr_path(path, putative_prefix): @@ -13,9 +14,49 @@ def abbr_path(path, putative_prefix): else: return path -def phases_str(phases): +def phases_str(phases, max_num=None): '''Take a list of phase-subphase pairs and return them as a compact string''' - return ', '.join(['%d:%d' % ph_subph for ph_subph in phases]) + if not max_num or len(phases) <= max_num: + return ' '.join(['%d:%d' % pair for pair in phases]) + else: + n_first = math.floor(max_num / 2) + n_last = max_num - n_first + n_elided = len(phases) - (n_first + n_last) + first = ' '.join(['%d:%d' % pair for pair in phases[:n_first]]) + elided = " [+%d] " % n_elided + last = ' '.join(['%d:%d' % pair for pair in phases[n_first + n_elided:]]) + return first + elided + last + +def n_at_ph(jobs, ph): + return sum([1 for j in jobs if j.progress() == ph]) + +def n_to_char(n): + n_to_char_map = dict(enumerate(" .:;!")) + + if n < 0: + return 'X' # Should never be negative + elif n >= len(n_to_char_map): + n = len(n_to_char_map) - 1 + + return n_to_char_map[n] + +def job_viz(jobs): + # TODO: Rewrite this in a way that ensures we count every job + # even if the reported phases don't line up with expectations. + result = '' + result += '1' + for i in range(0, 8): + result += n_to_char(n_at_ph(jobs, (1, i))) + result += '2' + for i in range(0, 8): + result += n_to_char(n_at_ph(jobs, (2, i))) + result += '3' + for i in range(0, 7): + result += n_to_char(n_at_ph(jobs, (3, i))) + result += '4' + result += n_to_char(n_at_ph(jobs, (4, 0))) + return result + def status_report(jobs, width, height=None, tmp_prefix='', dst_prefix=''): '''height, if provided, will limit the number of rows in the table, @@ -103,7 +144,7 @@ def dst_dir_report(jobs, dstdirs, width, prefix=''): tab = tt.Texttable() dir2oldphase = manager.dstdirs_to_furthest_phase(jobs) dir2newphase = manager.dstdirs_to_youngest_phase(jobs) - headings = ['dst', 'plots', 'GB free', 'phases', 'priority'] + headings = ['dst', 'plots', 'GBfree', 'inbnd phases', 'pri'] tab.header(headings) tab.set_cols_dtype('t' * len(headings)) @@ -117,7 +158,8 @@ def dst_dir_report(jobs, dstdirs, width, prefix=''): gb_free = int(plot_util.df_b(d) / plot_util.GB) n_plots = len(dir_plots) priority = archive.compute_priority(eldest_ph, gb_free, n_plots) - row = [abbr_path(d, prefix), n_plots, gb_free, phases_str(phases), priority] + row = [abbr_path(d, prefix), n_plots, gb_free, + phases_str(phases, 5), priority] tab.add_row(row) tab.set_max_width(width) tab.set_deco(tt.Texttable.BORDER | tt.Texttable.HEADER ) diff --git a/reporting_test.py b/reporting_test.py new file mode 100644 index 00000000..523e50f9 --- /dev/null +++ b/reporting_test.py @@ -0,0 +1,61 @@ +#!/usr/bin/python3 + +from unittest.mock import patch + +import os +import pyfakefs +import unittest + +import reporting + +class TestReporting(unittest.TestCase): + def test_phases_str(self): + self.assertEqual('1:2 2:3 3:4 4:0', + reporting.phases_str([(1,2), (2,3), (3,4), (4,0)])) + self.assertEqual('1:2 [+1] 3:4 4:0', + reporting.phases_str([(1,2), (2,3), (3,4), (4,0)], 3)) + self.assertEqual('1:2 [+2] 4:0', + reporting.phases_str([(1,2), (2,3), (3,4), (4,0)], 2)) + + def test_job_viz_empty(self): + self.assertEqual('1 2 3 4 ', + reporting.job_viz([]) ) + + @patch('job.Job') + def job_w_phase(self, ph, MockJob): + j = MockJob() + j.progress.return_value = ph + return j + + def test_job_viz_positions(self): + jobs = [self.job_w_phase((1, 1)), + self.job_w_phase((2, 0)), + self.job_w_phase((2, 4)), + self.job_w_phase((2, 7)), + self.job_w_phase((4, 0))] + + self.assertEqual('1 . 2. . .3 4.', + reporting.job_viz(jobs)) + + def test_job_viz_counts(self): + jobs = [self.job_w_phase((2, 2)), + self.job_w_phase((2, 3)), + self.job_w_phase((2, 3)), + self.job_w_phase((2, 4)), + self.job_w_phase((2, 4)), + self.job_w_phase((2, 4)), + self.job_w_phase((2, 5)), + self.job_w_phase((2, 5)), + self.job_w_phase((2, 5)), + self.job_w_phase((2, 5)), + self.job_w_phase((3, 1)), + self.job_w_phase((3, 1)), + self.job_w_phase((3, 1)), + self.job_w_phase((3, 1)), + self.job_w_phase((3, 1)), + self.job_w_phase((3, 1)), + ] + + self.assertEqual('1 2 .:;! 3 ! 4 ', + reporting.job_viz(jobs)) +