Source code for fiberassign.scripts.merge

# Licensed under a 3-clause BSD style license - see LICENSE.rst
# -*- coding: utf-8 -*-
"""
fiberassign.scripts.merge
============================

High-level functions for merging target catalogs with output files.

"""
from __future__ import absolute_import, division, print_function

import os
import argparse

import numpy as np

from ..utils import Logger

from ..assign import (merge_results, result_tiles)


def parse_merge(optlist=None):
    """Parse merging options.

    This parses either sys.argv or a list of strings passed in.  If passing
    an option list, you can create that more easily using the
    :func:`option_list` function.

    Args:
        optlist (list, optional): Optional list of arguments to parse instead
            of using sys.argv.

    Returns:
        (namespace): an ArgumentParser namespace.  The ``sky`` attribute is
            always a list (empty when ``--sky`` is not given).

    """
    parser = argparse.ArgumentParser()

    parser.add_argument("--targets", type=str, required=True, nargs="+",
                        help="Input file with targets of any type. This "
                        "argument accepts multiple files (for "
                        "example if standards / skies / science targets are "
                        "in different files).")

    parser.add_argument("--sky", type=str, required=False, nargs="+",
                        help="Input file with sky or 'bad sky' targets. "
                        "This option exists in order to treat main-survey"
                        " sky target files as valid for other survey types."
                        " If you are running a main survey assignment, you"
                        " can just pass the sky file to the --targets list.")

    parser.add_argument("--dir", type=str, required=True, default=None,
                        help="Directory containing fiberassign results.")

    parser.add_argument("--prefix", type=str, required=False,
                        default="fba-",
                        help="Prefix of each file (before the <tile>.fits).")

    parser.add_argument("--split", required=False, default=False,
                        action="store_true",
                        help="Results are in tile prefix directories.")

    parser.add_argument("--out", type=str, required=False, default=None,
                        help="Output directory for the merged files. Default"
                        " is the directory containing the fiberassign output.")

    parser.add_argument("--out_prefix", type=str, required=False,
                        default="fiberassign-",
                        help="Prefix of each output file.")

    parser.add_argument("--out_split", required=False, default=False,
                        action="store_true",
                        help="Split output into tile prefix directories.")

    parser.add_argument("--columns", type=str, required=False, default=None,
                        help="Override the column names of target data to be "
                        "copied from the target files into the fiber "
                        "assignment files. This should be a comma-separated "
                        "list.")

    # NOTE: the trailing space after "HDUs" is required; without it the two
    # adjacent literals concatenate into "HDUsto".
    parser.add_argument("--skip_raw", required=False, default=False,
                        action="store_true",
                        help="If true, do not copy the raw fiberassign HDUs "
                        "to the merged output.")

    # argparse reads sys.argv[1:] when given None, so one call covers both
    # the command-line and the explicit-list cases.
    args = parser.parse_args(optlist)

    # --sky is optional; normalize to an empty list so callers can always
    # pass it straight through without a None check.
    if args.sky is None:
        args.sky = list()

    return args


def run_merge_init(args, comm=None):
    """Initialize merging inputs.

    This uses the previously parsed options to load the input files needed.

    Args:
        args (namespace): The parsed arguments.
        comm (MPI.Comm, optional): The MPI communicator.  When given, only
            rank zero checks the results directory and scans it for tiles;
            the tile list is then broadcast to every process.

    Returns:
        (tuple): The (Tiles, columns) needed to run the merging.  ``columns``
            is None unless ``args.columns`` was set.

    Raises:
        RuntimeError: If the results directory does not exist and no
            communicator was given.  With a communicator the job is aborted
            via ``comm.Abort()`` instead.

    """
    log = Logger.get()

    # Check directory.  Only one process touches the filesystem here.
    if (comm is None) or (comm.rank == 0):
        if not os.path.isdir(args.dir):
            msg = "Results directory {} does not exist".format(args.dir)
            log.error(msg)
            if comm is not None:
                # Take down every rank, not just this one.
                comm.Abort()
            # Serial case: do not go on to scan a nonexistent directory.
            raise RuntimeError(msg)

    # Check columns.  Whitespace around names and empty entries (e.g. from a
    # trailing comma) are discarded; TARGETID is removed from the user list.
    columns = None
    if args.columns is not None:
        coltest = [x.strip() for x in args.columns.split(",")]
        columns = [x for x in coltest if x and x != "TARGETID"]

    # Scan for result tiles on one process and share the list.
    tiles = None
    if (comm is None) or (comm.rank == 0):
        tiles = result_tiles(dir=args.dir, prefix=args.prefix)
    if comm is not None:
        tiles = comm.bcast(tiles, root=0)

    return (tiles, columns)


def run_merge(args):
    """Merge fiberassign outputs with their input target catalogs.

    Reads the options produced by :func:`parse_merge`, discovers the result
    tiles and merges every one of them on the current node (parallelism, if
    any, is left to ``merge_results``).

    Args:
        args (namespace): The parsed arguments.

    Returns:
        None

    """
    tiles, columns = run_merge_init(args)

    # Gather all location / layout options so the call below stays compact.
    merge_opts = dict(
        result_dir=args.dir,
        result_prefix=args.prefix,
        result_split_dir=args.split,
        out_dir=args.out,
        out_prefix=args.out_prefix,
        out_split_dir=args.out_split,
        columns=columns,
        copy_fba=not args.skip_raw,
    )
    merge_results(args.targets, args.sky, tiles, **merge_opts)


def run_merge_mpi(args, comm):
    """Run output merging.

    This uses the previously parsed options to read input data and perform
    merging of the input catalogs.  This is designed to be run with one MPI
    process per node.  Each MPI process in the input communicator will then
    use multiprocessing for further parallelism.

    The tile list is divided into ``comm.size`` contiguous chunks and each
    rank merges only its own chunk.  Ranks that receive no tiles do nothing.

    Args:
        args (namespace): The parsed arguments.
        comm (MPI.Comm): The MPI communicator.

    Returns:
        None

    """
    log = Logger.get()

    tiles, columns = run_merge_init(args, comm)

    # This rank's share of the tiles.
    ptiles = np.array_split(tiles, comm.size)[comm.rank]

    if len(ptiles) > 0:
        log.info("proc {} doing {} tiles".format(comm.rank, len(ptiles)))
        # Forward the split-directory options exactly as the serial
        # run_merge does; omitting them made --split / --out_split no-ops
        # under MPI.
        merge_results(args.targets, args.sky, ptiles, result_dir=args.dir,
                      result_prefix=args.prefix,
                      result_split_dir=args.split, out_dir=args.out,
                      out_prefix=args.out_prefix,
                      out_split_dir=args.out_split, columns=columns,
                      copy_fba=(not args.skip_raw))

    return