Source code for do_merge

"""Actually perform the merging"""

#test?#
import sys
import os
import json
import subprocess
import tarfile
import socket
from datetime import datetime, timezone

def checksums(filename: str) -> dict:
    """Compute the adler32 checksum of a file with the ``xrdadler32`` tool.

    Args:
        filename: Path of the file to checksum.

    Returns:
        Dict of the form ``{'adler32': <checksum string>}``.

    Raises:
        ValueError: If ``xrdadler32`` exits with a non-zero status.
    """
    result = subprocess.run(['xrdadler32', filename],
                            capture_output=True, check=False)
    if result.returncode != 0:
        raise ValueError('xrdadler32 failed', result.returncode, result.stderr)
    # xrdadler32 prints the checksum as the first whitespace-separated token
    digest = result.stdout.decode('utf-8').split()[0]
    return {'adler32': digest}
def renew_token() -> None:
    """Renew the DUNE vault token via ``htgettoken`` (best effort).

    Failures are reported but deliberately not raised, so merging can
    proceed with whatever credentials are already in place.
    """
    cmd = "htgettoken -i dune --vaultserver htvaultprod.fnal.gov -r interactive --nooidc".split(" ")
    try:
        print("Renewing token with command: %s" % (" ".join(cmd)))
        rval = subprocess.run(cmd, check=True)
        print("Token renewed", rval)
    # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt are
    # no longer swallowed; token renewal remains best-effort.
    except Exception:
        print("WARNING: Token renewal failed, skip for now")
def merge_hadd(output: str, inputs: list) -> None:
    """Combine ROOT input files into *output* by running ``hadd``.

    Raises subprocess.CalledProcessError if hadd exits non-zero.
    """
    command = ['hadd', '-v', '0', '-f', output, *inputs]
    print(f"Running command:\n{' '.join(command)}")
    subprocess.run(command, check=True)
def merge_lar(output: str, inputs: list[str], cfg: str) -> None:
    """Combine the input files into *output* by running ``lar`` with FHiCL config *cfg*.

    Raises subprocess.CalledProcessError if lar exits non-zero.
    """
    command = ['lar', '-c', cfg, '-o', output, *inputs]
    print(f"Running command:\n{' '.join(command)}")
    subprocess.run(command, check=True)
def merge_hdf5(output: str, inputs: list[str], cfg: str) -> None:
    """Combine the input files into an HDF5 file via the project's hdf5_merge helper.

    *cfg* is a file name resolved under ``<repo>/config/hdf5/``, three
    directory levels above this module.
    """
    import hdf5_merge  # pylint: disable=import-outside-toplevel
    repo_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    cfg_path = os.path.join(repo_root, 'config', 'hdf5', cfg)
    hdf5_merge.merge_hdf5(output, inputs, cfg_path)
def merge_tar(output: str, inputs: list[str]) -> None:
    """Bundle the input files into a gzip-compressed tar archive at *output*.

    Each file is stored under its basename, giving the archive a flat layout.
    """
    with tarfile.open(output, "w:gz") as archive:
        for path in inputs:
            archive.add(path, os.path.basename(path))
def local_copy(inputs: list[str], outdir: str) -> list[str]:
    """Stage remote input files into ``<outdir>/tmp`` via xrdcp.

    Entries of *inputs* that already exist on the local filesystem are left
    alone; all others are copied (with adler32 verification) and *inputs*
    is updated in place to point at the local copies.

    Returns:
        The list of newly created local copies, for later cleanup.
    """
    staging = os.path.join(outdir, "tmp")
    copied: list[str] = []
    print(f"Making local copy of input files in {staging}:")
    for idx, src in enumerate(inputs):
        name = os.path.basename(src)
        if os.path.exists(src):
            # Already reachable locally -- nothing to stage
            print(f" Skipping {name} (file already local)")
            continue
        dst = os.path.join(staging, name)
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        if not os.path.exists(dst):
            print(f" Copying {name}")
            subprocess.run(['xrdcp', src, dst, '-C', 'adler32'], check=True)
        else:
            # A previous (possibly partial) copy exists: '--continue'
            # presumably resumes/verifies it; on failure start over fresh.
            print(f" Checking {name} (local copy already exists)")
            try:
                subprocess.run(['xrdcp', src, dst, '-C', 'adler32', '--continue'],
                               check=True)
            except subprocess.CalledProcessError:
                print(f" Replacing {name} (existing local copy is corrupted)")
                os.remove(dst)
                subprocess.run(['xrdcp', src, dst, '-C', 'adler32'], check=True)
        copied.append(dst)
        inputs[idx] = dst
    print(f"Copied {len(copied)} files")
    return copied
def merge(config: dict, outdir: str) -> None:
    """Merge the input files described by *config* into one output file in *outdir*.

    Dispatches on ``config['metadata']['merge.method']`` (hadd / lar /
    hdf5 / tar).  Side effects: renames pre-existing output and JSON files
    to timestamped ``.bak`` files, writes a metadata JSON next to the
    output, and deletes any temporary local input copies it staged.

    Raises:
        ValueError: If the merge method is not one of the supported ones.
    """
    method = config['metadata']['merge.method']
    output = os.path.join(outdir, config['name'])
    inputs = config.pop('inputs')
    streaming = config.pop('streaming', False)

    # Interactive FNAL gpvm nodes get a fresh vault token first
    if "dunegpvm" in socket.gethostname():
        renew_token()

    # Stage remote inputs locally unless we are streaming them directly
    tmp_files = [] if streaming else local_copy(inputs, outdir)

    # Move aside leftovers from a previous run instead of clobbering them
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    json_name = output + '.json'
    _backup_existing(output, stamp, "Output file")
    _backup_existing(json_name, stamp, "JSON file")

    # Dispatch on the configured merge method
    if method == "hadd":
        merge_hadd(output, inputs)
    elif method == "lar":
        merge_lar(output, inputs, config['metadata']['merge.cfg'])
    elif method == "hdf5":
        merge_hdf5(output, inputs, config['metadata']['merge.cfg'])
    elif method == "tar":
        merge_tar(output, inputs)
    else:
        raise ValueError(f"Unsupported merge method: {method}")

    # Record size and checksum, then write the metadata JSON (best effort)
    config['size'] = os.path.getsize(output)
    config['checksums'] = checksums(output)
    try:
        with open(json_name, 'w', encoding="utf-8") as fjson:
            fjson.write(json.dumps(config, indent=2))
    except Exception as e:
        print(f"WARNING: Could not write JSON file {json_name}: {e}")

    # Drop the staged local copies now that the merge is done
    if not streaming and tmp_files:
        print("Deleting local input file copies")
        for tmp in tmp_files:
            os.remove(tmp)


def _backup_existing(path: str, stamp: str, label: str) -> None:
    """Rename *path* to ``<path>_<stamp>.bak`` if it already exists."""
    if os.path.exists(path):
        oldname = path + "_" + stamp + ".bak"
        os.rename(path, oldname)
        print(f"WARNING: {label} {path} already exists, renaming to {oldname}")
def main():
    """Command-line entry point: ``do_merge <config.json> [outdir]``.

    Loads the JSON merge configuration given as the first argument and
    merges into the optional output directory (default: current dir).
    """
    with open(sys.argv[1], encoding="utf-8") as handle:
        merge_config = json.load(handle)
    destination = sys.argv[2] if len(sys.argv) > 2 else '.'
    merge(merge_config, destination)


if __name__ == '__main__':
    main()