Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,62 +1,74 @@
# Where to plot and log
# Where to plot and log.
directories:
# One directory for logs
# One directory in which to store all plot job logs (the STDOUT/
# STDERR of all plot jobs). In order to monitor progress, plotman
# reads these logs on a regular basis, so using a fast drive is
# recommended.
log: /home/chia/chia/logs

# One or more directories; the scheduler will use all of them
# One or more directories to use as tmp dirs for plotting. The
# scheduler will use all of them and distribute jobs among them.
# It assumes that IO is independent for each one (i.e., that each
# one is on a different physical device).
#
# If multiple directories share a common prefix, reports will
# abbreviate and show just the uniquely identifying suffix.
tmp:
- /mnt/tmp/00
- /mnt/tmp/01
- /mnt/tmp/02
- /mnt/tmp/03
- /mnt/tmp/04
- /mnt/tmp/05
- /mnt/tmp/06
- /mnt/tmp/07
- /mnt/tmp/08
- /mnt/tmp/09
- /mnt/tmp/10
- /mnt/tmp/11

# One directory (TODO: support distributing across multiple tmp2 dirs)
tmp2: /mnt/tmp/a

# One or more directories; the scheduler will use all of them

# Optional: tmp2 directory. If specified, will be passed to
# chia plots create as -2. Only one tmp2 directory is supported.
# tmp2: /mnt/tmp/a

# One or more directories; the scheduler will use all of them.
# These again are presumed to be on independent physical devices,
# so writes (plot jobs) and reads (archivals) can be scheduled
# to minimize IO contention.
dst:
- /home/chia/chia/plots/000
- /home/chia/chia/plots/001
- /home/chia/chia/plots/002
- /home/chia/chia/plots/003
- /mnt/dst/00
- /mnt/dst/01

# Archival
# Archival configuration. Optional.
#
# Currently archival depends on an rsync daemon running on the remote
# host, and that the module is configured to match the local path.
# See code for details.
archive:
rsyncd_module: plots
rsyncd_path: /plots
rsyncd_bwlimit: 100000 # In KB/s
rsyncd_host: farmer
rsyncd_bwlimit: 80000 # Bandwidth limit in KB/s
rsyncd_host: myfarmer
rsyncd_user: chia


# Plotting scheduling parameters
scheduling:
# Don't run a job on a particular temp dir more often than this.
# (obsolete)
# tmpdir_stagger_m: 300
# Don't run a job on a particular temp dir until all existing jobs
# have progressed at least this far. Phase major corresponds to the
# plot phase, phase minor corresponds to the table or table pair
# in sequence.
tmpdir_stagger_phase_major: 2
tmpdir_stagger_phase_minor: 5
tmpdir_stagger_phase_minor: 1

# Don't run more than this many jobs at a time on a single temp dir.
tmpdir_max_jobs: 3

# Global min; don't run any jobs more often than this.
# Don't run any jobs (across all temp dirs) more often than this.
global_stagger_m: 30

# How often the daemon wakes to consider starting a new plot job
polling_time_s: 20

# Plotting parameters

# Plotting parameters. These are pass-through parameters to chia plots create.
# See documentation at
# https://github.com/Chia-Network/chia-blockchain/wiki/CLI-Commands-Reference#create
plotting:
k: 32
e: True # Use -e plotting option
n_threads: 8 # Threads per job
# n_buckets: 64 # Number of buckets to split data into
# job_buffer: 9200 # Per job memory
n_buckets: 128 # Number of buckets to split data into
job_buffer: 4580 # Per job memory
job_buffer: 4520 # Per job memory
51 changes: 28 additions & 23 deletions interactive.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@
import manager
import reporting

def window_width(window):
    '''Return the width of window in columns.

    getmaxyx() reports (rows, cols), so the width is the second element.
    '''
    return window.getmaxyx()[1]

def window_height(window):
    '''Return the height of window in rows.

    getmaxyx() reports (rows, cols), so the height is the first element.
    '''
    return window.getmaxyx()[0]

class Log:
entries = []
cur_pos = 0
def __init__(self):
self.entries = []
self.cur_pos = 0

# TODO: store timestamp as actual timestamp indexing the messages
def log(self, msg):
Expand Down Expand Up @@ -81,7 +76,7 @@ def curses_main(stdscr):

# Page layout. Currently requires at least ~40 rows.
# TODO: make everything dynamically resize to best use available space
header_height = 2
header_height = 3
jobs_height = 10
dirs_height = 14
logscreen_height = n_rows - (header_height + jobs_height + dirs_height)
Expand All @@ -97,7 +92,7 @@ def curses_main(stdscr):
refresh_period = int(sched_cfg['polling_time_s'])

stdscr.nodelay(True) # make getch() non-blocking
stdscr.timeout(5000) # this doesn't seem to do anything....
stdscr.timeout(2000)

header_win = curses.newwin(header_height, n_cols, header_pos, 0)
log_win = curses.newwin(logscreen_height, n_cols, logscreen_pos, 0)
Expand All @@ -111,19 +106,28 @@ def curses_main(stdscr):

while True:

# todo: none of this resizing works
(n_rows, n_cols) = map(int, stdscr.getmaxyx())
# TODO: handle resizing. Need to (1) figure out how to reliably get
# the terminal size -- the recommended method doesn't seem to work:
# (n_rows, n_cols) = [int(v) for v in stdscr.getmaxyx()]
# Consider instead:
# ...[int(v) for v in os.popen('stty size', 'r').read().split()]
# and then (2) implement the logic to resize all the subwindows as above

stdscr.clear()
linecap = n_cols - 1
logscreen_height = n_rows - (header_height + jobs_height + dirs_height)

elapsed = (datetime.datetime.now() - last_refresh).total_seconds()
if (elapsed < refresh_period):
# Lightweight; does virtually no work if there are no new jobs.

# A full refresh scans for and reads info for running jobs from
# scratch (i.e., reread their logfiles). Otherwise we'll only
# initialize new jobs, and mostly rely on cached info.
do_full_refresh = elapsed >= refresh_period

if not do_full_refresh:
jobs = Job.get_running_jobs_w_cache(dir_cfg['log'], jobs)

else:
# Full refresh
last_refresh = datetime.datetime.now()
jobs = Job.get_running_jobs(dir_cfg['log'])
# Look for running archive jobs. Be robust to finding more than one
Expand All @@ -134,7 +138,7 @@ def curses_main(stdscr):
(started, msg) = manager.maybe_start_new_plot(dir_cfg, sched_cfg, plotting_cfg)
if (started):
log.log(msg)
plotting_status = '<just started plot>'
plotting_status = '<just started job>'
jobs = Job.get_running_jobs_w_cache(dir_cfg['log'], jobs)
else:
plotting_status = msg
Expand Down Expand Up @@ -162,26 +166,27 @@ def curses_main(stdscr):
dst_prefix = os.path.commonpath(dir_cfg['dst'])
arch_prefix = dir_cfg['archive']['rsyncd_path']

# Render
stdscr.clear()

# Header
header_win.addnstr(0, 0, 'Plotman', linecap, curses.A_BOLD)
header_win.addnstr(' %s (refresh %ds/%ds)' %
(datetime.datetime.now().strftime("%H:%M:%S"), elapsed, refresh_period),
linecap)
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
refresh_msg = "now" if do_full_refresh else f"{elapsed}s/{refresh_period}"
header_win.addnstr(f" {timestamp} (refresh {refresh_msg})", linecap)
header_win.addnstr(' | <P>lotting: ', linecap, curses.A_BOLD)
header_win.addnstr(
plotting_status_msg(plotting_active, plotting_status), linecap)
header_win.addnstr(' <A>rchival: ', linecap, curses.A_BOLD)
header_win.addnstr(
archiving_status_msg(archiving_active, archiving_status), linecap)

# Oneliner progress display
header_win.addnstr(1, 0, 'Jobs (%d): ' % len(jobs), linecap)
header_win.addnstr('[' + reporting.job_viz(jobs) + ']', linecap)

# These are useful for debugging.
# header_win.addnstr(' term size: (%d, %d)' % (n_rows, n_cols), linecap) # Debugging
# if pressed_key:
# header_win.addnstr(' (keypress %s)' % str(pressed_key), linecap)
header_win.addnstr(1, 0, 'Prefixes:', linecap, curses.A_BOLD)
header_win.addnstr(2, 0, 'Prefixes:', linecap, curses.A_BOLD)
header_win.addnstr(' tmp=', linecap, curses.A_BOLD)
header_win.addnstr(tmp_prefix, linecap)
header_win.addnstr(' dst=', linecap, curses.A_BOLD)
Expand Down
18 changes: 12 additions & 6 deletions job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ def job_phases_for_dstdir(d, all_jobs):
'''Return phase 2-tuples for jobs outputting to dstdir d'''
return sorted([j.progress() for j in all_jobs if j.dstdir == d])

def is_plotting_cmdline(cmdline):
    '''Return True if cmdline looks like a `chia plots create` invocation.

    cmdline is an argv-style list of strings.  The expected shape is
    [<python interpreter>, <.../venv/bin/chia>, 'plots', 'create', ...].
    Anything with fewer than four entries (including an empty or missing
    command line) is not considered a plot job.
    '''
    # Guard first so a None or short command line is simply "not a plot
    # job" rather than an error.
    if not cmdline or len(cmdline) < 4:
        return False

    interpreter, script, command, subcommand = cmdline[:4]
    return (
        'python' in interpreter
        and 'venv/bin/chia' in script
        and command == 'plots'
        and subcommand == 'create'
    )

# TODO: be more principled and explicit about what we cache vs. what we look up
# dynamically from the logfile
class Job:
Expand Down Expand Up @@ -65,15 +74,12 @@ def get_running_jobs_w_cache(logroot, existing_jobs):
jobs = []
existing_jobs_by_pid = { j.proc.pid: j for j in existing_jobs }

for proc in psutil.process_iter(['pid', 'name']):
if os.path.basename(proc.name()) == 'chia':
for proc in psutil.process_iter(['pid', 'cmdline']):
if is_plotting_cmdline(proc.cmdline()):
if proc.pid in existing_jobs_by_pid.keys():
jobs.append(existing_jobs_by_pid[proc.pid]) # Copy from cache
else:
args = proc.cmdline()
# n.b.: args[0]=python, args[1]=chia
if len(args) >= 4 and args[2] == 'plots' and args[3] == 'create':
jobs.append(Job(proc, logroot))
jobs.append(Job(proc, logroot))

return jobs

Expand Down
50 changes: 46 additions & 4 deletions reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import archive
import job
import manager
import math
import plot_util

def abbr_path(path, putative_prefix):
Expand All @@ -13,9 +14,49 @@ def abbr_path(path, putative_prefix):
else:
return path

def phases_str(phases):
def phases_str(phases, max_num=None):
'''Take a list of phase-subphase pairs and return them as a compact string'''
return ', '.join(['%d:%d' % ph_subph for ph_subph in phases])
if not max_num or len(phases) <= max_num:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this taste, but... I am relatively in the camp of 'turning other things into booleans is not great'. For example, lots of people will write `if some_list:` while I will write `if len(some_list) > 0:` because, well... it is more explicit and shows issues sooner if `some_list == None` or such. Others could argue this better than me. But certainly, lots of people do leverage the truthiness of lots of non-boolean objects.

Suggested change
if not max_num or len(phases) <= max_num:
if max_num is None or len(phases) <= max_num:

return ' '.join(['%d:%d' % pair for pair in phases])

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't bother changing all the % formatting everywhere, but I also basically never write it. Partly because I don't log as much as I should. Logging calls are a place where, afaik, you are kind of stuck with % formatting because of everything having to use the same means of formatting to get the delayed conditional formatting related to logging to a level that isn't configured to be processed.

else:
n_first = math.floor(max_num / 2)
n_last = max_num - n_first
n_elided = len(phases) - (n_first + n_last)
first = ' '.join(['%d:%d' % pair for pair in phases[:n_first]])
elided = " [+%d] " % n_elided
last = ' '.join(['%d:%d' % pair for pair in phases[n_first + n_elided:]])
return first + elided + last

def n_at_ph(jobs, ph):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't abbreviate much, probably to a fault.

Suggested change
def n_at_ph(jobs, ph):
def job_count_at_phase(jobs, phase):

return sum([1 for j in jobs if j.progress() == ph])

def n_to_char(n):
    '''Map a job count to the single character used to display it.

    A count of 0 is shown as a blank, and 1 through 4 as '.', ':', ';'
    and '!' respectively.  Counts above 4 are clamped to '!'.  A negative
    count, which should never occur, is shown as 'X'.
    '''
    glyphs = ' .:;!'   # Index is the job count; the last glyph is the cap

    if n < 0:
        return 'X'   # Should never be negative

    return glyphs[min(n, len(glyphs) - 1)]

def job_viz(jobs):
    '''Return a one-line summary of how many jobs are at each phase.

    The line has one section per plot phase: the phase digit followed by
    one character per subphase.  Each character encodes (via n_to_char)
    the number of jobs counted by n_at_ph for that (phase, subphase)
    pair.  Phases 1 and 2 show 8 subphases, phase 3 shows 7, and phase 4
    shows 1.
    '''
    # TODO: Rewrite this in a way that ensures we count every job
    # even if the reported phases don't line up with expectations.

    # (phase, number of subphase slots displayed for that phase)
    subphases_shown = ((1, 8), (2, 8), (3, 7), (4, 1))

    pieces = []
    for phase, n_subphases in subphases_shown:
        pieces.append(str(phase))
        pieces.extend(
            n_to_char(n_at_ph(jobs, (phase, subphase)))
            for subphase in range(n_subphases))
    return ''.join(pieces)


def status_report(jobs, width, height=None, tmp_prefix='', dst_prefix=''):
'''height, if provided, will limit the number of rows in the table,
Expand Down Expand Up @@ -103,7 +144,7 @@ def dst_dir_report(jobs, dstdirs, width, prefix=''):
tab = tt.Texttable()
dir2oldphase = manager.dstdirs_to_furthest_phase(jobs)
dir2newphase = manager.dstdirs_to_youngest_phase(jobs)
headings = ['dst', 'plots', 'GB free', 'phases', 'priority']
headings = ['dst', 'plots', 'GBfree', 'inbnd phases', 'pri']
tab.header(headings)
tab.set_cols_dtype('t' * len(headings))

Expand All @@ -117,7 +158,8 @@ def dst_dir_report(jobs, dstdirs, width, prefix=''):
gb_free = int(plot_util.df_b(d) / plot_util.GB)
n_plots = len(dir_plots)
priority = archive.compute_priority(eldest_ph, gb_free, n_plots)
row = [abbr_path(d, prefix), n_plots, gb_free, phases_str(phases), priority]
row = [abbr_path(d, prefix), n_plots, gb_free,
phases_str(phases, 5), priority]
tab.add_row(row)
tab.set_max_width(width)
tab.set_deco(tt.Texttable.BORDER | tt.Texttable.HEADER )
Expand Down
61 changes: 61 additions & 0 deletions reporting_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/python3

from unittest.mock import patch

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I very rarely mock.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in, you rarely need to, or there's another mechanism you prefer, or you structure your code so you don't need mock-style mechanisms?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to avoid the need. I see tests as a second use of your code. If it's hard to test, it can mean your code isn't as flexible as it ought to be. So yeah, different code structure often. Lots of details to discuss in each actual case.


import os
import pyfakefs
import unittest

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I always use pytest.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, OK, added an issue to consider switching to pytest.


import reporting

class TestReporting(unittest.TestCase):
    '''Tests for the string-formatting helpers in reporting.

    The expected job_viz strings are written one phase per line: the
    phase digit, then one character per displayed subphase (8 for
    phases 1 and 2, 7 for phase 3, 1 for phase 4).  The runs of spaces
    are significant; every blank stands for "no jobs at this subphase".
    '''

    def test_phases_str(self):
        phases = [(1, 2), (2, 3), (3, 4), (4, 0)]

        # No limit: every phase is listed.
        self.assertEqual('1:2 2:3 3:4 4:0',
                reporting.phases_str(phases))
        # With a limit, the middle entries are elided and counted.
        self.assertEqual('1:2 [+1] 3:4 4:0',
                reporting.phases_str(phases, 3))
        self.assertEqual('1:2 [+2] 4:0',
                reporting.phases_str(phases, 2))

    def test_job_viz_empty(self):
        expected = ('1' '        '
                    '2' '        '
                    '3' '       '
                    '4' ' ')
        self.assertEqual(expected, reporting.job_viz([]))

    @patch('job.Job')
    def job_w_phase(self, ph, MockJob):
        '''Return a mock job whose progress() reports phase ph.'''
        j = MockJob()
        j.progress.return_value = ph
        return j

    def test_job_viz_positions(self):
        jobs = [self.job_w_phase((1, 1)),
                self.job_w_phase((2, 0)),
                self.job_w_phase((2, 4)),
                self.job_w_phase((2, 7)),
                self.job_w_phase((4, 0))]

        expected = ('1' ' .      '
                    '2' '.   .  .'
                    '3' '       '
                    '4' '.')
        self.assertEqual(expected, reporting.job_viz(jobs))

    def test_job_viz_counts(self):
        # 1, 2, 3 and 4 jobs at consecutive phase-2 subphases, then 6 jobs
        # at (3, 1) to check that large counts are capped at '!'.
        jobs = [self.job_w_phase((2, 2)),
                self.job_w_phase((2, 3)),
                self.job_w_phase((2, 3)),
                self.job_w_phase((2, 4)),
                self.job_w_phase((2, 4)),
                self.job_w_phase((2, 4)),
                self.job_w_phase((2, 5)),
                self.job_w_phase((2, 5)),
                self.job_w_phase((2, 5)),
                self.job_w_phase((2, 5)),
                self.job_w_phase((3, 1)),
                self.job_w_phase((3, 1)),
                self.job_w_phase((3, 1)),
                self.job_w_phase((3, 1)),
                self.job_w_phase((3, 1)),
                self.job_w_phase((3, 1)),
                ]

        expected = ('1' '        '
                    '2' '  .:;!  '
                    '3' ' !     '
                    '4' ' ')
        self.assertEqual(expected, reporting.job_viz(jobs))