# -*- coding: utf-8 -*-
"""
General Utilities
==================
Provides several utility functions.
"""
__author__ = "Manodeep Sinha"
__all__ = ["get_parser", "get_approx_totnumhalos", "generic_reader",
"get_metadata", "resize_halo_datasets",
"check_and_decompress", "distribute_array_over_ntasks",
"check_for_contiguous_halos",
"write_halos", "update_container_h5_file", ]
# Majority of the code below is taken from
# Yao-Yuan Mao's SimulationAnalysis.py (part of YYM's helper package)
import re
import numpy as np
from builtins import range, zip
from contextlib import contextmanager
def sanitize_ctrees_header(headerline):
import re
header = [re.sub('\(\d+\)$', '', s) for s in headerline]
# print("After normal sub: header = {}\n".format(header))
header = [re.sub('[^a-zA-Z0-9 \n\.]', '_', s) for s in header]
# print(f"After replacing special characters with _: header = {header}\n")
header = [re.sub('_$', '', s) for s in header]
# print(f"After replacing trailing underscore: header = {header}\n")
header = [re.sub('(_)+', '_', s) for s in header]
# print(f"After replacing multiple underscores: header = {header}")
return header
def _isstring(s):
try:
s + ''
except TypeError:
return False
return True
class BaseParseFields():
def __init__(self, header, fields=None):
if len(header) == 0:
if all(isinstance(f, int) for f in fields):
self._usecols = fields
self._formats = [float]*len(fields)
self._names = ['f%d' % f for f in fields]
else:
raise ValueError("header is empty, so fields must be "
"a list of int.")
else:
# print(f"in baseparsefields: header = {header}")
header_s = [self._name_strip(__) for __ in header]
# print(f"header_s = {header_s}")
if not fields:
self._names = header
names_s = header_s
self._usecols = list(range(len(names_s)))
else:
# print(f"fields = {fields}")
if _isstring(fields):
fields = [fields]
self._names = [header[f] if isinstance(f, int)
else str(f) for f in fields]
names_s = [self._name_strip(__) for __ in self._names]
wrong_fields = [str(f) for s, f in zip(names_s, fields)
if s not in header_s]
if wrong_fields:
msg = f"The following field(s) are not available "\
f"{', '.join(wrong_fields)}.\nAvailable fields: "\
f"{', '.join(header)}"
raise ValueError(msg)
self._usecols = [header_s.index(__) for __ in names_s]
self._formats = [self._get_format(__) for __ in names_s]
self.dtype = np.dtype({'names': self._names,
'formats': self._formats})
def parse_line(self, line):
items = line.split()
try:
return tuple(c(items[i]) for i, c in zip(self._usecols,
self._formats))
except Exception as _error:
msg = f'Something wrong when parsing this line:\n{line}'
raise _error(msg)
def pack(self, X):
return np.array(X, self.dtype)
def _name_strip(self, s):
return self._re_name_strip.sub('', s).lower()
def _get_format(self, s):
return float if self._re_formats.search(s) is None else int
_re_name_strip = re.compile('\W|_')
_re_formats = re.compile('^phantom$|^mmp$|id$|^num|num$')
[docs]def get_parser(filename, fields=None, drop_fields=None):
"""
Returns a parser that parses a single line from the input ascii file
Parameters
----------
filename: string, required
A filename containg Rockstar/Consistent-Trees data. Can be a compressed
file if the compression is one of the types supported by the
``generic_reader`` function.
fields: list of strings, optional, default: None
Describes which specific columns in the input file to carry across
to the hdf5 file. Default action is to convert ALL columns.
drop_fields: list of strings, optional, default: None
Describes which columns are not carried through to the hdf5 file.
Processed after ``fields``, i.e., you can specify ``fields=None`` to
create an initial list of *all* columns in the ascii file, and then
specify ``drop_fields = [colname2, colname7, ...]``, and those columns
will not be present in the hdf5 output.
Returns
-------
parser: an instance of BaseParseFields
A parser that will parse a single line (read from a
Rockstar/Consistent-Trees file) and create a tuple
containing *only* the relevant columns.
"""
with generic_reader(filename, 'rt') as f:
# Taken from SimulationAnalysis.py
hdrline = next(f)
hdrline = hdrline.strip().lstrip('#').lstrip()
# print(f"hdrline = {hdrline}")
split_headerline = [s for s in hdrline.split()]
header = sanitize_ctrees_header(split_headerline)
# print(f"sanitised header = {header}")
keep_fields = header[:]
if fields:
print(f"Only keeping the specified Fields = {fields}")
fields = [s.upper() for s in fields]
for fld in split_headerline:
upfld = fld.upper()
if upfld not in fields:
keep_fields.remove(fld)
print(f"Removing fld = {fld} since it does not exist "
"in 'fields'")
else:
print(f"Keeping fld = {fld}")
if drop_fields:
for fld in drop_fields:
try:
keep_fields.remove(fld)
except ValueError:
msg = f"Field = '{fld}' was specified in the list of "\
f"fields to be dropped but could not find in the "\
f"list of current fields = {keep_fields}. Perhaps "\
f"the case needs to be fixed (the matching is "\
f"case-sensitive)?"
raise ValueError(msg)
p = BaseParseFields(header, keep_fields)
# Above taken from SimulationAnalysis.py
return p
[docs]@contextmanager
def generic_reader(filename, mode='rt'):
"""
Returns a file-reader with capability to read line-by-line
for both compressed and normal text files.
Parameters
-----------
filename: string, required
The filename for the associated input/output. Can be a
compressed (.bz2, .gz, .xz, .zip) file as well as a regular
ascii file
mode: string, optional, default: 'rt' (readonly-text mode)
Controls the kind of i/o operation that will be
performed
Returns
--------
f: file handle, generator
Yields a generator that has the ``readline`` feature (i.e.,
supports the paradigm ``for line in f:``). This file-reader
generator is suitable for use in ``with`` statements, e.g.,
``with generic_reader(<fname>) as f:``
"""
if not isinstance(filename, str) and hasattr(filename, 'decode'):
filename = filename.decode()
if filename.endswith(".bz2"):
import bz2
f = bz2.open(filename, mode=mode)
elif filename.endswith(".gz"):
import gzip
f = gzip.open(filename, mode=mode)
elif filename.endswith('.xz'):
import lzma
f = lzma.open(filename, mode=mode)
elif filename.endswith('.zip'):
validmodes = set(['r', 'w'])
if mode not in validmodes:
msg = f"Error: filename = '{filename}' is a zipfile but the "\
f"requested mode = '{mode}' is not the valid zipfile "\
f"modes = '{validmodes}'"
raise ValueError(msg)
import zipfile
f = zipfile.open(filename, mode)
else:
import io
f = io.open(filename, mode=mode)
yield f
f.close()
def get_simulation_params_from_metadata(metadata):
simulation_params = dict()
for line in metadata[1:]:
# cosmological parameters
if "Omega_M" in line or ("Om" in line and "Ol" in line):
pars = line[1:].split(";")
for j, par in enumerate(["Omega_M",
"Omega_L",
"hubble"]):
v = float(pars[j].split(" = ")[1])
simulation_params[par] = v
# box size
elif "Full box size" in line or "Box size" in line:
pars = line.split("=")[1].strip().split()
box = float(pars[0])
simulation_params['Boxsize'] = box
# We can break because boxsize always occurs later than
# the cosmological parameters (regardless of
# CTrees/Rockstar origin)
break
return simulation_params
[docs]def get_approx_totnumhalos(input_file, ndatabytes=None):
"""
Returns an (approximate) number of lines containing data
in the ``input_file``.
Assumes that the only comment lines in the file occur
at the beginning. Comment lines are assumed to begin with '#'.
Parameters
-----------
input_file: string, required
The input filename for the Rockstar/Consistent Trees file
ndatabytes: integer, optional
The total number of bytes being processed. If not passed, the
entire disk size of the ``input_file`` minus the initial header
lines will be used (i.e. assumes that the entire file is being
processed)
Returns
--------
approx_totnumhalos: integer
The approximate number of halos in the input file. The actual
number of halos should be close but can be smaller/greater than
the approximate value.
"""
import os
with generic_reader(input_file, 'rt') as f:
hdr_bytes = 0
for line in f:
if line.startswith('#'):
hdr_bytes += len(line)
else:
# The first line of a CTrees file is the number of trees
# in that file. We should skip that line
if len(line.split()) == 1:
hdr_bytes += len(line)
continue
data_line_len = len(line)
break
if not ndatabytes:
statinfo = os.stat(input_file)
totbytes = statinfo.st_size
ndatabytes = totbytes - hdr_bytes
approx_nhalos = (ndatabytes // data_line_len) + 1
return approx_nhalos
[docs]def check_and_decompress(fname):
"""
Decompresses the input file (if necessary) and returns the
decompressed filename
Parameters
----------
fname: string, required
Input filename, can be compressed
Returns
-------
decomp_fname: string
The decompressed filename
"""
decompressors = [('.bz2', 'bunzip2'),
('.gz', 'gunzip'),
('.zip', 'unzip')]
if not isinstance(fname, str) and hasattr(fname, 'decode'):
fname = fname.decode()
for (ext, decomp) in decompressors:
if fname.endswith(ext):
import subprocess
print(f"Uncompressing compressed file '{fname}'...")
subprocess.run([decomp, fname], check=True)
print(f"Uncompressing compressed file '{fname}'...done")
fname = fname.rstrip(ext)
break
return fname
[docs]def distribute_array_over_ntasks(cost_array, rank, ntasks):
"""
Calculates the subscript range for the ``rank``'th task
such that the work-load is evenly distributed across
``ntasks``.
Parameters
-----------
cost_array: numpy array, required
Contains the cost associated with processing *each* element
of the array
rank: integer, required
The integer rank for the task that we need to compute the
work-load division for
ntasks: integer, required
Total number of tasks that the array should be (evenly) distributed
across
Returns
--------
(start, stop): A tuple of (np.int64, np.int64)
Contains the initial and final subscripts that the ``rank``
task should process.
Note: start, stop are both inclusive, i.e., all elements from ``start``
to ``stop`` should be included. For python array indexing with slices,
this translates to arr[start:stop + 1].
"""
if rank >= ntasks or rank < 0 or ntasks < 1:
msg = f"Error: rank = {rank} and NTasks = {ntasks} must satisfy "\
" i) ThisTask < NTasks, "\
" ii) ThisTask > 0, and "\
" iii) NTasks >= 1"
raise ValueError(msg)
ncosts = cost_array.shape[0]
if ncosts < 0:
msg = f"Error: On rank = {rank} total number of elements = {ncosts} "\
"must be >= 0"
raise ValueError(msg)
if ncosts == 0:
print(f"Warning: On rank = {rank}, got 0 elements to distribute "
f"...returning")
start, stop = None, None
return (start, stop)
if ntasks == 1:
start, stop = 0, ncosts - 1
print(f"Only a single task -- distributing the entire array "
f"[start, stop] = [{start}, {stop}]")
return (start, stop)
cumul_cost_array = cost_array.cumsum()
total_cost = cumul_cost_array[-1]
target_cost = total_cost / ntasks
cost_remaining, cost_assigned, ntasks_left = total_cost, 0.0, ntasks
start = 0
for icore in range(rank + 1):
full_array_target_cost = cost_assigned + target_cost
if icore == (ntasks - 1):
stop = ncosts - 1
else:
stop = min(np.where(cumul_cost_array >= full_array_target_cost)[0])
if stop < ncosts:
cost_assigned_this_rank = cumul_cost_array[stop] - cost_assigned
# Only print the work divisions on the last rank
if rank == (ntasks - 1):
# Yes, this is intentional that `icore` is used. The intent is
# to show which forests ranges are assigned to which task
# (i.e., rank) - MS 22/06/2020
print(f"[Rank={icore}]: Assigning forests: start, stop = "
f"[{start}, {stop}]. Target cost = {target_cost}, "
f"cost actually assigned = {cost_assigned_this_rank}")
if icore == rank:
break
cost_remaining -= cost_assigned_this_rank
cost_assigned += cost_assigned_this_rank
ntasks_left -= 1
target_cost = cost_remaining / ntasks_left
start = stop + 1
else:
msg = "Error: While determining optimal distribution of forests "\
"across MPI tasks. Could not locate an appropriate "\
f"stopping point for rank = {rank}. Expected stop = {stop} "\
"to be less than the number of elements in the cost "\
f"array = {ncosts}"
raise ValueError(msg)
return (start, stop)
[docs]def check_for_contiguous_halos(h5_task_file, write_halo_props_cont):
"""
Checks that the hdf5 file can be appended to with the requested writing
of halo properties
Parameters
-----------
h5_task_file: string, required
An existing hdf5 file. The file may or may not contain halos, but
the dataset (or datasets, depending on the value of
``write_halo_props_cont``) for the halo properties should already
be created
write_halo_props_cont: boolean, required
Controls if the individual halo properties are written as distinct
datasets such that any given property for ALL halos is written
contiguously (structure of arrays, SOA).
Returns
-------
Returns ``True`` on successful completion
"""
import os
import h5py
if not os.path.isfile(h5_task_file):
msg = "Error: You have specified to append to the hdf5 file but "\
f"the file ={h5_task_file} does *not* exist. You can fix this "\
"error by passing ``truncate=True`` (but that may truncate "\
"other hdf5 files that you have previously created)."
raise ValueError(msg)
with h5py.File(h5_task_file, 'r') as hf:
if hf.attrs['contiguous-halo-props'] != write_halo_props_cont:
msg = "Error: Found incompatible option for writing contiguous "\
f"halo properties when appending to the hdf5 file = {h5_task_file}. "\
"The hdf5 file was created with "\
f"``write_halo_props_cont={hf.attrs['contiguous-halo-props']}`` "\
f"but the current request is with ``write_halo_props_cont={write_halo_props_cont}``.\n"\
f"You can fix this either by setting "\
f"``write_halo_props_cont={hf.attrs['contiguous-halo-props']}``"\
f"or by setting ``truncate=True`` (but that will "\
"over-write the existing file)."
raise ValueError(msg)
return True
[docs]def resize_halo_datasets(halos_dset, new_size, write_halo_props_cont, dtype):
"""
Resizes the halo datasets
Parameters
-----------
halos_dset: dictionary, required
new_size: scalar integer, required
write_halo_props_cont: boolean, required
Controls if the individual halo properties are written as distinct
datasets such that any given property for ALL halos is written
contiguously (structure of arrays, SOA).
dtype: numpy datatype
Returns
-------
Returns ``True`` on successful completion
"""
if write_halo_props_cont:
for name in dtype.names:
dset = halos_dset[name]
dset.resize((new_size, ))
else:
halos_dset.resize((new_size, ))
return True
[docs]def write_halos(halos_dset, halos_dset_offset, halos, nhalos_to_write,
write_halo_props_cont):
"""
Writes halos into the relevant dataset(s) within a hdf5 file
Parameters
-----------
halos_dset: dictionary, required
Contains the halos dataset(s) within a hdf5 file where either
the entire halos array or the individual halo properties should
be written to. See parameter ``write_halo_props_cont`` for
further details
halos_dset_offset: scalar integer, required
Contains the index within the halos dataset(s) where the write
should start
halos: numpy structured array, required
An array containing the halo properties that should be written out
into the hdf5 file. The entire array may not be written out, see
the parameter ``nhalos_to_write``
nhalos_to_write: scalar integer, required
Number of halos from the ``halos`` array that should be written
out. Can be smaller than the shape of the ``halos`` array
write_halo_props_cont: boolean, required
Controls if the individual halo properties are written as distinct
datasets such that any given property for ALL halos is written
contiguously (structure of arrays, SOA).
Returns
-------
Returns ``True`` on successful completion of the write
"""
if write_halo_props_cont:
for name in halos.dtype.names:
dset = halos_dset[name]
dset[halos_dset_offset:halos_dset_offset + nhalos_to_write] \
= halos[name][0:nhalos_to_write]
else:
halos_dset[halos_dset_offset:halos_dset_offset + nhalos_to_write] = \
halos[0:nhalos_to_write]
return True
[docs]def update_container_h5_file(fname, h5files,
standard_consistent_trees=True):
"""
Writes the container hdf5 file that has external links to
the hdf5 datafiles with the mergertree information.
Parameters
-----------
fname: string, required
The name of the output container file (usually ``forest.h5``). A
new file is always created, however, if the file ``fname`` previously
existed then the external links are preserved.
h5files: list of filenames, required
The list of filenames that were either newly created or updated.
If the container file ``fname`` exists, then the union of the filenames
that already existed in ``fname`` and ``h5files`` will be used to
create the external links
standard_consistent_tree: boolean, optional, default: True
Specifies whether the input files were from a parallel
Consistent-Trees code or the standard Consistent-Trees code. Assumed
to be standard (i.e., the public version) of the Consistent-Trees
catalog
Returns
-------
Returns ``True`` on successful completion of the write
"""
import h5py
outfiles = h5files
if not isinstance(h5files, (list, tuple)):
outfiles = [h5files]
try:
with h5py.File(fname, 'r') as hf:
nfiles = hf['/'].attrs['Nfiles']
for ifile in range(nfiles):
outfiles.append(hf[f'File{ifile}'].file)
except OSError:
pass
outfiles = set(outfiles)
nfiles = len(outfiles)
with h5py.File(fname, 'w') as hf:
hf['/'].attrs['Nfiles'] = nfiles
hf['/'].attrs['TotNforests'] = 0
hf['/'].attrs['TotNtrees'] = 0
hf['/'].attrs['TotNhalos'] = 0
attr_props = [('TotNforests', 'Nforests'),
('TotNtrees', 'Ntrees'),
('TotNhalos', 'Nhalos')]
for ifile, outfile in enumerate(outfiles):
with h5py.File(outfile, 'a') as hf_task:
if standard_consistent_trees:
hf_task.attrs['consistent-trees-type'] = 'standard'
else:
hf_task.attrs['consistent-trees-type'] = 'parallel'
for (out, inp) in attr_props:
hf['/'].attrs[out] += hf_task['/'].attrs[inp]
hf[f'File{ifile}'] = h5py.ExternalLink(outfile, '/')
return