Source code for damask.util

"""Miscellaneous helper functionality."""

import sys as _sys
import datetime as _datetime
import os as _os
import subprocess as _subprocess
import shlex as _shlex
import re as _re
import signal as _signal
import fractions as _fractions
import contextlib as _contextlib
from collections import abc as _abc, OrderedDict as _OrderedDict
from functools import reduce as _reduce, partial as _partial, wraps as _wraps
import inspect
from typing import Optional as _Optional, Callable as _Callable, Union as _Union, Iterable as _Iterable, \
                   Dict as _Dict, List as _List, Tuple as _Tuple, Literal as _Literal, \
                   Any as _Any, TextIO as _TextIO, Generator as _Generator
from pathlib import Path as _Path

import numpy as _np
import h5py as _h5py

from . import version as _version
from ._typehints import FloatSequence as _FloatSequence, IntSequence as _IntSequence, \
                        NumpyRngSeed as _NumpyRngSeed, FileHandle as _FileHandle


# https://svn.blender.org/svnroot/bf-blender/trunk/blender/build_files/scons/tools/bcolors.py
# https://stackoverflow.com/questions/287871
_colors = {
           'header' :   '\033[95m',
           'OK_blue':   '\033[94m',
           'OK_green':  '\033[92m',
           'warning':   '\033[93m',
           'fail':      '\033[91m',
           'end_color': '\033[0m',
           'bold':      '\033[1m',
           'dim':       '\033[2m',
           'underline': '\033[4m',
           'crossout':  '\033[9m'
          }

####################################################################################################
# Functions
####################################################################################################

def _broadcast_shapes(a1,a2):
    """
    Backport from NumPy for versions < 1.20.

    References
    ----------
    https://github.com/numpy/numpy/blob/2f7fe64b8b6d7591dd208942f1cc74473d5db4cb/numpy/lib/_stride_tricks_impl.py#L427

    """
    return _np.broadcast(*[_np.empty(x, dtype=_np.dtype([])) for x in [a1,a2]]).shape


if _np.lib.NumpyVersion(_np.__version__) < '2.20.0':
    _np.broadcast_shapes = _broadcast_shapes # type: ignore

[docs] def srepr(msg, glue: str = '\n', quote: bool = False) -> str: r""" Join (quoted) items with glue string. Parameters ---------- msg : (sequence of) object with __repr__ Items to join. glue : str, optional Glue used for joining operation. Defaults to '\n'. quote : bool, optional Quote items. Defaults to False. Returns ------- joined : str String representation of the joined and quoted items. """ q = '"' if quote else '' if (not hasattr(msg, 'strip') and (hasattr(msg, '__getitem__') or hasattr(msg, '__iter__'))): return glue.join(q+str(x)+q for x in msg) else: return q+(msg if isinstance(msg,str) else repr(msg))+q
[docs] def emph(msg) -> str: """ Format with emphasis. Parameters ---------- msg : (sequence of) object with __repr__ Message to format. Returns ------- formatted : str Formatted string representation of the joined items. """ return _colors['bold']+srepr(msg)+_colors['end_color']
[docs] def deemph(msg) -> str: """ Format with deemphasis. Parameters ---------- msg : (sequence of) object with __repr__ Message to format. Returns ------- formatted : str Formatted string representation of the joined items. """ return _colors['dim']+srepr(msg)+_colors['end_color']
[docs] def warn(msg) -> str: """ Format for warning. Parameters ---------- msg : (sequence of) object with __repr__ Message to format. Returns ------- formatted : str Formatted string representation of the joined items. """ return _colors['warning']+emph(msg)+_colors['end_color']
[docs] def strikeout(msg) -> str: """ Format as strikeout. Parameters ---------- msg : (iterable of) object with __repr__ Message to format. Returns ------- formatted : str Formatted string representation of the joined items. """ return _colors['crossout']+srepr(msg)+_colors['end_color']
[docs] def run(cmd: str, wd: str = './', env: _Optional[_Dict[str, str]] = None, timeout: _Optional[int] = None) -> _Tuple[str, str]: """ Run a command. Parameters ---------- cmd : str Command to be executed. wd : str, optional Working directory of process. Defaults to './'. env : dict, optional Environment for execution. timeout : integer, optional Timeout in seconds. Returns ------- stdout, stderr : (str, str) Output of the executed command. """ def pass_signal(sig,_,proc,default): proc.send_signal(sig) _signal.signal(sig,default) _signal.raise_signal(sig) signals = [_signal.SIGINT,_signal.SIGTERM] print(f"running '{cmd}' in '{wd}'") process = _subprocess.Popen(_shlex.split(cmd), stdout = _subprocess.PIPE, stderr = _subprocess.PIPE, env = _os.environ if env is None else env, cwd = wd, encoding = 'utf-8') # ensure that process is terminated (https://stackoverflow.com/questions/22916783) sig_states = [_signal.signal(sig,_partial(pass_signal,proc=process,default=_signal.getsignal(sig))) for sig in signals] try: stdout,stderr = process.communicate(timeout=timeout) finally: for sig,state in zip(signals,sig_states): _signal.signal(sig,state) if process.returncode != 0: print(stdout) print(stderr) raise RuntimeError(f"'{cmd}' failed with returncode {process.returncode}") return stdout, stderr
[docs] @_contextlib.contextmanager def open_text(fname: _FileHandle, mode: _Literal['r','w'] = 'r') -> _Generator[_TextIO, None, None]: # noqa """ Open a text file with Unix line endings If a path or string is given, a context manager ensures that the file handle is closed. If a file handle is given, it remains unmodified. Parameters ---------- fname : file, str, or pathlib.Path Name or handle of file. mode: {'r','w'}, optional Access mode: 'r'ead or 'w'rite, defaults to 'r'. Returns ------- f : file handle """ if isinstance(fname, (str,_Path)): fhandle = open(_Path(fname).expanduser(),mode,newline=('\n' if mode == 'w' else None)) yield fhandle fhandle.close() else: yield fname
[docs] def time_stamp() -> str: """Provide current time as formatted string.""" return _datetime.datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S%z')
[docs] def execution_stamp(class_name: str, function_name: _Optional[str] = None) -> str: """Timestamp the execution of a (function within a) class.""" _function_name = '' if function_name is None else f'.{function_name}' return f'damask.{class_name}{_function_name} v{_version} ({time_stamp()})'
[docs] def natural_sort(key: str) -> _List[_Union[int, str]]: """ Natural sort. For use in python's 'sorted'. References ---------- https://en.wikipedia.org/wiki/Natural_sort_order """ convert = lambda text: int(text) if text.isdigit() else text return [ convert(c) for c in _re.split('([0-9]+)', key) ]
[docs] def show_progress(iterable: _Iterable, N_iter: _Optional[int] = None, prefix: str = '', bar_length: int = 50) -> _Any: """ Decorate a loop with a progress bar. Use similar like enumerate. Parameters ---------- iterable : iterable Iterable to be decorated. N_iter : int, optional Total number of iterations. Required if iterable is not a sequence. prefix : str, optional Prefix string. Defaults to ''. bar_length : int, optional Length of progress bar in characters. Defaults to 50. """ if isinstance(iterable,_abc.Sequence): if N_iter is None: N = len(iterable) else: raise ValueError('N_iter given for sequence') else: if N_iter is None: raise ValueError('N_iter not given') N = N_iter if N <= 1: for item in iterable: yield item else: status = ProgressBar(N,prefix,bar_length) for i,item in enumerate(iterable): yield item status.update(i)
[docs] def scale_to_coprime(v: _FloatSequence, N_significant: int = 9) -> _np.ndarray: """ Scale vector to co-prime (relatively prime) integers. Parameters ---------- v : sequence of float, len (:) Vector to scale. N_significant: int, optional Number of significant digits to consider. Defaults to 9. Returns ------- m : numpy.ndarray, shape (:) Vector scaled to co-prime numbers. """ def get_square_denominator(x,max_denominator): """Denominator of the square of a number.""" return _fractions.Fraction(x ** 2).limit_denominator(max_denominator).denominator def lcm(a,b): """Least common multiple.""" try: return _np.abs(_np.lcm(a,b)) # numpy > 1.18 except AttributeError: return _np.abs(a * b // _np.gcd(a, b)) max_denominator = int(10**(N_significant-1)) v_ = _np.asarray(v) if _np.issubdtype(v_.dtype,_np.inexact): v_ = _np.round(_np.asarray(v,_np.float64)/_np.max(_np.abs(v)),N_significant) m = (v_ * _reduce(lcm, map(lambda x: int(get_square_denominator(x,max_denominator)),v_))**0.5).astype(_np.int64) m = m//_reduce(_np.gcd,m) if not _np.allclose(m/_np.max(_np.abs(m)),v/_np.max(_np.abs(v)),atol=1e-2,rtol=0): raise ValueError(f'invalid result "{m}" for input "{v}"') return m
[docs] def project_equal_angle(vector: _np.ndarray, direction: _Literal['x', 'y', 'z'] = 'z', # noqa normalize: bool = True, keepdims: bool = False) -> _np.ndarray: """ Apply equal-angle projection to vector. Parameters ---------- vector : numpy.ndarray, shape (...,3) Vector coordinates to be projected. direction : {'x', 'y', 'z'} Projection direction. Defaults to 'z'. normalize : bool Ensure unit length of input vector. Defaults to True. keepdims : bool Maintain three-dimensional output coordinates. Defaults to False. Returns ------- coordinates : numpy.ndarray, shape (...,2 | 3) Projected coordinates. Notes ----- Two-dimensional output uses right-handed frame spanned by the next and next-next axis relative to the projection direction, e.g. x-y when projecting along z and z-x when projecting along y. Examples -------- >>> import damask >>> import numpy as np >>> project_equal_angle(np.ones(3)) array([0.3660, 0.3660]) >>> project_equal_angle(np.ones(3),direction='x',normalize=False,keepdims=True) array([0. , 0.5, 0.5]) >>> project_equal_angle([0,1,1],direction='y',normalize=True,keepdims=False) array([0.4142, 0. ]) """ shift = 'zyx'.index(direction) v = _np.roll(vector/_np.linalg.norm(vector,axis=-1,keepdims=True) if normalize else vector, shift,axis=-1) return _np.roll(_np.block([v[...,:2]/(1.0+_np.abs(v[...,2:3])),_np.zeros_like(v[...,2:3])]), -shift if keepdims else 0,axis=-1)[...,:3 if keepdims else 2]
[docs] def project_equal_area(vector: _np.ndarray, direction: _Literal['x', 'y', 'z'] = 'z', # noqa normalize: bool = True, keepdims: bool = False) -> _np.ndarray: """ Apply equal-area projection to vector. Parameters ---------- vector : numpy.ndarray, shape (...,3) Vector coordinates to be projected. direction : {'x', 'y', 'z'} Projection direction. Defaults to 'z'. normalize : bool Ensure unit length of input vector. Defaults to True. keepdims : bool Maintain three-dimensional output coordinates. Defaults to False. Returns ------- coordinates : numpy.ndarray, shape (...,2 | 3) Projected coordinates. Notes ----- Two-dimensional output uses right-handed frame spanned by the next and next-next axis relative to the projection direction, e.g. x-y when projecting along z and z-x when projecting along y. Examples -------- >>> import damask >>> import numpy as np >>> project_equal_area(np.ones(3)) array([0.4597, 0.4597]) >>> project_equal_area(np.ones(3),direction='x',normalize=False,keepdims=True) array([0. , 0.7071, 0.7071]) >>> project_equal_area([0,1,1],direction='y',normalize=True,keepdims=False) array([0.5412, 0. ]) """ shift = 'zyx'.index(direction) v = _np.roll(vector/_np.linalg.norm(vector,axis=-1,keepdims=True) if normalize else vector, shift,axis=-1) return _np.roll(_np.block([v[...,:2]/_np.sqrt(1.0+_np.abs(v[...,2:3])),_np.zeros_like(v[...,2:3])]), -shift if keepdims else 0,axis=-1)[...,:3 if keepdims else 2]
[docs] def hybrid_IA(dist: _FloatSequence, N: int, rng_seed: _Optional[_NumpyRngSeed] = None) -> _np.ndarray: """ Hybrid integer approximation. Parameters ---------- dist : numpy.ndarray Distribution to be approximated. N : int Number of samples to draw. rng_seed : {None, int, array_like[ints], SeedSequence, BitGenerator, Generator}, optional A seed to initialize the BitGenerator. Defaults to None. If None, then fresh, unpredictable entropy will be pulled from the OS. Returns ------- hist : numpy.ndarray, shape (N) Integer approximation of the distribution. """ N_opt_samples = max(_np.count_nonzero(dist),N) # random subsampling if too little samples requested N_inv_samples = 0 scale_,scale,inc_factor = (0.0,float(N_opt_samples),1.0) while (not _np.isclose(scale, scale_)) and (N_inv_samples != N_opt_samples): repeats = _np.rint(scale*_np.array(dist)).astype(_np.int64) N_inv_samples = _np.sum(repeats) scale_,scale,inc_factor = (scale,scale+inc_factor*0.5*(scale - scale_), inc_factor*2.0) \ if N_inv_samples < N_opt_samples else \ (scale_,0.5*(scale_ + scale), 1.0) return _np.repeat(_np.arange(len(dist)),repeats)[_np.random.default_rng(rng_seed).permutation(N_inv_samples)[:N]]
[docs] def shapeshifter(fro: _Tuple[int, ...], to: _Tuple[int, ...], mode: _Literal['left','right'] = 'left', # noqa keep_ones: bool = False) -> _Tuple[int, ...]: """ Return dimensions that reshape 'fro' to become broadcastable to 'to'. Parameters ---------- fro : tuple Original shape of array. to : tuple Target shape of array after broadcasting. len(to) cannot be less than len(fro). mode : {'left', 'right'}, optional Indicates whether new axes are preferably added to either left or right of the original shape. Defaults to 'left'. keep_ones : bool, optional Treat '1' in fro as literal value instead of dimensional placeholder. Defaults to False. Returns ------- new_dims : tuple Dimensions for reshape. Examples -------- >>> import numpy as np >>> from damask import util >>> a = np.ones((3,4,2)) >>> b = np.ones(4) >>> b_extended = b.reshape(util.shapeshifter(b.shape,a.shape)) >>> (a * np.broadcast_to(b_extended,a.shape)).shape (3, 4, 2) """ if len(fro) == 0 and len(to) == 0: return tuple() _fro = [1] if len(fro) == 0 else list(fro)[::-1 if mode=='left' else 1] _to = [1] if len(to) == 0 else list(to) [::-1 if mode=='left' else 1] final_shape: _List[int] = [] index = 0 for i,item in enumerate(_to): if item == _fro[index]: final_shape.append(item) index+=1 else: final_shape.append(1) if _fro[index] == 1 and not keep_ones: index+=1 if index == len(_fro): final_shape = final_shape+[1]*(len(_to)-i-1) break if index != len(_fro): raise ValueError(f'shapes cannot be shifted {fro} --> {to}') return tuple(final_shape[::-1] if mode == 'left' else final_shape)
[docs] def shapeblender(a: _Tuple[int, ...], b: _Tuple[int, ...], keep_ones: bool = False) -> _Tuple[int, ...]: """ Return a shape that overlaps the rightmost entries of 'a' with the leftmost of 'b'. Parameters ---------- a : tuple Shape of first array. b : tuple Shape of second array. keep_ones : bool, optional Treat innermost '1's as literal value instead of dimensional placeholder. Defaults to False. Examples -------- >>> shapeblender((3,2),(3,2)) (3, 2) >>> shapeblender((4,3),(3,2)) (4, 3, 2) >>> shapeblender((4,4),(3,2)) (4, 4, 3, 2) >>> shapeblender((1,2),(1,2,3)) (1, 2, 3) >>> shapeblender((),(2,2,1)) (2, 2, 1) >>> shapeblender((1,),(2,2,1)) (2, 2, 1) >>> shapeblender((1,),(2,2,1),True) (1, 2, 2, 1) """ def is_broadcastable(a,b): try: _np.broadcast_shapes(a,b) return True except ValueError: return False a_,_b = a,b if keep_ones: i = min(len(a_),len(_b)) while i > 0 and a_[-i:] != _b[:i]: i -= 1 return a_ + _b[i:] else: a_ += max(0,len(_b)-len(a_))*(1,) while not is_broadcastable(a_,_b): a_ = a_ + ((1,) if len(a_)<=len(_b) else ()) _b = ((1,) if len(_b)<len(a_) else ()) + _b return _np.broadcast_shapes(a_,_b)
def _docstringer(docstring: _Union[str, _Callable], adopted_parameters: _Union[None, str, _Callable] = None, adopted_return: _Union[None, str, _Callable] = None, adopted_notes: _Union[None, str, _Callable] = None, adopted_examples: _Union[None, str, _Callable] = None, adopted_references: _Union[None, str, _Callable] = None) -> str: """ Extend a docstring. Parameters ---------- docstring : str or callable, optional Docstring (of callable) to extend. adopted_* : str or callable, optional Additional information to insert into/append to respective section. Notes ----- adopted_return fetches the typehint of a passed function instead of the docstring """ docstring_: str = str( docstring if isinstance(docstring,str) else docstring.__doc__ if callable(docstring) and docstring.__doc__ else '').rstrip()+'\n' sections = _OrderedDict( Parameters=adopted_parameters, Returns=adopted_return, Examples=adopted_examples, Notes=adopted_notes, References=adopted_references) for i, (key, adopted) in [(i,(k,v)) for (i,(k,v)) in enumerate(sections.items()) if v is not None]: section_regex = fr'^([ ]*){key}\s*\n\1*{"-"*len(key)}\s*\n' if key=='Returns': if callable(adopted): return_class = adopted.__annotations__.get('return','') return_type_ = (_sys.modules[adopted.__module__].__name__.split('.')[0] +'.' +(return_class.__name__ if not isinstance(return_class,str) else return_class)) else: return_type_ = adopted docstring_ = _re.sub(fr'(^[ ]*{key}\s*\n\s*{"-"*len(key)}\s*\n[ ]*[A-Za-z0-9_ ]*: )(.*)\n', fr'\1{return_type_}\n', docstring_,flags=_re.MULTILINE) else: section_content_regex = fr'{section_regex}(?P<content>.*?)\n *(\n|\Z)' adopted_: str = adopted.__doc__ if callable(adopted) else adopted #type: ignore try: if _re.search(fr'{section_regex}', adopted_, flags=_re.MULTILINE): adopted_ = _re.search(section_content_regex, #type: ignore adopted_, flags=_re.MULTILINE|_re.DOTALL).group('content') except AttributeError: raise RuntimeError(f"function docstring passed for docstring section '{key}' is invalid:\n{docstring}") docstring_indent, adopted_indent = (min([len(line)-len(line.lstrip()) for line in section.split('\n') if line.strip()]) for section in [docstring_, adopted_]) shift = adopted_indent - docstring_indent adopted_content = '\n'.join([(line[shift:] if shift > 0 else f'{" "*-shift}{line}') for line in adopted_.split('\n') if line.strip()]) if _re.search(section_regex, docstring_, flags=_re.MULTILINE): docstring_section_content = _re.search(section_content_regex, # type: ignore docstring_, flags=_re.MULTILINE|_re.DOTALL).group('content') a_items, d_items = (_re.findall('^[ ]*([A-Za-z0-9_ ]*?)[ ]*:',content,flags=_re.MULTILINE) for content in [adopted_content,docstring_section_content]) for item in a_items: if item in d_items: adopted_content = _re.sub(fr'^([ ]*){item}.*?(?:(\n)\1([A-Za-z0-9_])|([ ]*\Z))', r'\1\3', adopted_content, flags=_re.MULTILINE|_re.DOTALL).rstrip(' \n') docstring_ = _re.sub(fr'(^[ ]*{key}\s*\n\s*{"-"*len(key)}\s*\n.*?)\n *(\Z|\n)', fr'\1\n{adopted_content}\n\2', docstring_, flags=_re.MULTILINE|_re.DOTALL) else: section_title = f'{" "*(shift+docstring_indent)}{key}\n{" "*(shift+docstring_indent)}{"-"*len(key)}\n' section_matches = [_re.search( fr'[ ]*{list(sections.keys())[index]}\s*\n\s*{"-"*len(list(sections.keys())[index])}\s*', docstring_) for index in range(i,len(sections))] subsequent_section = '\\Z' if not any(section_matches) else \ '\n'+next(item for item in section_matches if item is not None).group(0) docstring_ = _re.sub(fr'({subsequent_section})', fr'\n{section_title}{adopted_content}\n\1', docstring_) return docstring_
[docs] def extend_docstring(docstring: _Union[None, str, _Callable] = None, **kwargs) -> _Callable: """ Decorator: Extend the function's docstring. Parameters ---------- docstring : str or callable, optional Docstring to extend. Defaults to that of decorated function. adopted_* : str or callable, optional Additional information to insert into/append to respective section. Notes ----- Return type will become own type if docstring is callable. """ def _decorator(func): if 'adopted_return' not in kwargs: kwargs['adopted_return'] = func func.__doc__ = _docstringer(func.__doc__ if docstring is None else docstring, **kwargs) return func return _decorator
[docs] def pass_on(keyword: str, target: _Callable, wrapped: _Callable = None) -> _Callable: # type: ignore """ Decorator: Combine signatures of 'wrapped' and 'target' functions and pass on output of 'target' as 'keyword' argument. Parameters ---------- keyword : str Keyword added to **kwargs of the decorated function passing on the result of 'target'. target : callable The output of this function is passed to the decorated function as 'keyword' argument. wrapped: callable, optional Signature of 'wrapped' function combined with that of 'target' yields the overall signature of decorated function. Notes ----- The keywords used by 'target' will be prioritized if they overlap with those of the decorated function. Functions 'target' and 'wrapped' are assumed to only have keyword arguments. """ def decorator(func): @_wraps(func) def wrapper(*args, **kwargs): kw_wrapped = set(kwargs.keys()) - set(inspect.getfullargspec(target).args) kwargs_wrapped = {kw: kwargs.pop(kw) for kw in kw_wrapped} kwargs_wrapped[keyword] = target(**kwargs) return func(*args, **kwargs_wrapped) args_ = [] if wrapped is None or 'self' not in inspect.signature(wrapped).parameters \ else [inspect.signature(wrapped).parameters['self']] for f in [target] if wrapped is None else [target,wrapped]: for param in inspect.signature(f).parameters.values(): if param.name != keyword \ and param.name not in [p.name for p in args_]+['self','cls', 'args', 'kwargs']: args_.append(param.replace(kind=inspect._ParameterKind.KEYWORD_ONLY)) wrapper.__signature__ = inspect.Signature(parameters=args_,return_annotation=inspect.signature(func).return_annotation) return wrapper return decorator
[docs] def DREAM3D_base_group(fname: _Union[str, _Path, _h5py.File]) -> str: """ Determine the base group of a DREAM.3D file. The base group is defined as the group (folder) that contains a 'SPACING' dataset in a '_SIMPL_GEOMETRY' group. Parameters ---------- fname : str, pathlib.Path, or _h5py.File Filename of the DREAM.3D (HDF5) file. Returns ------- path : str Path to the base group. """ def get_base_group(f: _h5py.File) -> str: base_group = f.visit(lambda path: path.rsplit('/',2)[0] if '_SIMPL_GEOMETRY/SPACING' in path else None) if base_group is None: raise ValueError(f'could not determine base group in file "{fname}"') return base_group if isinstance(fname,_h5py.File): return get_base_group(fname) with _h5py.File(_Path(fname).expanduser(),'r') as f: return get_base_group(f)
[docs] def DREAM3D_cell_data_group(fname: _Union[str, _Path, _h5py.File]) -> str: """ Determine the cell data group of a DREAM.3D file. The cell data group is defined as the group (folder) that contains a dataset in the base group whose length matches the total number of points as specified in '_SIMPL_GEOMETRY/DIMENSIONS'. Parameters ---------- fname : str, pathlib.Path, or h5py.File Filename of the DREAM.3D (HDF5) file. Returns ------- path : str Path to the cell data group. """ def get_cell_data_group(f: _h5py.File) -> str: base_group = DREAM3D_base_group(f) cells = tuple(f['/'.join([base_group,'_SIMPL_GEOMETRY','DIMENSIONS'])][()][::-1]) cell_data_group = f[base_group].visititems(lambda path,obj: path.split('/')[0] \ if isinstance(obj,_h5py._hl.dataset.Dataset) and _np.shape(obj)[:-1] == cells \ else None) if cell_data_group is None: raise ValueError(f'could not determine cell-data group in file "{fname}/{base_group}"') return cell_data_group if isinstance(fname,_h5py.File): return get_cell_data_group(fname) with _h5py.File(_Path(fname).expanduser(),'r') as f: return get_cell_data_group(f)
[docs] def Bravais_to_Miller(*, uvtw: _Optional[_IntSequence] = None, hkil: _Optional[_IntSequence] = None) -> _np.ndarray: """ Transform 4 Miller–Bravais indices to 3 Miller indices of crystal direction [uvw] or plane normal (hkl). Parameters ---------- uvtw|hkil : numpy.ndarray, shape (...,4) Miller–Bravais indices of crystallographic direction [uvtw] or plane normal (hkil). Returns ------- uvw|hkl : numpy.ndarray, shape (...,3) Miller indices of [uvw] direction or (hkl) plane normal. """ if (uvtw is not None) ^ (hkil is None): raise KeyError('specify either "uvtw" or "hkil"') if uvtw is not None and (_np.sum(_np.asarray(uvtw)[...,:3],axis=-1) != 0).any(): raise ValueError(rf'u+v+t≠0: {uvtw}') if hkil is not None and (_np.sum(_np.asarray(hkil)[...,:3],axis=-1) != 0).any(): raise ValueError(rf'h+k+i≠0: {hkil}') axis,basis = (_np.array(uvtw),_np.array([[2,1,0,0], [1,2,0,0], [0,0,0,1]])) \ if hkil is None else \ (_np.array(hkil),_np.array([[1,0,0,0], [0,1,0,0], [0,0,0,1]])) uvw_hkl = _np.einsum('il,...l',basis,axis) if not _np.issubdtype(uvw_hkl.dtype,_np.signedinteger): raise TypeError('"uvtw"/"hkil" are not (signed) integers') return uvw_hkl//_np.gcd.reduce(uvw_hkl,axis=-1,keepdims=True)
[docs] def Miller_to_Bravais(*, uvw: _Optional[_IntSequence] = None, hkl: _Optional[_IntSequence] = None) -> _np.ndarray: """ Transform 3 Miller indices to 4 Miller–Bravais indices of crystal direction [uvtw] or plane normal (hkil). Parameters ---------- uvw|hkl : numpy.ndarray, shape (...,3) Miller indices of crystallographic direction [uvw] or plane normal (hkl). Returns ------- uvtw|hkil : numpy.ndarray, shape (...,4) Miller–Bravais indices of [uvtw] direction or (hkil) plane normal. """ if (uvw is not None) ^ (hkl is None): raise KeyError('specify either "uvw" or "hkl"') axis,basis = (_np.asarray(uvw),_np.array([[ 2,-1, 0], [-1, 2, 0], [-1,-1, 0], [ 0, 0, 3]])) \ if hkl is None else \ (_np.asarray(hkl),_np.array([[ 1, 0, 0], [ 0, 1, 0], [-1,-1, 0], [ 0, 0, 1]])) uvtw_hkil = _np.einsum('il,...l',basis,axis) if not _np.issubdtype(uvtw_hkil.dtype,_np.signedinteger): raise TypeError('"uvw"/"hkl" are not (signed) integers') return uvtw_hkil//_np.gcd.reduce(uvtw_hkil,axis=-1,keepdims=True)
[docs] def dict_prune(d: _Dict) -> _Dict: """ Recursively remove empty dictionaries. Parameters ---------- d : dict Dictionary to prune. Returns ------- pruned : dict Pruned dictionary. """ # https://stackoverflow.com/questions/48151953 new = {} for k,v in d.items(): if isinstance(v, dict): v = dict_prune(v) if not isinstance(v,dict) or v != {}: new[k] = v return new
[docs] def dict_flatten(d: _Dict) -> _Dict: """ Recursively remove keys of single-entry dictionaries. Parameters ---------- d : dict Dictionary to flatten. Returns ------- flattened : dict Flattened dictionary. """ if isinstance(d,dict) and len(d) == 1: entry = d[list(d.keys())[0]] new = dict_flatten(entry.copy()) if isinstance(entry,dict) else entry else: new = {k: (dict_flatten(v) if isinstance(v, dict) else v) for k,v in d.items()} return new
#################################################################################################### # Classes ####################################################################################################
[docs] class ProgressBar: """ Report progress of an interation as a status bar. Works for 0-based loops, ETA is estimated by linear extrapolation. """ def __init__(self, total: int, prefix: str, bar_length: int): """ New progress bar. Parameters ---------- total : int Total # of iterations. prefix : str Prefix string. bar_length : int Character length of bar. """ self.total = total self.prefix = prefix self.bar_length = bar_length self.time_start = self.time_last_update = _datetime.datetime.now() self.fraction_last = 0.0 if _sys.stdout.isatty(): _sys.stdout.write(f"{self.prefix} {'░'*self.bar_length} 0% ETA n/a") def update(self, iteration: int) -> None: fraction = (iteration+1) / self.total if (filled_length := int(self.bar_length * fraction)) > int(self.bar_length * self.fraction_last) or \ _datetime.datetime.now() - self.time_last_update > _datetime.timedelta(seconds=10): self.time_last_update = _datetime.datetime.now() bar = '█' * filled_length + '░' * (self.bar_length - filled_length) remaining_time = (_datetime.datetime.now() - self.time_start) \ * (self.total - (iteration+1)) / (iteration+1) remaining_time -= _datetime.timedelta(microseconds=remaining_time.microseconds) # remove μs if _sys.stdout.isatty(): _sys.stdout.write(f'\r{self.prefix} {bar} {fraction:>4.0%} ETA {remaining_time}') self.fraction_last = fraction if iteration == self.total - 1 and _sys.stdout.isatty(): _sys.stdout.write('\n')