"""JobScheduler classes"""
import collections
import json
import logging
import os
import shlex
import subprocess
import sys
import tarfile
from abc import ABC, abstractmethod

from merge_utils import io_utils, config
from merge_utils.retriever import FileRetriever
logger = logging.getLogger(__name__)
class JobScheduler(ABC):
    """Base class for scheduling a merge job"""

    def __init__(self, source: FileRetriever):
        """
        Initialize the JobScheduler with a source of files to merge.

        :param source: FileRetriever object to provide input files
        """
        self.source = source
        # Per-run scratch directory (timestamped) for the generated JSON configs
        self.dir = os.path.join(io_utils.pkg_dir(), "tmp", io_utils.get_timestamp())
        self.pass1 = []  # paths of pass-1 JSON config files, in creation order
        self.pass2 = []  # paths of pass-2 JSON config files, in creation order

    def get_chunks(self) -> None:
        """
        Run the FileRetriever and write one JSON config file per output chunk.

        Chunks that have sub-chunks are pass-2 jobs, all others are pass-1 jobs.
        Each written file's path is appended to ``self.pass2`` or ``self.pass1``.
        Files are numbered from 0 within each pass.
        """
        self.source.run()
        # Nothing in this class creates the scratch directory, so create it here;
        # otherwise the first open() below fails. Like the sibling schedulers, this
        # raises if the timestamped directory already exists instead of overwriting it.
        os.makedirs(self.dir)
        for chunk in self.source.output_chunks():
            json_dict = chunk.json
            if chunk.chunks:  # if the chunk has sub-chunks it is pass 2
                jobs, tier = self.pass2, 2
            else:  # if the chunk has no sub-chunks it is pass 1
                jobs, tier = self.pass1, 1
            json_name = os.path.join(self.dir, f"pass{tier}_{len(jobs):>06}.json")
            jobs.append(json_name)
            with open(json_name, 'w', encoding="utf-8") as fjson:
                json.dump(json_dict, fjson, indent=2)

    @abstractmethod
    def run(self) -> None:
        """Run the job scheduler."""
        raise NotImplementedError("Subclasses must implement this method")
class LocalScheduler():
    """Job scheduler for local merge jobs"""

    def __init__(self, source: FileRetriever):
        """
        Initialize the LocalScheduler with a source of files to merge.

        :param source: FileRetriever object to provide input files
        """
        self.source = source
        # Per-run scratch directory (timestamped) for JSON configs and scripts
        self.dir = os.path.join(io_utils.pkg_dir(), "tmp", io_utils.get_timestamp())
        self.pass1 = []  # paths of pass-1 JSON config files, in creation order
        self.pass2 = []  # paths of pass-2 JSON config files, in creation order

    def json_name(self, tier: int) -> str:
        """
        Reserve and return the path of the next JSON config file for a given pass.

        The returned path is also appended to ``self.pass1`` or ``self.pass2``.
        Files are numbered from 1 within each pass.

        :param tier: Pass number (1 or 2)
        :return: Full path of the JSON file inside ``self.dir``
        :raises ValueError: if ``tier`` is not 1 or 2
        """
        if tier == 1:
            site_jobs = self.pass1
        elif tier == 2:
            site_jobs = self.pass2
        else:
            raise ValueError("Tier must be 1 or 2")
        idx = len(site_jobs) + 1
        name = os.path.join(self.dir, f"pass{tier}_{idx:>06}.json")
        site_jobs.append(name)
        return name

    def get_chunks(self) -> None:
        """Run the FileRetriever and write one JSON config file per output chunk."""
        self.source.run()
        # Raises if the timestamped directory already exists rather than
        # overwriting another run's config files.
        os.makedirs(self.dir)
        for chunk in self.source.output_chunks():
            json_dict = chunk.json
            name = self.json_name(chunk.tier)
            with open(name, 'w', encoding="utf-8") as fjson:
                json.dump(json_dict, fjson, indent=2)

    @staticmethod
    def _write_script(path: str, description: str, chunks: list, out_dir: str) -> None:
        """
        Write an executable bash script that runs do_merge.py once per JSON config.

        :param path: Path of the script to write
        :param description: Text of the comment line placed after the shebang
        :param chunks: JSON config paths; one merge command is written per entry
        :param out_dir: Output directory passed to every merge command
        """
        merge_script = os.path.join(io_utils.src_dir(), "do_merge.py")
        with open(path, 'w', encoding="utf-8") as f:
            f.write("#!/bin/bash\n")
            f.write(f"# {description}\n")
            for chunk in chunks:
                cmd = ['python3', merge_script, chunk, out_dir]
                # Quote every argument so paths with spaces or shell
                # metacharacters reach do_merge.py intact.
                f.write(" ".join(shlex.quote(arg) for arg in cmd) + "\n")
        try:
            # Like `chmod +x`: add execute bits for user/group/other, keep the rest
            os.chmod(path, os.stat(path).st_mode | 0o111)
        except OSError as error:
            # Best effort (the original ignored chmod failures); the script can
            # still be run explicitly with bash.
            logger.warning("Failed to make '%s' executable: %s", path, error)

    def run(self) -> None:
        """
        Write local shell scripts that run the merge jobs.

        Ensures the output directory ``config.output['dir']`` exists, writes the JSON
        configs, then writes ``submit_pass1.sh`` (plus ``submit_pass2.sh`` when there
        are pass-2 chunks) into ``self.dir``. The scripts are only written, not run.
        Exits the process with status 1 if the output directory cannot be created.
        """
        out_dir = config.output['dir']
        if not os.path.exists(out_dir):
            try:
                os.makedirs(out_dir, exist_ok=True)
                logger.info("Output directory '%s' created", out_dir)
            except OSError as error:
                logger.critical("Failed to create output directory '%s': %s", out_dir, error)
                sys.exit(1)
        self.get_chunks()
        if not self.pass1:
            logger.warning("No files to merge")
            return
        self._write_script(
            os.path.join(self.dir, "submit_pass1.sh"),
            "This script will run the merge jobs for pass 1",
            self.pass1,
            out_dir,
        )
        if self.pass2:
            self._write_script(
                os.path.join(self.dir, "submit_pass2.sh"),
                "This script will run the merge jobs for pass 2",
                self.pass2,
                out_dir,
            )
        logger.info("Local job scripts written to %s", self.dir)
class JustinScheduler():
    """Job scheduler for JustIN merge jobs"""

    def __init__(self, source: FileRetriever):
        """
        Initialize the JustinScheduler with a source of files to merge.

        :param source: FileRetriever object to provide input files
        """
        self.source = source
        # Per-run scratch directory (timestamped) for JSON configs and scripts
        self.dir = os.path.join(io_utils.pkg_dir(), "tmp", io_utils.get_timestamp())
        self.pass1 = collections.defaultdict(list)  # site -> pass-1 JSON config paths
        self.pass2 = collections.defaultdict(list)  # site -> pass-2 JSON config paths
        self.cvmfs_dir = None  # set by upload_cfg()

    def _tier_jobs(self, tier: int) -> collections.defaultdict:
        """
        Return the site -> JSON-path mapping for a pass.

        :param tier: Pass number (1 or 2)
        :raises ValueError: if ``tier`` is not 1 or 2
        """
        if tier == 1:
            return self.pass1
        if tier == 2:
            return self.pass2
        raise ValueError("Tier must be 1 or 2")

    def json_name(self, tier: int, site: str = None) -> str:
        """
        Reserve and return the path of the next JSON config file for a pass and site.

        The returned path is also appended to the pass's list for ``site``.
        Files are numbered from 1 per (pass, site).

        :param tier: Pass number (1 or 2)
        :param site: Optional site name; omitted from the file name when falsy
        :return: Full path of the JSON file inside ``self.dir``
        :raises ValueError: if ``tier`` is not 1 or 2
        """
        site_jobs = self._tier_jobs(tier)[site]
        idx = len(site_jobs) + 1
        if site:
            name = f"pass{tier}_{site}_{idx:>06}.json"
        else:
            name = f"pass{tier}_{idx:>06}.json"
        name = os.path.join(self.dir, name)
        site_jobs.append(name)
        return name

    def get_chunks(self) -> None:
        """Run the FileRetriever and write one JSON config file per output chunk."""
        self.source.run()
        # Raises if the timestamped directory already exists rather than
        # overwriting another run's config files.
        os.makedirs(self.dir)
        for chunk in self.source.output_chunks():
            json_dict = chunk.json
            name = self.json_name(chunk.tier, chunk.site)
            with open(name, 'w', encoding="utf-8") as fjson:
                json.dump(json_dict, fjson, indent=2)

    def upload_cfg(self) -> str:
        """
        Make a tarball of the configuration files and do_merge.py and upload it
        with ``justin-cvmfs-upload``.

        The directory printed by the upload tool is stored in ``self.cvmfs_dir``.

        :return: Path to the uploaded configuration directory
        :raises RuntimeError: if the upload tool cannot be run or reports failure
        """
        cfg = os.path.join(self.dir, "config.tar")
        with tarfile.open(cfg, "w") as tar:
            # Walk each pass's mapping separately: merging them by site key would
            # drop the pass-2 files of any site that also has pass-1 jobs.
            for jobs in (self.pass1, self.pass2):
                for files in jobs.values():
                    for file in files:
                        tar.add(file, os.path.basename(file))
            tar.add(os.path.join(io_utils.src_dir(), "do_merge.py"), "do_merge.py")
        try:
            proc = subprocess.run(['justin-cvmfs-upload', cfg], capture_output=True,
                                  encoding="utf-8", check=False)
        except FileNotFoundError as error:
            # The upload tool is not installed / not on PATH
            logger.error("Failed to upload configuration files: %s", error)
            raise RuntimeError("Failed to upload configuration files") from error
        if proc.returncode != 0:
            logger.error("Failed to upload configuration files: %s", proc.stderr)
            raise RuntimeError("Failed to upload configuration files")
        self.cvmfs_dir = proc.stdout.strip()
        logger.info("Uploaded configuration files to %s", self.cvmfs_dir)
        return self.cvmfs_dir

    def get_cmd(self, tier: int, site: str = None) -> list:
        """
        Build the ``justin simple-workflow`` command for a given tier and site.

        :param tier: Pass number (1 or 2)
        :param site: Optional site name; when falsy, no ``--site`` option is passed
        :return: Command as a list of arguments (usable directly with subprocess.run)
        :raises ValueError: if ``tier`` is not 1 or 2, or there are no jobs for it
        """
        # .get() so that asking for an unknown site does not add an empty entry
        site_jobs = self._tier_jobs(tier).get(site)
        if not site_jobs:
            raise ValueError(f"No jobs found for pass {tier} and site {site}")
        # Same file-name prefix json_name() used for this pass/site's configs
        prefix = f"pass{tier}_{site}" if site else f"pass{tier}"
        cmd = [
            'justin', 'simple-workflow',
            '--monte-carlo', str(len(site_jobs)),
            '--jobscript', os.path.join(io_utils.src_dir(), "merge.jobscript"),
            # No embedded quotes: list arguments are passed verbatim, and the
            # scripts written by run() shell-quote each argument themselves.
            '--env', f'MERGE_CONFIG={prefix}',
            '--env', f'CONFIG_DIR={self.cvmfs_dir}',
        ]
        if site:
            cmd += ['--site', site]
        cmd += [
            '--scope', 'usertests',
            '--output-pattern', '*_merged_*:merge-test',
            '--lifetime-days', '1',
        ]
        return cmd

    @staticmethod
    def _write_script(path: str, description: str, commands: list) -> None:
        """
        Write an executable bash script containing one command per line.

        :param path: Path of the script to write
        :param description: Text of the comment line placed after the shebang
        :param commands: Commands as argument lists; each argument is shell-quoted
        """
        with open(path, 'w', encoding="utf-8") as f:
            f.write("#!/bin/bash\n")
            f.write(f"# {description}\n")
            for cmd in commands:
                # Quoting keeps e.g. the '*_merged_*' pattern away from shell globbing
                f.write(" ".join(shlex.quote(arg) for arg in cmd) + "\n")
        try:
            # Like `chmod +x`: add execute bits for user/group/other, keep the rest
            os.chmod(path, os.stat(path).st_mode | 0o111)
        except OSError as error:
            # Best effort: the script can still be run explicitly with bash
            logger.warning("Failed to make '%s' executable: %s", path, error)

    def run(self) -> None:
        """
        Prepare JustIN submission scripts for the merge jobs.

        Writes the JSON configs, uploads them with upload_cfg(), then writes
        ``pass1.sh`` (plus ``pass2.sh`` when there are pass-2 chunks) into
        ``self.dir``, with one ``justin simple-workflow`` command per site.
        The scripts are only written, not executed.

        :return: None
        """
        self.get_chunks()
        if not self.pass1:
            logger.warning("No files to merge")
            return
        self.upload_cfg()
        self._write_script(
            os.path.join(self.dir, "pass1.sh"),
            "This script will submit the JustIN jobs for pass 1",
            [self.get_cmd(1, site) for site in self.pass1],
        )
        if self.pass2:
            self._write_script(
                os.path.join(self.dir, "pass2.sh"),
                "This script will submit the JustIN jobs for pass 2",
                [self.get_cmd(2, site) for site in self.pass2],
            )
        logger.info("JustIN job scripts written to %s", self.dir)