Source code for uchuutools.converters.convert_ascii_ctrees_to_h5

#!/usr/bin/env python
from __future__ import print_function

__author__ = "Manodeep Sinha"
__all__ = ["convert_ctrees_to_h5"]

import os
import time
import io

from ..utils import get_metadata, get_parser, \
                    distribute_array_over_ntasks, get_approx_totnumhalos, \
                    check_and_decompress, check_for_contiguous_halos, \
                    resize_halo_datasets, write_halos, \
                    update_container_h5_file

from ..ctrees_utils import read_locations_and_forests, \
                           get_aggregate_forest_info,\
                           get_all_parallel_ctrees_filenames, \
                           validate_inputs_are_ctrees_files, \
                           check_forests_locations_filenames, \
                           get_treewalk_dtype_descr, add_tree_walk_indices


def _create_and_validate_halos_dset(hf, dtype, write_halo_props_cont=True):
    """
    Internal utility function to check the existing halo dataset in the file,
    and return a reference where the halos dataset can be written to.

    """

    forests_grp = hf['Forests']
    if write_halo_props_cont:
        halos_dset = dict()
        # Create a dataset for every halo property
        # For any given halo property, the value
        # for halos will be written contiguously
        # (structure of arrays)
        for name in dtype.names:
            halos_dset[name] = forests_grp[name]
            if hf.attrs['Nhalos'] != halos_dset[name].shape[0]:
                msg = f"Error: The dataset for halo property = '{name}' "\
                    f"does not contain *exactly* the same number of halos "\
                    f"as specified in the file attribute. "\
                    "shape of halo property dataset "\
                    f" = '{halos_dset[name].shape}' "\
                    f"nhalos in file attribute = {hf.attrs['Nhalos']}"
                raise AssertionError(msg)
    else:
        # Create a single dataset that contains all properties
        # of a given halo, then all properties of the next halo,
        # and so on (array of structures)
        halos_dset = forests_grp['halos']
        if hf.attrs['Nhalos'] != halos_dset.shape[0]:
            msg = f"Error: The halos dataset does not contain *exactly* the "\
                  f"same number of halos as specified in the file attribute. "\
                  f"shape of halo property dataset = '{halos_dset.shape}' "\
                  f"nhalos in file attribute = {hf.attrs['Nhalos']}"
            raise AssertionError(msg)

    return halos_dset


def _convert_ctrees_forest_range(forest_info, trees_and_locations, rank,
                                 outputdir, output_filebase,
                                 write_halo_props_cont,
                                 fields, drop_fields,
                                 truncate, compression,
                                 buffersize, use_pread,
                                 show_progressbar):
    """
    Convert a set of forests from Consistent Trees ascii file(s) into an
    (optionally compressed) hdf5 file.

    Parameters
    -----------

    forest_info: numpy structured array, required, shape: (nforests, )
        The numpy structured array containing the following info
        <ForestID ForestNhalos Input_ForestNbytes Ntrees> for the
        *forests* that are to be converted by this task

    trees_and_locations: numpy structured array, required, shape: (ntrees, )
        The numpy structured array containing the following info
        <ForestID TreeRootID FileName Offset TreeNbytes> for the
        *trees* that are to be converted by this task

    rank: integer, required
        The (MPI) rank for the process. The output filename is determined
        with this rank to ensure unique filenames when running in parallel.

    outputdir: string, required
        The directory where the converted hdf5 file will be written in. The
        output filename is obtained by appending '.h5' to the ``input_file``.

    output_filebase: string, required
        The output filename is constructed using
        '{outputdir}/{output_filebase}_{rank}.h5'

    write_halo_props_cont: boolean, required
        Controls if the individual halo properties are written as distinct
        datasets such that any given property for *all* halos is written
        contiguously (structure of arraysA).

        When set to False, only one dataset ('halos') is created under the
        group 'Forests', and *all* properties of a halo is written out
        contiguously (array of structures).

    fields: list of strings, required
        Describes which specific columns in the input file to carry across
        to the hdf5 file. Default action is to convert ALL columns.

    drop_fields: list of strings, required
        Describes which columns are not carried through to the hdf5 file.
        Processed after ``fields``, i.e., you can specify ``fields=None`` to
        create an initial list of *all* columns in the ascii file, and then
        specify ``drop_fields = [colname2, colname7, ...]``, and those columns
        will not be present in the hdf5 output.

    truncate: boolean, required
        Controls whether a new file is created on this 'rank'. When set to
        ``True``, the header info file is written out. Otherwise, the file
        is appended to.

    compression: string, required
        Controls the kind of compression applied. Valid options are anything
        that ``h5py`` accepts.

    buffersize: integer, required
        Controls the size of the buffer for how many halos are written out
        per write call to the hdf5 file. The number of halos written out is
        this buffersize divided the size of the datatype for individual halos.

    use_pread: boolean, optional, required
        Controls whether low-level i/o operations (through ``os.pread``) is
        used. Otherwise, higher-level i/o operations (via ``io.open``) is
        used. This option is only meaningful on linux systems (and python3+).
        Since ``pread`` does not change the file offset, additional
        parallelisation can be implemented reasonably easily.

    show_progressbar: boolean, required
        Controls whether a progressbar is printed. Only enables progressbar
        on rank==0, the remaining ranks ignore this keyword.


    Returns
    -------

        Returns ``True`` on successful completion.

    """

    import numpy as np
    import h5py
    import sys
    from tqdm import tqdm

    if rank != 0:
        show_progressbar = False

    try:
        os.pread
    except NameError:
        use_pread = False

    sys.stdout.flush()
    tstart = time.perf_counter()

    # Set the datalen for strings
    string_dtype = 'S1024'

    if not os.path.isdir(outputdir):
        msg = f"Error: The first parameter (output directory) = "\
              f"'{outputdir}' should be of type directory"
        raise ValueError(msg)

    ntrees = trees_and_locations.shape[0]
    nforests = forest_info.shape[0]
    if nforests > ntrees:
        msg = f"Error: Expected the number of trees = '{ntrees}' "\
               "to be *at most* equal to the number of "\
              f"forests = '{nforests}'"
        raise AssertionError(msg)
    if ntrees <= 0:
        msg = f"[Rank={rank}] Error: ntrees = {ntrees} should be >= 0"
        raise AssertionError(msg)
    totnbytes = forest_info['Input_ForestNbytes'].sum()
    print(f"[Rank={rank}]: processing {totnbytes} bytes "
          f"(in {ntrees} trees) spread over {nforests} forests...")

    alltreedatafiles = list(set(trees_and_locations['Filename']))
    assert len(alltreedatafiles) > 0
    validate_inputs_are_ctrees_files(alltreedatafiles)

    metadata_dict = get_metadata(alltreedatafiles[0])
    metadata = metadata_dict['metadata']
    version_info = metadata_dict['version']
    input_catalog_type = metadata_dict['catalog_type']
    hdrline = metadata_dict['headerline']

    parser = get_parser(alltreedatafiles[0], fields=fields,
                        drop_fields=drop_fields)

    mergertree_descr = get_treewalk_dtype_descr()
    output_dtype = np.dtype(parser.dtype.descr + mergertree_descr)

    approx_totnumhalos = 0
    for fname in alltreedatafiles:
        ind = np.where(trees_and_locations['Filename'] == fname)
        nbytes = np.sum(trees_and_locations['TreeNbytes'][ind])
        approx_totnumhalos += get_approx_totnumhalos(fname, ndatabytes=nbytes)

    if show_progressbar:
        pbar = tqdm(total=ntrees, unit=' trees', disable=None)

    if (not buffersize) or (buffersize < output_dtype.itemsize):
        buffersize = 1024*1024  # 1 MB
    max_nhalos_buffer = buffersize // output_dtype.itemsize
    chunks = (max_nhalos_buffer, )

    output_file = f"{outputdir}/{output_filebase}_{rank}.h5"
    if truncate:
        with h5py.File(output_file, "w") as hf:
            # give the HDF5 root some attributes
            hf.attrs['input_files'] = np.string_(alltreedatafiles)
            mtimes = [os.path.getmtime(f) for f in alltreedatafiles]
            hf.attrs['input_filedatestamp'] = np.array(mtimes)
            hf.attrs["input_catalog_type"] = np.string_(input_catalog_type)
            hf.attrs[f"{input_catalog_type}_version"] = np.string_(version_info)
            hf.attrs[f"{input_catalog_type}_columns"] = np.string_(hdrline)
            hf.attrs[f"{input_catalog_type}_metadata"] = np.string_(metadata)
            hf.attrs['contiguous-halo-props'] = write_halo_props_cont

            sim_grp = hf.create_group('simulation_params')
            simulation_params = metadata_dict['simulation_params']
            for k, v in simulation_params.items():
                sim_grp.attrs[f"{k}"] = v

            hf.attrs['HDF5_version'] = np.string_(h5py.version.hdf5_version)
            hf.attrs['h5py_version'] = np.string_(h5py.version.version)

            hf.attrs['Nforests'] = 0
            hf.attrs['Ntrees'] = 0
            hf.attrs['Nhalos'] = 0

            forest_dtype = np.dtype([('ForestID', np.int64),
                                     ('ForestHalosOffset', np.int64),
                                     ('ForestNhalos', np.int64),
                                     ('ForestNtrees', np.int64), ])
            hf.create_dataset('ForestInfo', (0,), dtype=forest_dtype,
                              chunks=True, compression=compression,
                              maxshape=(None,))

            tree_dtype = np.dtype([('ForestID', np.int64),
                                   ('TreeRootID', np.int64),
                                   ('TreeHalosOffset', np.int64),
                                   ('TreeNhalos', np.int64),
                                   ('Input_Filename', string_dtype),
                                   ('Input_FileDateStamp', np.float),
                                   ('Input_TreeByteOffset', np.int64),
                                   ('Input_TreeNbytes', np.int64), ])
            hf.create_dataset('TreeInfo', (0,), dtype=tree_dtype,
                              chunks=True, compression=compression,
                              maxshape=(None,))

            forests_grp = hf.create_group('Forests')
            if write_halo_props_cont:
                # Create a dataset for every halo property
                # For any given halo property, the value
                # for halos will be written contiguously
                # (structure of arrays)
                for name, dtype in output_dtype.descr:
                    forests_grp.create_dataset(name, (0,), dtype=dtype,
                                               chunks=chunks,
                                               compression=compression,
                                               maxshape=(None,))
            else:
                # Create a single dataset that contains all properties
                # of a given halo, then all properties of the next halo,
                # and so on (array of structures)
                forests_grp.create_dataset('halos', (0,),
                                           dtype=output_dtype,
                                           chunks=chunks,
                                           compression=compression,
                                           maxshape=(None,))

    # If ``truncate=True`` was specified, then the file has already
    # been created but contains 0 halos. At this point in the code,
    # we are appending to an existing hdf5 file. Therefore, two
    # conditions must be satisfed:
    #   i) file must exist
    #   ii) the file must have been created with the same value
    #       of ``write_halo_props_cont``
    # This utility checks for these two conditions - MS 19/3/2021
    check_for_contiguous_halos(output_file, write_halo_props_cont)

    halos_buffer = np.empty(max_nhalos_buffer, dtype=output_dtype)
    nhalos_in_buffer = 0
    with h5py.File(output_file, "a") as hf:
        # The filenames are written as byte-arrays (through np.string_)
        # into the hdf5 file. Therefore, we will need to convert back
        # into `str` objects
        existing_files = hf.attrs['input_files']
        target_all_files = np.unique(np.hstack((existing_files,
                                                alltreedatafiles)))

        # Strictly speaking this decode is not necessary but without
        # this extra decode we end up with an array that contains
        # both np.str_ and str -- MS 01/05/2020
        target_all_files = [x.decode() if isinstance(x, bytes)
                            else str(x) for x in target_all_files]

        if len(target_all_files) > len(existing_files):
            # Since we are appending to the hdf5 file, let's make
            # sure that *all* the files belong to the same setup of
            # simulation + mergertree. However, the ascii files
            # corresponding to the existing data might have been
            # deleted, so we should pass the metadata info directly
            # from the hdf5 file.
            base_input_catalog_type = hf.attrs['input_catalog_type'].decode()
            base_metadata = hf.attrs[f'{base_input_catalog_type}_metadata']
            base_version = hf.attrs[f'{base_input_catalog_type}_version'].decode()

            # Only validate the *current* files being processed
            assert len(alltreedatafiles) > 0
            validate_inputs_are_ctrees_files(alltreedatafiles,
                                             base_metadata=base_metadata,
                                             base_version=base_version,
                                             base_input_catalog_type=base_input_catalog_type)

        # We need to update how many *unique* input files have gone into
        # this hdf5 file
        hf.attrs['input_files'] = np.string_(target_all_files)
        hf.attrs['input_filedatestamp'] = np.array([os.path.getmtime(f)
                                                    for f in target_all_files])

        tree_dset = hf['TreeInfo']
        forest_dset = hf['ForestInfo']

        if forest_dset.shape[0] != hf.attrs['Nforests']:
            msg = "Error: The forest dataset does not contain *exactly* "\
                  "the same number of forests as specified in the file "\
                  f"attribute. Shape of forest dataset = "\
                  f"'{forest_dset.shape}', nforests in file attribute"\
                  f" = '{hf.attrs['Nforests']}'"
            raise AssertionError(msg)
        forest_offset = hf.attrs['Nforests']

        if tree_dset.shape[0] != hf.attrs['Ntrees']:
            msg = "Error: The tree dataset does not contain *exactly* "\
                  "the same number of trees as specified in the file "\
                  f"attribute. shape of tree dataset = '{tree_dset.shape}' "\
                  f"ntrees in file attribute = '{hf.attrs['Ntrees']}'"
            raise AssertionError(msg)
        tree_offset = hf.attrs['Ntrees']

        # resize both the datasets containing the forestlevel info and
        # treelevel info
        forest_dset.resize((forest_offset + nforests, ))
        tree_dset.resize((tree_offset + ntrees, ))

        # Now check the halos dataset
        halos_dset = _create_and_validate_halos_dset(hf, output_dtype,
                                                     write_halo_props_cont)

        # Okay - we have validated the halos offset
        halos_offset = hf.attrs['Nhalos']
        halos_dset_offset = halos_offset

        # resize the halos dataset so we don't have to resize at every step
        dset_size = halos_offset + approx_totnumhalos
        resize_halo_datasets(halos_dset, dset_size,
                             write_halo_props_cont, output_dtype)

        forest_dset[-nforests:, 'ForestID'] = forest_info['ForestID'][:]
        forest_dset[-nforests:, 'ForestNtrees'] = forest_info['Ntrees'][:]

        tree_dset[-ntrees:, 'ForestID', ] = trees_and_locations['ForestID'][:]
        tree_dset[-ntrees:, 'TreeRootID'] = trees_and_locations['TreeRootID'][:]

        # These quantities relate to the input files
        tree_dset[-ntrees:, 'Input_Filename'] = np.string_(trees_and_locations['Filename'][:])
        mtimes = [os.path.getmtime(fn) for fn in trees_and_locations['Filename']]
        tree_dset[-ntrees:, 'Input_FileDateStamp'] = np.array(mtimes)
        tree_dset[-ntrees:, 'Input_TreeByteOffset'] = trees_and_locations['Offset'][:]
        tree_dset[-ntrees:, 'Input_TreeNbytes'] = trees_and_locations['TreeNbytes'][:]

        alltreedatafiles = list(set(trees_and_locations['Filename']))
        if use_pread:
            filehandlers = {f: os.open(f, os.O_RDONLY)
                            for f in alltreedatafiles}
        else:
            filehandlers = {f: io.open(f, 'rt') for f in alltreedatafiles}

        ntrees_processed = 0
        treenhalos = np.empty(ntrees, dtype=np.int64)
        treehalos_offset = np.empty(ntrees, dtype=np.int64)
        forestnhalos = np.empty(nforests, dtype=np.int64)
        foresthalos_offset = np.empty(nforests, dtype=np.int64)
        for iforest in range(nforests):
            foresthalos_offset[iforest] = halos_offset
            forest_halos = np.empty(0, dtype=output_dtype)
            for _ in range(forest_info['Ntrees'][iforest]):
                treedata_file = trees_and_locations['Filename'][ntrees_processed]
                offset = trees_and_locations['Offset'][ntrees_processed]
                numbytes = trees_and_locations['TreeNbytes'][ntrees_processed]
                inp = filehandlers[treedata_file]

                if use_pread:
                    chunk = os.pread(inp, numbytes, offset)
                else:
                    inp.seek(offset, os.SEEK_SET)
                    chunk = inp.read(numbytes)

                parse_line = parser.parse_line
                halos = parser.pack([parse_line(line)
                                     for line in chunk.splitlines()])

                nhalos = halos.shape[0]
                forest_halos.resize(forest_halos.shape[0] + nhalos)

                # forest_halos have additional mergertree indices, therefore
                # the datatypes are not the same between halos (parser.dtype)
                # and forest_halos (output_dtype) -> assign by columns
                for name in parser.dtype.names:
                    forest_halos[name][-nhalos:] = halos[name][:]

                # Add the tree level info
                treenhalos[ntrees_processed] = nhalos
                treehalos_offset[ntrees_processed] = halos_offset
                ntrees_processed += 1
                if show_progressbar:
                    pbar.update(1)

                # Update the total number of halos read-in with
                # the number of halos in this tree
                halos_offset += nhalos

            # Entire forest has been loaded. Reset nhalos
            # to be the number of halos in the forest
            nhalos = forest_halos.shape[0]

            # Add the forest level info
            forestnhalos[iforest] = nhalos

            # Entire forest is now loaded -> add the mergertree indices
            add_tree_walk_indices(forest_halos, rank)

            # If there are not enough to trigger a write, simply fill up
            # the halos_buffer
            if (nhalos_in_buffer + nhalos) < max_nhalos_buffer:
                assert halos_buffer.dtype == forest_halos.dtype
                halos_buffer[nhalos_in_buffer:nhalos_in_buffer+nhalos] = forest_halos[:]
                nhalos_in_buffer += nhalos
                continue

            # Need to write to disk
            # Resize to make sure there is enough space to append the new halos
            if halos_offset > dset_size:
                resize_halo_datasets(halos_dset, halos_offset,
                                     write_halo_props_cont, output_dtype)
                dset_size = halos_offset

            # write the halos that are already in the buffer
            write_halos(halos_dset, halos_dset_offset, halos_buffer,
                        nhalos_in_buffer, write_halo_props_cont)
            halos_dset_offset += nhalos_in_buffer
            nhalos_in_buffer = 0

            # Now write the halos that have just been read-in
            # Note: The halos in the buffer *must* be written out before
            # the halos that have just been read-in. Otherwise, there will
            # sbe data corruption
            write_halos(halos_dset, halos_dset_offset, forest_halos,
                        nhalos, write_halo_props_cont)
            halos_dset_offset += nhalos
            if halos_offset != halos_dset_offset:
                msg = f"Error: After writing out halos into the hdf5 file, "\
                      f"expected to find that halos_offset = '{halos_offset}'"\
                      f" to be *exactly* equal to the offset in the hdf5 "\
                      f"dataset = '{halos_dset_offset}'"
                raise AssertionError(msg)

        # All the trees for this call have now been read in entirely -> Now
        # fix the actual dataset sizes to reflect the total number of
        # halos written
        resize_halo_datasets(halos_dset, halos_offset,
                             write_halo_props_cont, output_dtype)
        dset_size = halos_offset

        if nhalos_in_buffer > 0:
            write_halos(halos_dset, halos_dset_offset, halos_buffer,
                        nhalos_in_buffer, write_halo_props_cont)
            halos_dset_offset += nhalos_in_buffer
            nhalos_in_buffer = 0

        if halos_offset != halos_dset_offset:
            msg = f"Error: After writing *all* the halos into the hdf5 file, "\
                  f"expected to find that halos_offset = '{halos_offset}'"\
                  f" to be *exactly* equal to the offset in the hdf5 "\
                  f"dataset = '{halos_dset_offset}'"
            raise AssertionError(msg)

        msg = f"Error: Expected to process {ntrees} trees but processed "\
              f"{ntrees_processed} trees instead"
        assert ntrees_processed == ntrees, msg

        # all halos from all forests have been written out and the halo
        # dataset has been correctly resized. Now write the aggregate
        # quantities at the tree and forest levels
        tree_dset[-ntrees:, 'TreeNhalos'] = treenhalos[:]
        tree_dset[-ntrees:, 'TreeHalosOffset'] = treehalos_offset[:]
        forest_dset[-nforests:, 'ForestNhalos'] = forestnhalos[:]
        forest_dset[-nforests:, 'ForestHalosOffset'] = foresthalos_offset[:]

        hf.attrs['Nforests'] += nforests
        hf.attrs['Ntrees'] += ntrees
        hf.attrs['Nhalos'] = halos_offset
        if show_progressbar:
            pbar.close()

        # Close all the open file handlers
        if use_pread:
            for f in filehandlers.values():
                os.close(f)
        else:
            for f in filehandlers.values():
                f.close()

    totnumhalos = halos_offset

    t1 = time.perf_counter()
    print(f"[Rank {rank}]: processing {totnbytes} bytes "
          f"(in {ntrees} trees) spread over {nforests} forests...done. "
          f"Wrote {totnumhalos} halos in {t1-tstart:.2f} seconds")

    sys.stdout.flush()
    return True


[docs]def convert_ctrees_to_h5(filenames, standard_consistent_trees=None, outputdir="./", output_filebase="forest", write_halo_props_cont=True, fields=None, drop_fields=None, truncate=True, compression='gzip', buffersize=None, use_pread=True, max_nforests=None, comm=None, show_progressbar=False): """ Convert a set of forests from Consistent Trees ascii file(s) into an (optionally compressed) hdf5 file. Can be invoked with MPI. Parameters ----------- filenames: list of strings for Consistent-Trees catalogues, required The input ascii files will be decompressed, if required. standard_consistent_tree: boolean, optional, default: None Whether the input filres were generated by the Uchuu collaboration's parallel Consistent-Trees code. If only two files are specified in ``filenames``, and these two filenames end with 'forests.list', and 'locations.dat', then a standard Consistent-Trees output will be inferred. If all files specified in ``filenames`` end with '.tree', then parallel Consistent-Trees is inferred. outputdir: string, optional, default: current working directory ('./') The directory where the converted hdf5 file will be written in. The output filename is obtained by appending '.h5' to the ``input_file``. output_filebase: string, optional, default: "forest" The output filename is constructed using '{outputdir}/{output_filebase}_{rank}.h5' write_halo_props_cont: boolean, optional, default: True Controls if the individual halo properties are written as distinct datasets such that any given property for *all* halos is written contiguously (structure of arraysA). When set to False, only one dataset ('halos') is created under the group 'Forests', and *all* properties of a halo is written out contiguously (array of structures). fields: list of strings, optional, default: None Describes which specific columns in the input file to carry across to the hdf5 file. Default action is to convert ALL columns. drop_fields: list of strings, optional, default: None Contains a list of column names that will *not* be carried through to the hdf5 file. If ``drop_fields`` is not set for a parallel Consistent-Trees run, then [``Tidal_Force``, ``Tidal_ID``] will be used. ``drop_fields`` is processed after ``fields``, i.e., you can specify ``fields=None`` to create an initial list of *all* columns in the ascii file, and then specify ``drop_fields = [colname2, colname7, ...]``, and *only* those columns will not be present in the hdf5 output. truncate: boolean, default: True Controls whether a new file is created on this 'rank'. When set to ``True``, the header info file is written out. Otherwise, the file is appended to. The code checks to make sure that the existing metadata in the hdf5 file is identical to the new metadata in the ascii files being currently converted (i.e., tries to avoid different simulation + mergertree results being present in the same file) compression: string, optional, default: 'gzip' Controls the kind of compression applied. Valid options are anything that ``h5py`` accepts. buffersize: integer, optional, default: 1 MB Controls the size of the buffer how many halos are written out per write call to the hdf5 file. The number of halos written out is this buffersize divided the size of the datatype for individual halos. use_pread: boolean, optional, default: True Controls whether low-level i/o operations (through ``os.pread``) is used. Otherwise, higher-level i/o operations (via ``io.open``) is used. This option is only meaningful on linux systems (and python3+). Since ``pread`` does not change the file offset, additional parallelisation can be implemented reasonably easily. max_nforests: integer >= 1, optional, default: None The maximum number of forests to convert across all tasks. If a positive value is passed then the total number of forests converted will be ``min(totnforests, max_nforests)``. ValueError is raised if the passed parameter value is less than 1. comm: MPI communicator, optional, default: None Controls whether the conversion is run in MPI parallel. Should be compatible with `mpi4py.MPI.COMM_WORLD`. show_progressbar: boolean, optional, default: False Controls whether a progressbar is printed. Only enables progressbar on rank==0, the remaining ranks ignore this keyword. Returns ------- Returns ``True`` on successful completion. """ import os import sys import time import numpy as np rank = 0 ntasks = 1 if comm: rank = comm.Get_rank() ntasks = comm.Get_size() if not os.path.isdir(outputdir): msg = f"Error: Output directory = {outputdir} is not a valid directory" raise ValueError(msg) if max_nforests and max_nforests <= 0: msg = f"Error: The maximum number of forests to convert "\ f"= {max_nforests} must be >= 1" raise ValueError(msg) tstart = time.perf_counter() sys.stdout.flush() if not standard_consistent_trees: standard_consistent_trees = True # The Uchuu collaboration has a special parallel version of the # Consistent-Tree developed by @Tomo. This code generates a set of # files equivalent to (forests.list, locations.dat, tree_*.dat) file # per CPU task. In that code, forests are guaranteed to be located # completely within one tree data file. However, the public version # of Consistent-Trees is different and there is exactly one # 'forests.list' and 'locations.dat' files for the entire simulation, # and as many tree_*.dat files as the number of BOX_DIVISIONS^3 set # in the Consistent-Trees config file at runtime. # This script aims to handle both scenarios with this logic -- if only # the "forests.list" and "locations.dat" files are supplied as # command-line arguments, then the public version of the # Consistent-Trees catalog is assumed. Otherwise, a list of # *all* the "XXXXXX.tree" files should be passed (i.e., for people # in the Uchuu collaboration). The correspoonding 'forest' and # 'location' file names will be automatically generated by assuming # the Uchuu convention: # -> tree data files are called '<prefix>.tree' # -> associated forest.list file is called '<prefix>.forest' # -> associated locations.dat file is called '<prefix>.loc' # # If all files supplied at the command-line endwith # '.tree(.bz2,.zip,.gz)' then it is a parallel Consistent-Trees run. check_pctrees_files = [True if 'tree' in set(f.split('.')) else False for f in filenames] if np.all(check_pctrees_files): standard_consistent_trees = False if standard_consistent_trees: if len(filenames) != 2: msg = "Error: To convert a standard Consistent-Trees output, "\ "please specify *exactly* two files -- the 'forests.list' and "\ "the 'locations.dat' files (order is unimportant). "\ f"Instead found filenames = '{filenames}'" raise ValueError(msg) filebasenames = set([os.path.basename(f) for f in filenames]) expected_filebasenames = set(['forests.list', 'locations.dat']) if filebasenames != expected_filebasenames: msg = "Error: To convert a standard Consistent-Trees output, "\ "please specify *exactly* two files -- the "\ "'forests.list' and the 'locations.dat' files "\ "(order is unimportant). While exactly two files were "\ "specified, at least one of the 'forests.list' or "\ "'locations.dat' files were not present in the "\ f"supplied filenames = '{filenames}'" raise ValueError(msg) if standard_consistent_trees: forests_file, locations_file = check_forests_locations_filenames(filenames) forests_and_locations_fnames = [(forests_file, locations_file)] else: # We are converting parallel Ctrees files; however, these files might # still be compressed and we need to decompress them before processing. forests_and_locations_fnames = [] decompressed_filenames = [] for fname in filenames: decomp_fname = check_and_decompress(fname) extname = '.tree' if extname not in decomp_fname: msg = "Error: Should pass the tree data file names (i.e., "\ f"the filenames should end in '{extname}'. "\ f"Instead got filename = '{decomp_fname}'" raise ValueError(msg) decompressed_filenames.append(decomp_fname) forests_file, locations_file, _ = get_all_parallel_ctrees_filenames(decomp_fname) forests_and_locations_fnames.append((forests_file, locations_file)) # Since multiple tree files are specified at the command-line, # let's make sure that all the files belong to the # same simulation + mergertree setup assert len(decompressed_filenames) > 0 validate_inputs_are_ctrees_files(decompressed_filenames) if (not drop_fields) and (not standard_consistent_trees): # The Tidal fields can not be correctly calculated in the # parallel CTrees code and are dropped from the hdf5 file drop_fields = ['Tidal_Force', 'Tidal_ID'] nfiles = len(forests_and_locations_fnames) if rank == 0: print(f"[Rank={rank}]: Converting {nfiles} sets of (forests, " f"locations) files over {ntasks} tasks ... ") nconverted = 0 for (forests_file, locations_file) in forests_and_locations_fnames: t0 = time.perf_counter() print(f"[Rank={rank}]: Reading forests and locations files...") trees_and_locations = read_locations_and_forests(forests_file, locations_file, rank) ntrees = trees_and_locations.shape[0] t1 = time.perf_counter() print(f"[Rank={rank}]: Reading forests and locations files...done. " f"Time taken = {t1-t0:.2f} seconds") alltreedatafiles = list(set(trees_and_locations['Filename'])) if standard_consistent_trees: # Validate that all the files are the same version and all # contain a Consistent Trees catalog assert len(alltreedatafiles) > 0 validate_inputs_are_ctrees_files(alltreedatafiles) else: # Since we are processing parallel CTrees output, *every* tree data # file has an associated forests and locations file. Which means # that every locations file should only have one treedata file # Check that that is the case if len(alltreedatafiles) != 1: msg = "Error: Expected to find *exactly* one tree data file "\ "per locations file. However, while processing the "\ f"locations_file = '{locations_file}', found "\ f"{len(alltreedatafiles)} tree data files. "\ f"The unique tree data files are: {alltreedatafiles}" raise AssertionError(msg) # No need to validate that the input files are valid CTrees files # That has already been done if rank == 0: print(f"[Rank={rank}]: Converting a single set of (forests, " f"locations) over {ntasks} tasks...") # We have the tree-level info, let's create a similar info for # the forests (by grouping trees by ForestID) forest_info = get_aggregate_forest_info(trees_and_locations, rank) # If `max_nforests` was passed, then we convert at most # the first `max_nforests` -- MS 27/07/2020 if max_nforests: nforests = min(max_nforests, forest_info.shape[0]) forest_info = forest_info[0:nforests] # Distribute the forests over ntasks (forest_start, forest_stop) = distribute_array_over_ntasks(forest_info['Input_ForestNbytes'], rank, ntasks) forest_ntrees_offset = forest_info['Ntrees'][:].cumsum() tree_start = 0 if forest_start >= 1: tree_start = forest_ntrees_offset[forest_start - 1] # tree_stop is not-inclusive, and can be directly used in a # slicing context tree_stop = forest_ntrees_offset[forest_stop] print(f"[Rank={rank}]: Processing trees [tree_start, tree_stop) = " f"[{tree_start}, {tree_stop}) totntrees = {ntrees}") # Now we can start converting the forests # *Note* ``forest_stop`` is inclusive but since we are slicing, # we need to account for the python convention hence, the slice # on ``forest_info`` goes up to ``forest_stop + 1`` _convert_ctrees_forest_range(forest_info[forest_start:forest_stop+1], trees_and_locations[tree_start:tree_stop], rank, outputdir=outputdir, output_filebase=output_filebase, write_halo_props_cont=write_halo_props_cont, fields=fields, drop_fields=drop_fields, truncate=truncate, compression=compression, buffersize=buffersize, use_pread=use_pread, show_progressbar=show_progressbar) truncate = False nconverted += 1 if rank == 0: print(f"[Rank={rank}]: Converting a single set of (forests, " f"locations) over {ntasks} tasks...done. Converted " f"{nconverted} out of {nfiles} sets of files") # Done converting all files. The barrier is necessary to # ensure that the container file is created *after* all the # data have been converted if comm: comm.Barrier() if rank != 0: return True # Create the main file that contains the other files as (hdf5-sym-)links fname = f'{outputdir}/{output_filebase}.h5' outfiles = [f'{outputdir}/{output_filebase}_{itask}.h5' for itask in range(ntasks)] update_container_h5_file(fname, outfiles, standard_consistent_trees) t1 = time.perf_counter() print(f"Converting {nfiles} sets of (forests, locations) files " f"over {ntasks} tasks ...done. Time taken = {t1-tstart:.2f} seconds") return True