Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dask_jobqueue/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def test_basic():
HTCondorCluster,
MoabCluster,
OARCluster,
HTCondorCluster,
]


Expand Down
113 changes: 62 additions & 51 deletions dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,30 @@
SGECluster,
LSFCluster,
OARCluster,
HTCondorCluster,
)
from dask_jobqueue.core import Job
from dask_jobqueue.local import LocalCluster

from dask_jobqueue.sge import SGEJob

all_clusters = [
PBSCluster,
MoabCluster,
SLURMCluster,
SGECluster,
LSFCluster,
OARCluster,
HTCondorCluster,
]


def create_cluster_func(cluster_cls, **kwargs):
if cluster_cls is HTCondorCluster:
# HTCondorCluster has a mandatory disk argument
kwargs.setdefault("disk", "1GB")
return cluster_cls(**kwargs)


def test_errors():
match = re.compile("Job type.*job_cls", flags=re.DOTALL)
Expand Down Expand Up @@ -52,27 +70,28 @@ def test_command_template():
assert " --preload mymodule" in cluster._dummy_job._command_template


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_shebang_settings(Cluster):
if Cluster is HTCondorCluster:
pytest.skip(
"HTCondorCluster has a peculiar submit script and does not have a shebang"
)
default_shebang = "#!/usr/bin/env bash"
python_shebang = "#!/usr/bin/python"
with Cluster(cores=2, memory="4GB", shebang=python_shebang) as cluster:
with create_cluster_func(
Cluster, cores=2, memory="4GB", shebang=python_shebang
) as cluster:
job_script = cluster.job_script()
assert job_script.startswith(python_shebang)
assert "bash" not in job_script
with Cluster(cores=2, memory="4GB") as cluster:
with create_cluster_func(Cluster, cores=2, memory="4GB") as cluster:
job_script = cluster.job_script()
assert job_script.startswith(default_shebang)


@pytest.mark.parametrize(
"Cluster", [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster]
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_dashboard_link(Cluster):
with Cluster(cores=1, memory="1GB") as cluster:
with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster:
assert re.match(r"http://\d+\.\d+\.\d+.\d+:\d+/status", cluster.dashboard_link)


Expand Down Expand Up @@ -110,7 +129,7 @@ def test_forward_ip():
def test_job_id_from_qsub_legacy(Cluster, qsub_return_string):
original_job_id = "654321"
qsub_return_string = qsub_return_string.format(job_id=original_job_id)
with Cluster(cores=1, memory="1GB") as cluster:
with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster:
assert original_job_id == cluster._job_id_from_submit_output(qsub_return_string)


Expand All @@ -136,13 +155,13 @@ def test_job_id_from_qsub(job_cls, qsub_return_string):
@pytest.mark.parametrize("Cluster", [])
def test_job_id_error_handling_legacy(Cluster):
# non-matching regexp
with Cluster(cores=1, memory="1GB") as cluster:
with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster:
with pytest.raises(ValueError, match="Could not parse job id"):
return_string = "there is no number here"
cluster._job_id_from_submit_output(return_string)

# no job_id named group in the regexp
with Cluster(cores=1, memory="1GB") as cluster:
with create_cluster_func(Cluster, cores=1, memory="1GB") as cluster:
with pytest.raises(ValueError, match="You need to use a 'job_id' named group"):
return_string = "Job <12345> submitted to <normal>."
cluster.job_id_regexp = r"(\d+)"
Expand Down Expand Up @@ -199,20 +218,17 @@ def test_jobqueue_cluster_call(tmpdir):
cluster._call([sys.executable, path_with_error.strpath])


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_cluster_has_cores_and_memory(Cluster):
base_regex = r"{}.+".format(Cluster.__name__)
with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='\d+GB'"):
Cluster()
create_cluster_func(Cluster)

with pytest.raises(ValueError, match=base_regex + r"cores=\d, memory='1GB'"):
Cluster(memory="1GB")
create_cluster_func(Cluster, memory="1GB")

with pytest.raises(ValueError, match=base_regex + r"cores=4, memory='\d+GB'"):
Cluster(cores=4)
create_cluster_func(Cluster, cores=4)


@pytest.mark.asyncio
Expand Down Expand Up @@ -256,54 +272,51 @@ def __init__(self, *args, **kwargs):
MyCluster(cores=1, memory="1GB")


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_default_number_of_worker_processes(Cluster):
with Cluster(cores=4, memory="4GB") as cluster:
with create_cluster_func(Cluster, cores=4, memory="4GB") as cluster:
assert " --nprocs 4" in cluster.job_script()
assert " --nthreads 1" in cluster.job_script()

with Cluster(cores=6, memory="4GB") as cluster:
with create_cluster_func(Cluster, cores=6, memory="4GB") as cluster:
assert " --nprocs 3" in cluster.job_script()
assert " --nthreads 2" in cluster.job_script()


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_scheduler_options(Cluster):
net_if_addrs = psutil.net_if_addrs()
interface = list(net_if_addrs.keys())[0]
port = 8804

with Cluster(
cores=1, memory="1GB", scheduler_options={"interface": interface, "port": port}
with create_cluster_func(
Cluster,
cores=1,
memory="1GB",
scheduler_options={"interface": interface, "port": port},
) as cluster:
scheduler_options = cluster.scheduler_spec["options"]
assert scheduler_options["interface"] == interface
assert scheduler_options["port"] == port


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_scheduler_options_interface(Cluster):
net_if_addrs = psutil.net_if_addrs()
scheduler_interface = list(net_if_addrs.keys())[0]
worker_interface = "worker-interface"
scheduler_host = socket.gethostname()

with Cluster(cores=1, memory="1GB", interface=scheduler_interface) as cluster:
with create_cluster_func(
Cluster, cores=1, memory="1GB", interface=scheduler_interface
) as cluster:
scheduler_options = cluster.scheduler_spec["options"]
worker_options = cluster.new_spec["options"]
assert scheduler_options["interface"] == scheduler_interface
assert worker_options["interface"] == scheduler_interface

with Cluster(
with create_cluster_func(
Cluster,
cores=1,
memory="1GB",
interface=worker_interface,
Expand All @@ -314,7 +327,8 @@ def test_scheduler_options_interface(Cluster):
assert scheduler_options["interface"] == scheduler_interface
assert worker_options["interface"] == worker_interface

with Cluster(
with create_cluster_func(
Cluster,
cores=1,
memory="1GB",
interface=worker_interface,
Expand All @@ -326,29 +340,25 @@ def test_scheduler_options_interface(Cluster):
assert worker_options["interface"] == worker_interface


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster):
scheduler_host = socket.gethostname()
message_template = "pass {!r} through 'scheduler_options'"

message = message_template.format("host")
with pytest.raises(ValueError, match=message):
with Cluster(cores=1, memory="1GB", host=scheduler_host):
with create_cluster_func(Cluster, cores=1, memory="1GB", host=scheduler_host):
pass

message = message_template.format("dashboard_address")
with pytest.raises(ValueError, match=message):
with Cluster(cores=1, memory="1GB", dashboard_address=":8787"):
with create_cluster_func(
Cluster, cores=1, memory="1GB", dashboard_address=":8787"
):
pass


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
@pytest.mark.parametrize("Cluster", all_clusters)
def test_import_scheduler_options_from_config(Cluster):

net_if_addrs = psutil.net_if_addrs()
Expand All @@ -369,12 +379,13 @@ def test_import_scheduler_options_from_config(Cluster):
{"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options}
):

with Cluster(cores=2, memory="2GB") as cluster:
with create_cluster_func(Cluster, cores=2, memory="2GB") as cluster:
scheduler_options = cluster.scheduler_spec["options"]
assert scheduler_options.get("interface") == config_scheduler_interface
assert scheduler_options.get("port") == config_scheduler_port

with Cluster(
with create_cluster_func(
Cluster,
cores=2,
memory="2GB",
scheduler_options={"interface": pass_scheduler_interface},
Expand Down