Source code for do_merge
#!/usr/bin/env python3
"""Actually perform the merging"""
import sys
import os
import json
import subprocess
import tarfile
[docs]
def checksums(filename: str) -> dict[str, str]:
    """Calculate the checksum of a file.

    Runs the external ``xrdadler32`` command on ``filename`` and takes the
    first whitespace-separated token of its standard output as the checksum.

    Args:
        filename: Path of the file, passed as the only argument to
            ``xrdadler32``.

    Returns:
        A dictionary with a single ``'adler32'`` key mapping to the checksum
        token as a string.

    Raises:
        ValueError: If ``xrdadler32`` exits with a non-zero status, or exits
            successfully but prints nothing to standard output.
    """
    proc = subprocess.run(['xrdadler32', filename], capture_output=True, check=False)
    if proc.returncode != 0:
        raise ValueError('xrdadler32 failed', proc.returncode, proc.stderr)
    tokens = proc.stdout.decode('utf-8').split()
    # A zero exit status with blank output would otherwise surface as an
    # opaque IndexError; report it the same way as any other tool failure.
    if not tokens:
        raise ValueError('xrdadler32 produced no output', proc.returncode, proc.stderr)
    return {'adler32': tokens[0]}
[docs]
def merge_hadd(output: str, inputs: list[str]) -> None:
    """Merge the input files using hadd.

    Builds the command ``hadd -v 0 -f <output> <inputs...>``, prints it, and
    runs it.

    Args:
        output: Path of the merged file, placed right after ``-f`` on the
            command line.
        inputs: Paths of the files to merge, appended to the command line
            in order.

    Raises:
        subprocess.CalledProcessError: If ``hadd`` exits with a non-zero
            status.
    """
    cmd = ['hadd', '-v', '0', '-f', output] + inputs
    # Flush explicitly: when stdout is a pipe or a file it is block-buffered,
    # and the banner could otherwise show up after the child's own output.
    print(f"Running command:\n{' '.join(cmd)}", flush=True)
    subprocess.run(cmd, check=True)
[docs]
def merge_lar(output: str, inputs: list[str], config: str) -> None:
    """Merge the input files using lar.

    Builds the command ``lar -c <config> -o <output> <inputs...>``, prints
    it, and runs it.

    Args:
        output: Path of the merged file, placed right after ``-o`` on the
            command line.
        inputs: Paths of the files to merge, appended to the command line
            in order.
        config: Value placed right after ``-c`` on the command line.

    Raises:
        subprocess.CalledProcessError: If ``lar`` exits with a non-zero
            status.
    """
    cmd = ['lar', '-c', config, '-o', output] + inputs
    # Flush explicitly: when stdout is a pipe or a file it is block-buffered,
    # and the banner could otherwise show up after the child's own output.
    print(f"Running command:\n{' '.join(cmd)}", flush=True)
    subprocess.run(cmd, check=True)
[docs]
def merge_hdf5(output: str, inputs: list[str]) -> None:
    """Placeholder for merging the input files into an HDF5 file.

    No merging is performed yet; every call raises ``NotImplementedError``.
    """
    #TODO: investigate https://github.com/NU-CUCIS/ph5concat
    reason = "HDF5 merging is not yet implemented"
    raise NotImplementedError(reason)
[docs]
def merge_tar(output: str, inputs: list[str]) -> None:
    """Bundle the input files into a gzip-compressed tar archive.

    Every input is stored under its base name, so the directory part of each
    input path is not recorded in the archive.
    """
    with tarfile.open(output, mode="w:gz") as archive:
        for source_path in inputs:
            member_name = os.path.basename(source_path)
            archive.add(source_path, arcname=member_name)
[docs]
def merge(config: dict, outdir: str) -> None:
    """Produce the merged output file and a JSON description of it.

    The merge method is read from ``config['metadata']['merge.method']`` and
    the output path is ``config['name']`` joined onto ``outdir``. The
    ``'inputs'`` entry is removed from ``config``, ``'size'`` and
    ``'checksums'`` entries are added for the output file, and the resulting
    dictionary is written to ``<output path>.json``.

    Raises:
        ValueError: If the merge method is not one of ``hadd``, ``lar``,
            ``hdf5`` or ``tar``.
    """
    method = config['metadata']['merge.method']
    target = os.path.join(outdir, config['name'])
    # Popped rather than read, so the input list is absent from the JSON
    # written at the end.
    sources = config.pop('inputs')

    # Reject unknown methods up front, then dispatch on the known ones
    if method not in ("hadd", "lar", "hdf5", "tar"):
        raise ValueError(f"Unsupported merge method: {method}")

    if method == "hadd":
        merge_hadd(target, sources)
    elif method == "lar":
        merge_lar(target, sources, config['metadata']['merge.fcl'])
    elif method == "hdf5":
        merge_hdf5(target, sources)
    else:
        merge_tar(target, sources)

    # Record properties of the freshly written output file
    config['size'] = os.path.getsize(target)
    config['checksums'] = checksums(target)

    # Store the updated configuration next to the output file
    with open(target + '.json', 'w', encoding="utf-8") as sidecar:
        sidecar.write(json.dumps(config, indent=2))
[docs]
def main() -> None:
    """Main function for command line execution.

    Expects ``sys.argv[1]`` to be the path of a JSON configuration file and
    an optional ``sys.argv[2]`` to be the output directory, which defaults to
    the current directory. The parsed configuration is handed to ``merge``.

    Raises:
        SystemExit: If no configuration file argument was given; the usage
            message is printed to stderr and the exit status is 1.
    """
    if len(sys.argv) < 2:
        # Exit with a usage hint instead of an IndexError traceback
        prog = os.path.basename(sys.argv[0]) if sys.argv else 'do_merge'
        sys.exit(f"Usage: {prog} CONFIG_JSON [OUTDIR]")
    with open(sys.argv[1], encoding="utf-8") as f:
        config = json.load(f)
    outdir = sys.argv[2] if len(sys.argv) > 2 else '.'
    merge(config, outdir)
# Run the merge only when this file is executed as a script, not on import
if __name__ == '__main__':
    main()